I want to do some work on a list, where each step of the work depends on the previous one. Some steps may be IO-bound while others are CPU-bound. For example, I would like to organize it as follows:
- do step 1 in parallel with at most 5 threads; put results in a channel `items1`
- do step 2 in parallel with at most 12 threads; put results in a channel `items2`
With channels, the following snippet works for me, but it is verbose: for example, I use channels of tokens to simulate thread pools with a fixed number of concurrent tasks. I wonder whether there is something cleaner. I tried ThreadPools.jl and OhMyThreads.jl without success, but I don't know these packages well.
With channels:
function work_channels(nn)
    # Token channels: holding a token means a task is allowed to run.
    work1 = Channel{Nothing}(5)
    for _ in 1:5; put!(work1, nothing); end
    work2 = Channel{Nothing}(12)
    for _ in 1:12; put!(work2, nothing); end
    items1 = Channel(Inf)
    items2 = Channel(Inf)
    @sync begin
        # Step 1: at most 5 tasks running at once
        @async begin
            @sync for i = 1:nn
                take!(work1)  # wait for a free slot
                Threads.@spawn begin
                    sleep(0.2) # fake work
                    put!(items1, -i)
                    put!(work1, nothing)  # give the slot back
                end
            end
            close(work1)
            close(items1)
        end
        # Step 2: consumes items1 as results arrive, at most 12 tasks at once
        @async begin
            @sync for x in items1
                take!(work2)
                Threads.@spawn begin
                    sleep(0.1) # fake work
                    put!(items2, x^2)
                    put!(work2, nothing)
                end
            end
            close(work2)
            close(items2)
        end
    end
    items2
end
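One possible way to cut the boilerplate above, sketched with Base only: factor each step into a helper that starts a fixed number of worker tasks, all reading from the same input channel. The names `stage`, `as_channel` and `work_stages` are hypothetical (they come from no package). Note that this swaps the token-channel technique for long-lived workers: the concurrency limit per step is the same, but it bounds concurrent tasks, not OS threads, and there is one task per worker instead of one per item.

```julia
# Hypothetical helpers, Base only.

# Several tasks can safely iterate one Channel (each item is taken once),
# so any other iterable is first fed into a channel.
as_channel(ch::AbstractChannel) = ch
function as_channel(itr)
    Channel{Any}(Inf) do ch
        for x in itr
            put!(ch, x)
        end
    end
end

# Apply `f` to every item of `input` with `ntasks` concurrent workers.
# Returns a channel that closes once all workers are done.
function stage(f, input, ntasks::Integer)
    source = as_channel(input)
    output = Channel{Any}(Inf)
    coordinator = Threads.@spawn begin
        @sync for _ in 1:ntasks
            Threads.@spawn for x in source
                put!(output, f(x))
            end
        end
    end
    # Close `output` when the coordinator ends; a worker error is
    # forwarded to whoever reads from `output`.
    bind(output, coordinator)
    return output
end

function work_stages(nn)
    items1 = stage(1:nn, 5) do i
        sleep(0.2) # fake work
        -i
    end
    items2 = stage(items1, 12) do x
        sleep(0.1) # fake work
        x^2
    end
    return items2
end
```

Because step 2 iterates `items1` directly, it starts as soon as the first result of step 1 arrives. As in the original, results come out in completion order, not input order.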
With ThreadPools.jl, I cannot get step 2 to start before step 1 is fully done (as the `@info` output reveals):
using ThreadPools: twith, QueuePool, tforeach
function work_queuepools(nn)
    items1 = Channel(Inf)
    items2 = Channel(Inf)
    @sync begin
        Threads.@spawn begin
            twith(QueuePool(2, 5)) do pool
                tforeach(pool, 1:nn) do i
                    sleep(0.2)
                    @info "put 1 $(-i)"
                    put!(items1, -i)
                end
            end
            close(items1)
        end
        Threads.@spawn begin
            twith(QueuePool(2, 12)) do pool
                tforeach(pool, items1) do x
                    sleep(0.1)
                    @info "put 2 $(x^2)"
                    put!(items2, x^2)
                end
            end
            close(items2)
        end
    end
    items2
end
With OhMyThreads.jl, I don't seem to be able to pass a `Channel` as the iterator to functions like `tforeach`.