Multithreading - schedule tasks with dependencies?

My only experience with multithreading is decorating for-loops with Threads.@threads and using the parallelized functions in the ThreadsX package, but I’d like to learn multithreading at a lower level. Suppose I’d like to run five tasks: A, B, C, D and E. Tasks A, B and C can run in parallel. Task D can only run after both A and B have completed, and task E can only run after both B and C have completed. How do I ask Julia to schedule these tasks dynamically?

The package Dagger.jl seems to offer one way to do it:
https://juliaparallel.org/Dagger.jl/dev/task-spawning/
However, I’d like to know how I can do it myself with low-level primitives like channels.

Tasks often depend on each other because a later task needs the output of an earlier one, in which case you would probably pass that data through the channel. Ignoring that and focusing only on the scheduling, this should give an idea of how it can be done:

using Base.Threads: @spawn

function A(ABD)
    println("Starting A")
    sleep(rand(1:5))
    println("Finished A")
    put!(ABD, "")
end

function B(ABD, BCE)
    println("Starting B")
    sleep(rand(1:5))
    println("Finished B")
    put!(ABD, "")
    put!(BCE, "")
end

function C(BCE)
    println("Starting C")
    sleep(rand(1:5))
    println("Finished C")
    put!(BCE, "")
end

function D(ABD)
    # Block until both A and B have put their completion token.
    take!(ABD)
    take!(ABD)
    println("Starting D")
    sleep(rand(1:5))
    println("Finished D")
end

function E(BCE)
    take!(BCE)
    take!(BCE)
    println("Starting E")
    sleep(rand(1:5))
    println("Finished E")
end

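function run()
    # Each channel collects one completion token per prerequisite, so a
    # capacity of 2 lets A, B and C signal without blocking.
    ABD = Channel(2)
    BCE = Channel(2)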
    @sync begin
        @spawn A(ABD)
        @spawn B(ABD, BCE)
        @spawn C(BCE)
        @spawn D(ABD)
        @spawn E(BCE)
    end
    return
end
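
If the later tasks do need the earlier results, the same channels can carry them instead of empty tokens. Here is a minimal sketch of that idea. The helper produce, the numeric payloads, and the names ab_to_d, bc_to_e and run_with_data are made up for illustration and are not part of the code above:

```julia
using Base.Threads: @spawn

# Hypothetical stand-in for real work: pauses briefly, then returns `value`.
function produce(name, value)
    println("Starting $name")
    sleep(0.1 * rand(1:3))
    println("Finished $name")
    return value
end

function run_with_data()
    # One channel per dependent task; capacity 2 because each has two producers.
    ab_to_d = Channel{Pair{Symbol,Int}}(2)
    bc_to_e = Channel{Pair{Symbol,Int}}(2)

    @spawn put!(ab_to_d, :A => produce(:A, 1))
    @spawn begin
        b = produce(:B, 2)
        put!(ab_to_d, :B => b)   # B feeds both D and E
        put!(bc_to_e, :B => b)
    end
    @spawn put!(bc_to_e, :C => produce(:C, 3))

    d = @spawn begin
        # Arrival order is not deterministic, so each value is tagged with its source.
        inputs = Dict(take!(ab_to_d), take!(ab_to_d))
        produce(:D, inputs[:A] + inputs[:B])
    end
    e = @spawn begin
        inputs = Dict(take!(bc_to_e), take!(bc_to_e))
        produce(:E, inputs[:B] * inputs[:C])
    end

    # D and E only finish after A, B and C, so fetching them waits for everything.
    return fetch(d), fetch(e)
end
```

With these payloads, run_with_data() returns (3, 6). Tagging each value with its producer matters because D cannot know whether A or B will finish first.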

Instead of using Channels to synchronize the tasks, you can use wait to ask each task to delay its execution until its dependencies have completed:

using Base.Threads: @spawn

function run_task(name)
    println("Starting $name")
    sleep(rand(1:5))
    println("Finished $name")
end

function run()
    @sync begin
        A = @spawn run_task(:A)
        B = @spawn run_task(:B)
        C = @spawn run_task(:C)
        D = @spawn begin
            wait(A); wait(B)
            run_task(:D)
        end
        E = @spawn begin
            wait(B); wait(C)
            run_task(:E)
        end
    end
end

This is more or less what’s performed internally by DataFlowTasks.jl.
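
If the tasks also produce values, fetch can take the place of wait: it blocks in the same way and then returns the task's result. A minimal sketch, where work, the numeric payloads, and run_with_fetch are illustrative names rather than anything from the code above:

```julia
using Base.Threads: @spawn

# Hypothetical stand-in for real work: pauses briefly, then returns `value`.
function work(name, value)
    println("Starting $name")
    sleep(0.1 * rand(1:3))
    println("Finished $name")
    return value
end

function run_with_fetch()
    A = @spawn work(:A, 1)
    B = @spawn work(:B, 2)
    C = @spawn work(:C, 3)
    # The arguments are evaluated inside the spawned task, so each fetch
    # blocks only that task until the corresponding dependency has finished.
    D = @spawn work(:D, fetch(A) + fetch(B))
    E = @spawn work(:E, fetch(B) * fetch(C))
    return fetch(D), fetch(E)
end
```

With these payloads, run_with_fetch() returns (3, 6). A side benefit of fetch is that a failure in a dependency surfaces in the dependent task as a TaskFailedException instead of going unnoticed.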
