I’m trying to port a piece of C++ code (a 3D renderer) that uses a simple tasking system to process work items on multiple OS-level threads. Each item can (and will) take a varying amount of time to process, which, from what I’ve read in various sources, makes it less suitable for `@threads`. I have experimented with representative test code using various methods, e.g. `@threads`, `@spawn` and ThreadPools (specifically `@qthreads` and `@qbthreads`).
The original C++ code uses the following multi-threaded setup:
- I’m on a system with W physical CPU cores
- I’m using two thread-safe queues: one that contains work items that need processing, plus one that receives the results of processing each item. Each work item is a small set of parameters (a tuple), while a processed work item consists of a piece of the rendered image (e.g. 32x32 pixels)
- The main thread fills the work queue with items to process, plus it pushes W sentinel values that signal to each worker thread that all items were processed and it can quit
- The main thread starts W worker threads with `pthread_create(..., &func, args)`, where `func` contains a loop that keeps retrieving one item from the work queue, processes it, and pushes the result into the result queue (the two queue references are passed through `args`). When a worker thread encounters a sentinel value it simply quits.
- The main thread waits for all work items to be processed (as it knows the total number of results to expect) and handles each result as it comes in. During this processing it prints some statistics on progress, and in some versions of the C++ code it also displays the rendered image as it is produced. This is all fairly lightweight processing, so the main thread doesn’t interfere much with the worker-thread computations
- When all items have been processed the main thread calls `pthread_join()` on each worker thread
I’ve been trying to get a similar setup working in Julia, where I would like to have:
- W tasks (or worker threads) that each process work items, taking advantage of all W CPU cores simultaneously
- A main task (or thread) that processes results as they come in

But I can’t seem to replicate what I have in the C++ version. Either I have all worker threads active but the main thread blocked, or I need to use W-1 worker threads to get the main thread to run.

The main stumbling block, as far as I can tell, is that in Julia there is an intermediate layer of Tasks between Julia code and the underlying OS-level threads. There is no direct way to simply start N OS-level threads (as with `pthread_create()`) and handle locking/communication/etc. yourself, so Tasks and their scheduling are what you have to work with. And since Task scheduling is also handled by Julia, and it seems to run at most W tasks concurrently, I can never have W+1 tasks running. I thought I could make the main thread a Julia task as well, but I simply can’t get it to work the way I want.
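To make concrete what I mean by tasks landing on any thread: here is a minimal sketch (my own test, assuming Julia is started with multiple threads via `-t`) showing that `@spawn`ed tasks are placed on any of the `nthreads()` threads, including thread 1, which is where the main task lives.

```julia
using Base.Threads

# Spawn one task per thread and record which thread each one ran on.
# With `-t 4` the reported ids are drawn from 1:4, i.e. thread 1 (the
# main task's thread) is a perfectly valid target for a spawned task.
tasks = [Threads.@spawn Threads.threadid() for _ in 1:Threads.nthreads()]
ids = fetch.(tasks)
println(ids)
```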
Sorry for the long description. Here is my latest attempt using `@spawn`, which illustrates it all in more detail:
using Base.Threads
import Base.Threads.@spawn

println("nthreads() = $(Threads.nthreads())")

# https://discourse.julialang.org/t/print-functions-in-a-threaded-loop/12112/8
const print_lock = ReentrantLock()

function safe_println(s)
    lock(print_lock) do
        Core.println(s)
    end
end

function worker(index, work_channel, result_channel)
    tid = Threads.threadid()
    safe_println("[$(tid)] worker $(index) | started")
    while true
        work = take!(work_channel)
        safe_println("[$(tid)] worker $(index) | got work to do: $(work)")
        if work === nothing
            # Sentinel, all work done
            safe_println("[$(tid)] worker $(index) | done")
            return
        end
        # Spin, simulate work of at least index seconds
        t0 = time()
        dummy = 0
        while (time() - t0) < index
            dummy += 1
        end
        # Produce result
        result = (index, tid, work)
        safe_println("[$(tid)] worker $(index) | producing result $(result)")
        put!(result_channel, result)
    end
end

D = 10                  # Data items
W = Threads.nthreads()  # Worker threads

work_channel = Channel{Union{Int,Nothing}}(D+W)
result_channel = Channel{Tuple{Int,Int,Int}}(D)

# Place work in the queue
safe_println("Scheduling $(D) work items")
for i = 1:D
    put!(work_channel, i)
end

# Push sentinels, one per worker thread
for i = 1:W
    put!(work_channel, nothing)
end

# Start worker threads
tasks = []
safe_println("[main] Starting $(W) tasks")
for i = 1:W
    t = @spawn worker(i, work_channel, result_channel)
    safe_println("[main] worker $(i): $(t)")
    push!(tasks, t)
end

# Wait for all work to complete
safe_println("[main] Waiting for work completion")
r = zeros(Int, D)
for i = 1:D
    wid, tid, work = take!(result_channel)
    r[work] = tid
    safe_println("[main] Got $(work) from worker $(wid) (thread $(tid))")
end
safe_println("[main] Received all work items")
safe_println("[main] $(r)")

safe_println("[main] Waiting for task completion")
for t in tasks
    wait(t)
end
Running this on my 4-core system with `-t 4` (Julia 1.6 RC1) consistently results in:
nthreads() = 4
Scheduling 10 work items
[main] Starting 4 tasks
[main] worker 1: Task (runnable) @0x00007f6952139ba0
[main] worker 2: Task (runnable) @0x00007f6952738400
[main] worker 3: Task (runnable) @0x00007f6952738550
[main] worker 4: Task (runnable) @0x00007f69527386a0
[main] Waiting for work completion
[4] worker 1 | started
[4] worker 1 | got work to do: 1
[2] worker 2 | started
[2] worker 2 | got work to do: 2
[3] worker 3 | started
[3] worker 3 | got work to do: 3
[1] worker 4 | started
[1] worker 4 | got work to do: 4
[4] worker 1 | producing result (1, 4, 1)
[4] worker 1 | got work to do: 5
[2] worker 2 | producing result (2, 2, 2)
[2] worker 2 | got work to do: 6
[4] worker 1 | producing result (1, 4, 5)
[4] worker 1 | got work to do: 7
[3] worker 3 | producing result (3, 3, 3)
[3] worker 3 | got work to do: 8
[4] worker 1 | producing result (1, 4, 7)
[4] worker 1 | got work to do: 9
[2] worker 2 | producing result (2, 2, 6)
[2] worker 2 | got work to do: 10
[4] worker 1 | producing result (1, 4, 9)
[4] worker 1 | got work to do: nothing
[4] worker 1 | done
[1] worker 4 | producing result (4, 1, 4)
[1] worker 4 | got work to do: nothing
[1] worker 4 | done
[main] Got 1 from worker 1 (thread 4)
[main] Got 2 from worker 2 (thread 2)
[main] Got 5 from worker 1 (thread 4)
[main] Got 3 from worker 3 (thread 3)
[main] Got 7 from worker 1 (thread 4)
[main] Got 6 from worker 2 (thread 2)
[main] Got 9 from worker 1 (thread 4)
[main] Got 4 from worker 4 (thread 1)
[2] worker 2 | producing result (2, 2, 10)
[2] worker 2 | got work to do: nothing
[2] worker 2 | done
[main] Got 10 from worker 2 (thread 2)
[3] worker 3 | producing result (3, 3, 8)
[3] worker 3 | got work to do: nothing
[3] worker 3 | done
[main] Got 8 from worker 3 (thread 3)
[main] Received all work items
[main] [4, 2, 3, 1, 4, 2, 4, 3, 4, 2]
[main] Waiting for task completion
I.e. the main task only starts running after the worker task on thread 1 (= the main thread) is done. I can understand why this happens, but I don’t see how to get around it.
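One direction I have been experimenting with is to make the result-handling loop a spawned task as well, so it is not pinned to thread 1 the way the main task is. This is only a sketch under my assumption that a task blocked on a channel frees its thread for other tasks at `take!`/`put!` points; all names here are my own simplified stand-ins, not the code from the listing above:

```julia
using Base.Threads

# Minimal self-contained version of the pattern: D work items, W workers,
# and a spawned consumer task instead of consuming in the main task.
D = 10
W = Threads.nthreads()
work_channel = Channel{Union{Int,Nothing}}(D + W)
result_channel = Channel{Int}(D)

foreach(i -> put!(work_channel, i), 1:D)       # work items
foreach(_ -> put!(work_channel, nothing), 1:W) # sentinels, one per worker

workers = [Threads.@spawn begin
    while (w = take!(work_channel)) !== nothing
        put!(result_channel, w * w)  # stand-in for the real rendering work
    end
end for _ in 1:W]

# The consumer is a task too, so the scheduler can run it on whichever
# thread becomes available at a channel operation.
consumer = Threads.@spawn sum(take!(result_channel) for _ in 1:D)

foreach(wait, workers)
total = fetch(consumer)
println(total)  # sum of squares 1..10, i.e. 385
```

With this arrangement the main task only blocks in `wait`/`fetch` instead of competing with a spinning worker for thread 1, though I’m not sure this is the intended idiom.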
I’ve tried increasing the number of OS-level threads available to Julia, e.g. `-t 5` or higher. In some of the test runs this indeed allows the main thread to see the results as they come in, but only because thread 1 (i.e. the main thread) happens not to have a worker task assigned. And this only starts to happen regularly from `-t 8` or higher (i.e. double the number of physical cores).
In short, I could use some pointers on how to approach this. I find the intermediate Task layer a bit confusing, although I can understand why it’s there, if the goal is to abstract away from OS-level threads into something a bit more user-friendly. But the lack of low-level control over OS-level threads is a bit surprising, as even in Python a `threading.Thread` corresponds directly to a pthread (which can be useful for computations that release the GIL, or when working with lots of I/O connections that spend much of their time blocked, in which case the number of OS-level threads is not an issue).