My only experience with multithreading is decorating for-loops with `Threads.@threads`, as well as using the parallelized functions in the ThreadsX package, but I’d like to learn multithreading at a lower level. Suppose I’d like to run 5 tasks: A, B, C, D and E. Tasks A, B and C can run in parallel. Task D can only run after both A and B have completed, and task E can only run after both B and C have completed. How do I ask Julia to dynamically schedule these tasks?
The package Dagger.jl seems to offer one way to do it:
https://juliaparallel.org/Dagger.jl/dev/task-spawning/
However, I’d like to know how I can do it myself with low-level primitives like channels.
Tasks often depend on each other because a later task needs the output of an earlier one as input. In that case you probably want to pass that data through the channel. Ignoring that and focusing only on the scheduling, this should give an idea of how it can be done:
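```julia
using Base.Threads: @spawn

# A and B each put a token into ABD when they finish;
# B and C each put a token into BCE when they finish.
function A(ABD)
    println("Starting A")
    sleep(rand(1:5))
    println("Finished A")
    put!(ABD, "")
end

function B(ABD, BCE)
    println("Starting B")
    sleep(rand(1:5))
    println("Finished B")
    put!(ABD, "")   # signal D
    put!(BCE, "")   # signal E
end

function C(BCE)
    println("Starting C")
    sleep(rand(1:5))
    println("Finished C")
    put!(BCE, "")
end

# D blocks until it has received both tokens (from A and B).
function D(ABD)
    take!(ABD)
    take!(ABD)
    println("Starting D")
    sleep(rand(1:5))
    println("Finished D")
end

# E blocks until it has received both tokens (from B and C).
function E(BCE)
    take!(BCE)
    take!(BCE)
    println("Starting E")
    sleep(rand(1:5))
    println("Finished E")
end

function run()
    # Buffer size 2, so put! never blocks the producers.
    ABD = Channel(2)
    BCE = Channel(2)
    @sync begin   # returns once all five tasks have finished
        @spawn A(ABD)
        @spawn B(ABD, BCE)
        @spawn C(BCE)
        @spawn D(ABD)
        @spawn E(BCE)
    end
    return
end
```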
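As mentioned above, the channels can also carry the actual results instead of empty tokens. Here is a minimal sketch that assumes each of A, B and C produces a number and that D and E simply add their two inputs. The names `producer`, `consumer` and `run_with_data`, and the values 1, 10 and 100, are hypothetical and only for illustration:

```julia
using Base.Threads: @spawn

# Hypothetical producer: does some "work", then sends its result to every downstream channel.
function producer(name, value, outs...)
    println("Starting $name")
    sleep(rand() * 0.1)          # stand-in for real work
    println("Finished $name")
    for ch in outs
        put!(ch, value)
    end
end

# Hypothetical consumer: blocks until both inputs have arrived, then combines them.
function consumer(name, inbox)
    x = take!(inbox)             # first input (from either upstream task)
    y = take!(inbox)             # second input
    println("Starting $name with inputs $x and $y")
    return x + y
end

function run_with_data()
    ABD = Channel{Int}(2)        # carries A's and B's results to D
    BCE = Channel{Int}(2)        # carries B's and C's results to E
    local d, e                   # declared here so they are visible after @sync
    @sync begin
        @spawn producer(:A, 1, ABD)
        @spawn producer(:B, 10, ABD, BCE)
        @spawn producer(:C, 100, BCE)
        d = @spawn consumer(:D, ABD)
        e = @spawn consumer(:E, BCE)
    end
    return fetch(d), fetch(e)
end
```

Calling `run_with_data()` should return `(11, 110)`. Both inputs arrive on the same channel, so D cannot tell which value came from A and which came from B. That is fine for a commutative operation like `+`. Otherwise, use one channel per edge or send tagged tuples such as `(:A, value)`.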
Instead of using `Channel`s to synchronize the tasks, you can use `wait` to ask each task to delay its execution until its dependencies have completed:
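```julia
using Base.Threads: @spawn

function run_task(name)
    println("Starting $name")
    sleep(rand(1:5))
    println("Finished $name")
end

function run()
    @sync begin
        # Each @spawn returns a Task handle that other tasks can wait on.
        A = @spawn run_task(:A)
        B = @spawn run_task(:B)
        C = @spawn run_task(:C)
        # D's task is spawned immediately but blocks until A and B have finished.
        D = @spawn begin
            wait(A); wait(B)
            run_task(:D)
        end
        # E's task blocks until B and C have finished.
        E = @spawn begin
            wait(B); wait(C)
            run_task(:E)
        end
    end
end
```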
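Each `@spawn` returns a `Task`, so the `wait`-based version can also pass data between tasks: `fetch(t)` waits for `t` to finish and returns its result. Here is a minimal sketch using a hypothetical `work` function that returns a value, with the hypothetical inputs 1, 10 and 100:

```julia
using Base.Threads: @spawn

# Hypothetical work function: returns its input so results can flow downstream.
function work(name, value)
    println("Starting $name")
    sleep(rand() * 0.1)                  # stand-in for real work
    println("Finished $name")
    return value
end

function run_with_results()
    A = @spawn work(:A, 1)
    B = @spawn work(:B, 10)
    C = @spawn work(:C, 100)
    # The whole expression after @spawn, including fetch, runs inside the new
    # task, so the caller is not blocked while D waits for A and B.
    D = @spawn work(:D, fetch(A) + fetch(B))
    E = @spawn work(:E, fetch(B) + fetch(C))
    return fetch(D), fetch(E)            # waits for D and E, hence for all tasks
end
```

`run_with_results()` should return `(11, 110)`. If an upstream task throws, `fetch` (like `wait`) raises a `TaskFailedException`, so the failure propagates to the tasks that depend on it.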
This is more or less what’s performed internally by DataFlowTasks.jl.
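The same `wait` pattern generalizes to an arbitrary dependency graph. The sketch below is not how DataFlowTasks.jl is actually implemented. It assumes the graph is given as a `Dict` from each task name to its prerequisites and that the graph is acyclic. `DEPS` and `run_dag` are hypothetical names:

```julia
using Base.Threads: @spawn

# Dependencies from the question: each task lists the tasks it must wait for.
const DEPS = Dict(
    :A => Symbol[], :B => Symbol[], :C => Symbol[],
    :D => [:A, :B], :E => [:B, :C],
)

function run_task(name, done_order, lk)
    println("Starting $name")
    sleep(rand() * 0.1)                 # stand-in for real work
    println("Finished $name")
    lock(lk) do                         # done_order is shared between threads
        push!(done_order, name)
    end
end

# Spawn one Task per node. Prerequisite Tasks are created first (recursively,
# memoized in `tasks`), and each new Task waits on them before running.
# Assumes an acyclic graph; a cycle would recurse until the stack overflows.
function run_dag(deps)
    tasks = Dict{Symbol,Task}()         # only the calling task mutates this
    done_order = Symbol[]
    lk = ReentrantLock()
    function get_task(name)
        haskey(tasks, name) && return tasks[name]
        prereqs = Task[get_task(d) for d in deps[name]]
        t = @spawn begin
            foreach(wait, prereqs)      # block until every prerequisite has finished
            run_task(name, done_order, lk)
        end
        tasks[name] = t
        return t
    end
    foreach(get_task, keys(deps))
    foreach(wait, values(tasks))        # wait for the whole graph
    return done_order
end
```

`run_dag(DEPS)` returns the order in which the tasks finished. A, B and C may finish in any order, but D always comes after A and B, and E always comes after B and C.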