Pipeline of async tasks with channels

I want to process a list in several steps, where each step depends on the output of the previous one. Some steps may be IO-bound and others CPU-bound. For example, I would like to organize the work as follows:

  • do step 1 in parallel with at most 5 threads; put results in a channel items1
  • do step 2 in parallel with at most 12 threads; put results in a channel items2

The following channel-based snippet works for me, but it’s verbose: for example, I use channels of tokens to simulate thread pools with a fixed number of tasks. I wonder whether there is something cleaner. I tried ThreadPools.jl and OhMyThreads.jl without success, but I don’t know these packages well.

With channels:

function work_channels(nn)
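    # Token channels act as counting semaphores: a task takes a token
    # before it starts and puts it back when it finishes.
    work1 = Channel{Nothing}(5)
    for _=1:5; put!(work1, nothing); end
    work2 = Channel{Nothing}(12)
    for _=1:12; put!(work2, nothing); end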

    items1 = Channel(Inf)
    items2 = Channel(Inf)

    @sync begin
        @async begin
            @sync for i = 1:nn
                take!(work1)
                Threads.@spawn begin
                    sleep(0.2) # fake work
                    put!(items1, -i)
                    put!(work1, nothing)
                end
            end
            close(work1)
            close(items1)
        end

        @async begin
            @sync for x in items1
                take!(work2)
                Threads.@spawn begin
                    sleep(0.1) # fake work
                    put!(items2, x^2)
                    put!(work2, nothing)
                end
            end
            close(work2)
            close(items2)
        end
    end
    items2
end

With ThreadPools.jl, I can’t get step 2 to start before step 1 has fully finished (as the @info output reveals):

using ThreadPools: twith, QueuePool, tforeach

function work_queuepools(nn)
    items1 = Channel(Inf)
    items2 = Channel(Inf)

    @sync begin
        Threads.@spawn begin
            twith(QueuePool(2, 5)) do pool
                tforeach(pool, 1:nn) do i
                    sleep(0.2)
                    @info "put 1 $(-i)"
                    put!(items1, -i)
                end
            end
            close(items1)
        end

        Threads.@spawn begin
            twith(QueuePool(2, 12)) do pool
                tforeach(pool, items1) do x
                    sleep(0.1)
                    @info "put 2 $(x^2)"
                    put!(items2, x^2)
                end
            end
            close(items2)
        end
    end
    items2
end

With OhMyThreads.jl, I don’t seem to be able to pass a Channel as the iterator to functions like tforeach.

This is so very close to being really convenient to do with just Base. If we had the ability to add custom-sized threadpools dynamically, you could use Threads.@threads :greedy for.

Alternatively, you could take a look at how :greedy is implemented and add a user supplied upper limit on the spawned tasks to :greedy?

It’s doable with Threads.@threads and Base.Semaphore:


function wchan(nn)

    items1 = Channel(Inf)
    items2 = Channel(Inf)

    # step 1 should have max 5 concurrent
    sem1 = Base.Semaphore(5)
    step1 = @async begin
        Threads.@threads for i in 1:nn
            Base.acquire(sem1)
            sleep(0.2)
            @ccall printf("step1: %d\n"::Cstring; i::Cint)::Cint
            put!(items1, i)
            Base.release(sem1)
        end
        close(items1)
    end

    sem2 = Base.Semaphore(12)
    step2 = @async begin
        Threads.@threads for i in 1:nn
            Base.acquire(sem2)
            x = take!(items1)
            sleep(0.2)
            @ccall printf("step2: %d\n"::Cstring; x::Cint)::Cint
            put!(items2, x^2)
            Base.release(sem2)
        end
        close(items2)
    end
    wait.((step1, step2))

    items2
end

nn = 50
ch = wchan(nn)
result = sort([take!(ch) for _ in 1:nn])

I use @ccall printf since it’s immediate, not waiting for some interactive thread to write.
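One caveat with the version above: with its default scheduling, Threads.@threads splits the loop over roughly as many tasks as there are threads, so with fewer than 5 threads the semaphore limit is never actually reached. A minimal sketch of a variant where the semaphore alone sets the cap, spawning one task per item; `bounded_map` is a hypothetical helper name, not something from Base, and the do-block form of `Base.acquire` assumes Julia 1.8 or later:

```julia
# Hypothetical helper (not part of Base): apply `f` to every element of `xs`,
# never running more than `limit` calls at the same time.
# Results are put on the channel in completion order, not input order.
function bounded_map(f, xs, limit::Integer)
    sem = Base.Semaphore(limit)
    out = Channel{Any}(Inf)
    @sync for x in xs
        Threads.@spawn begin
            # The do-block form releases the permit even if `f` throws
            # (assumes Julia 1.8 or later).
            Base.acquire(sem) do
                put!(out, f(x))
            end
        end
    end
    close(out)   # all tasks are done once @sync returns
    return out
end

# Fake IO-bound work, at most 5 calls in flight.
squares = sort(collect(bounded_map(x -> (sleep(0.05); x^2), 1:20, 5)))
```

This creates one task per item up front (most of them just wait on the semaphore), which is fine for moderate list sizes but probably not for millions of items.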

Does Transducers.jl do what you want?

using Transducers: Map, append_unordered!

function work_channels(inch)
    items1 = Channel()
    items2 = Channel()
    @async try
        append_unordered!(items1, Map(i -> begin sleep(0.1); @info "i = $i"; i - 1 end), inch;
                          ntasks = 5)
    finally close(items1) end
           
    @async try
        append_unordered!(items2, Map(x -> begin sleep(0.2); @info "x = $x"; x^2 end), items1;
                          ntasks = 12)
    finally close(items2) end
    items2
end

Channel(Map(identity), 1:nn) |> work_channels |> collect

Otherwise, you might need/want to implement some helper functions mimicking map, filter and friends for channels. For inspiration, check clojure.core.async.
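As a rough idea of what such helpers could look like with just Base, here is a minimal sketch. `chmap`, `chfilter` and `work_pipeline` are hypothetical names, not an existing API, and the unbounded `Channel{Any}(Inf)` buffers are an assumption made to keep the example short. Each stage starts a fixed number of worker tasks that all pull from the same input channel, so a later stage starts consuming as soon as the first items arrive:

```julia
# Hypothetical helper: apply `f` to each item of `input` with `ntasks` workers.
# The output channel is closed once the input is exhausted (or a worker fails).
function chmap(f, input::Channel; ntasks::Integer = 1)
    output = Channel{Any}(Inf)
    @async try
        @sync for _ in 1:ntasks
            # Several tasks can iterate the same channel; each item is taken once.
            Threads.@spawn for x in input
                put!(output, f(x))
            end
        end
    finally
        close(output)
    end
    return output
end

# Hypothetical helper: forward only the items for which `pred` returns true.
function chfilter(pred, input::Channel)
    output = Channel{Any}(Inf)
    @async try
        for x in input
            pred(x) && put!(output, x)
        end
    finally
        close(output)
    end
    return output
end

function work_pipeline(nn)
    # Feed 1:nn into a channel; it is closed automatically when the do-block returns.
    source = Channel{Int}(nn) do ch
        foreach(i -> put!(ch, i), 1:nn)
    end
    items1 = chmap(i -> (sleep(0.2); -i), source; ntasks = 5)    # step 1, 5 workers
    items2 = chmap(x -> (sleep(0.1); x^2), items1; ntasks = 12)  # step 2, 12 workers
    return chfilter(iseven, items2)
end

result = sort(collect(work_pipeline(10)))
```

In this sketch an exception in a worker only closes the output channel; the error itself stays in the background task, so real code would probably want to surface it (for example by keeping the task and calling `wait` on it).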
