Worker thread setup in Julia compared to C++/pthreads

I’m trying to port some code (a 3D renderer) that in C++ uses a simple tasking system to process work items on multiple OS-level threads. Each item can (and will) take a varying amount of time to process, which makes the work less suitable for @threads, as far as I can tell from different sources. I have experimented with representative test code using various methods, e.g. @threads, @spawn and ThreadPools (specifically @qthreads and @qbthreads).

The original C++ code uses the following multi-threaded setup:

  • I’m on a system with W physical CPU cores
  • I’m using two thread-safe queues: one that contains work items that need processing, plus one that receives the result of processing each item. Each work item is a small set of parameters (a tuple), while a processed work item consists of a piece of the rendered image (e.g. 32x32 pixels)
  • The main thread fills the work queue with items to process, plus it pushes W sentinel values that signal to each worker thread that all items have been processed and it can quit
  • The main thread starts W worker threads with pthread_create(..., &func, args), where func contains a loop that keeps retrieving one item from the work queue, processing it, and pushing the result into the result queue (the two queue references are passed through args). When a worker thread encounters a sentinel value it simply quits.
  • The main thread waits for all work items to be processed (as it knows the total count of results to expect) and handles each result as it comes in. During this processing it prints some statistics on progress, and in some versions of the C++ code it also displays the rendered image as it is produced. This is all fairly lightweight processing, so the main thread doesn’t interfere much with the worker threads’ computations
  • When all items have been processed the main thread uses a pthread_join() on each worker thread

I’ve been trying to get a similar setup working in Julia, where I would like to have:

  • W tasks (or worker threads) that each process work items, taking advantage of all W CPU cores simultaneously
  • A main task (or thread) that processes results as they come in

But I can’t seem to replicate what I have in the C++ version. Either I have all worker threads active but the main thread blocked, or I need to use W-1 worker threads to get the main thread to run.

The main stumbling block, as far as I can tell, is that in Julia there is an intermediate layer of Tasks between Julia code and the underlying OS-level threads. There is no direct way to simply start N OS-level threads (as with pthread_create()) and handle locking/communication/etc. yourself, so Tasks and their scheduling are what you have to work with in Julia. And since Task scheduling is also handled by Julia, and it seems to run at most W tasks concurrently, I can never have W+1 tasks running. I thought I could make the main thread a Julia task, but I simply can’t get it to work the way I want.
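To illustrate what I mean: as far as I understand it, Threads.@spawn only creates a Task, and the runtime decides which of the nthreads() OS threads ends up running it:

t = Threads.@spawn Threads.threadid()  # returns a Task, not an OS thread
println("task ran on thread ", fetch(t), " of ", Threads.nthreads())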

Sorry for the long description. Here is my latest attempt using @spawn that illustrates it all in more detail:

using Base.Threads
import Base.Threads.@spawn

println("nthreads() = $(Threads.nthreads())")

# https://discourse.julialang.org/t/print-functions-in-a-threaded-loop/12112/8
const print_lock = ReentrantLock()

function safe_println(s)
    lock(print_lock) do
        Core.println(s)
    end
end

function worker(index, work_channel, result_channel)

    tid = Threads.threadid()
    safe_println("[$(tid)] worker $(index) | started")
    
    while true

        work = take!(work_channel)       
        safe_println("[$(tid)] worker $(index) | got work to do: $(work)")
        
        if work === nothing
            # Sentinel, all work done
            safe_println("[$(tid)] worker $(index) | done")
            return
        end
        
        # Spin, simulate work of at least index seconds
        t0 = time()
        dummy = 0
        while (time() - t0) < index 
            dummy += 1
        end
        
        # Produce result
        result = (index, tid, work)
        safe_println("[$(tid)] worker $(index) | producing result $(result)")
        put!(result_channel, result)
    end
end

D = 10                  # Data items
W = Threads.nthreads()  # Worker threads

work_channel = Channel{Union{Int,Nothing}}(D+W)
result_channel = Channel{Tuple{Int,Int,Int}}(D)

# Place work in the queue
safe_println("Scheduling $(D) work items")
for i = 1:D
    put!(work_channel, i)
end

# Push sentinels, one per worker thread
for i = 1:W
    put!(work_channel, nothing)
end

# Start worker threads
tasks = Task[]
safe_println("[main] Starting $(W) tasks")
for i = 1:W
    t = @spawn worker(i, work_channel, result_channel)    
    safe_println("[main] worker $(i): $(t)")
    push!(tasks, t)
end

# Wait for all work to complete
safe_println("[main] Waiting for work completion")    

r = zeros(Int, D)
for i = 1:D
    wid, tid, work = take!(result_channel)
    r[work] = tid
    safe_println("[main] Got $(work) from worker $(wid) (thread $(tid))")
end

safe_println("[main] Received all work items")    
safe_println("[main] $(r)")

safe_println("[main] Waiting for task completion")
for t in tasks 
    wait(t)
end

Running this on my 4-core system with -t 4 (Julia 1.6 RC1) consistently results in

nthreads() = 4
Scheduling 10 work items
[main] Starting 4 tasks
[main] worker 1: Task (runnable) @0x00007f6952139ba0
[main] worker 2: Task (runnable) @0x00007f6952738400
[main] worker 3: Task (runnable) @0x00007f6952738550
[main] worker 4: Task (runnable) @0x00007f69527386a0
[main] Waiting for work completion
[4] worker 1 | started
[4] worker 1 | got work to do: 1
[2] worker 2 | started
[2] worker 2 | got work to do: 2
[3] worker 3 | started
[3] worker 3 | got work to do: 3
[1] worker 4 | started
[1] worker 4 | got work to do: 4
[4] worker 1 | producing result (1, 4, 1)
[4] worker 1 | got work to do: 5
[2] worker 2 | producing result (2, 2, 2)
[2] worker 2 | got work to do: 6
[4] worker 1 | producing result (1, 4, 5)
[4] worker 1 | got work to do: 7
[3] worker 3 | producing result (3, 3, 3)
[3] worker 3 | got work to do: 8
[4] worker 1 | producing result (1, 4, 7)
[4] worker 1 | got work to do: 9
[2] worker 2 | producing result (2, 2, 6)
[2] worker 2 | got work to do: 10
[4] worker 1 | producing result (1, 4, 9)
[4] worker 1 | got work to do: nothing
[4] worker 1 | done
[1] worker 4 | producing result (4, 1, 4)
[1] worker 4 | got work to do: nothing
[1] worker 4 | done
[main] Got 1 from worker 1 (thread 4)
[main] Got 2 from worker 2 (thread 2)
[main] Got 5 from worker 1 (thread 4)
[main] Got 3 from worker 3 (thread 3)
[main] Got 7 from worker 1 (thread 4)
[main] Got 6 from worker 2 (thread 2)
[main] Got 9 from worker 1 (thread 4)
[main] Got 4 from worker 4 (thread 1)
[2] worker 2 | producing result (2, 2, 10)
[2] worker 2 | got work to do: nothing
[2] worker 2 | done
[main] Got 10 from worker 2 (thread 2)
[3] worker 3 | producing result (3, 3, 8)
[3] worker 3 | got work to do: nothing
[3] worker 3 | done
[main] Got 8 from worker 3 (thread 3)
[main] Received all work items
[main] [4, 2, 3, 1, 4, 2, 4, 3, 4, 2]
[main] Waiting for task completion

I.e. the main task only starts running after the worker task on thread 1 (= the main thread) is done. I can understand why this happens, but I don’t see how to get around it.

I’ve tried increasing the number of OS-level threads available to Julia, e.g. -t 5 or higher. In some of the test runs this indeed allows the main thread to see the results as they come in, but then it’s because thread 1 (i.e. the main thread) doesn’t have a worker task assigned. But this only starts to happen regularly from -t 8 or higher (i.e. double the number of physical cores).

In short, I could use some pointers on how to approach this. I find the intermediate Task layer a bit confusing, although I can understand why it’s there, if the goal is to abstract away from OS-level threads into something a bit more user-friendly. But the lack of low-level control over OS-level threads is a bit surprising, as even in Python a threading.Thread corresponds directly to a pthread (which can be useful for computations that release the GIL, or when working with lots of I/O connections that spend most of their time blocked, in which case the number of OS-level threads is not an issue).


To keep the main OS thread (threadid() == 1) from being blocked, you’d need to start nthreads() - 1 workers, as you mentioned. A simple solution may be to start Julia with julia -t$number_of_cpus_plus_one and use something like

function foreach_on_non_primary(f, input::Channel)
    workers = Threads.Atomic{Int}(0)
    @sync for _ in 1:Threads.nthreads()
        Threads.@spawn if Threads.threadid() > 1
            Threads.atomic_add!(workers, 1)
            for x in input
                f(x)
            end
        end
    end
    if workers[] == 0
        # Failed to start workers. Fallback to more robust (less controlled)
        # solution:
        @sync for _ in 1:Threads.nthreads()
            Threads.@spawn for x in input
                f(x)
            end
        end
    end
end

for work in work_source
    push!(work_channel, work)
end
close(work_channel)  # instead of sentinel, you can just close it (otherwise deadlock)
foreach_on_non_primary(work_channel) do work
    push!(result_channel, compute(work))
end

(Note to the wider audience: I encourage not using an approach like this in released packages. Julia’s parallel task runtime needs to control the task scheduling to make parallel programs composable. Manually managing task scheduling like this defeats the design philosophy of Julia’s parallel runtime. On the other hand, if you are writing an application (not a library), I don’t think there is a problem in using tricks like this. For more discussion, see the documentation of FoldsThreads.TaskPoolEx and [ANN] FoldsThreads.jl: A zoo of pluggable thread-based data-parallel execution mechanisms)

I think ThreadPools.jl’s “background threading” APIs like bforeach and qbforeach are somewhat equivalent to the function foreach_on_non_primary I wrote above, but they are more reliable when it comes to distributing tasks across OS threads if the worker function f yields to the scheduler immediately.
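For example, something like this sketch (assuming ThreadPools.jl’s qbforeach(f, itr); the println body is just a placeholder for real work):

using ThreadPools

# qbforeach queues the work and keeps the primary thread free:
# items are processed on threads 2..nthreads() only
qbforeach(1:10) do work
    println("item $work on thread $(Threads.threadid())")
end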

Note that the lack of user control is what yields the increase in system control. This is a key theme in Julia (or maybe rather in programming systems in general) for unlocking a larger class of optimizations. Indeed, this is quite the opposite philosophy compared to a “systems” language like C++ or a glue language like Python. For example, we are looking into an even more restricted form of task parallelism (more discussion in my PR: RFC: Experimental API for may-happen in parallel parallelism by tkf · Pull Request #39773 · JuliaLang/julia · GitHub). I agree that Julia’s task parallelism can be confusing if you are already familiar with programming OS-level threads. But the lack of control is actually there to leave room for the system to improve performance.


In Julia and similar systems (Cilk, Go, Intel TBB…), you would normally let the threading scheduler handle this for you. Just @spawn a task for each work item that needs to be completed, and wait for them to be finished (typically by enclosing all the @spawn calls in a @sync block). The runtime will schedule the work onto threads for you, keeping the threads busy by sending them new tasks when they are idle.
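For example, a minimal sketch of this pattern (render_tile is just a hypothetical stand-in for the real per-item work):

render_tile(i) = i^2  # stand-in for the actual rendering of one tile

results = Vector{Int}(undef, 10)
@sync for i in 1:10
    # one task per work item; the scheduler load-balances across threads
    Threads.@spawn results[i] = render_tile(i)
end
# reaching this point means every spawned task has finished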

(As others have noted, a big advantage of systems like this is that they give you composable parallelism. e.g. your code can be called multiple times in parallel by someone else’s code, alongside other parallel work, and the whole thing will share OS threads in a load-balanced way.)


For this case I still don’t see how you can have the main thread handle results as they become available, since @sync blocks until all spawned tasks are finished. So you would need to @spawn a dedicated handling task (that is possibly limited to the main thread, similar to @tkf’s answer above)?

I can understand that limiting user control is a trade-off if the goal is more system control. I am wondering how well that meshes with non-Julia libraries that have their own OS-level threading restrictions and/or do multi-threading at that level themselves. Pretty much all GUI libraries I know require any API calls to come from the main thread. And doing multi-threaded computations while keeping a GUI responsive and updating when a GUI main loop is involved is usually already a nice puzzle, where there’s pretty much only one way to do it right and many ways to do it wrong. I’ve tried to extend my test code to use ImageView for displaying incoming processed work items, but it seems the underlying Gtk.jl has some issues in a multi-threaded setup, e.g.

(But then again, Gtk.jl seems to have more fundamental issues when not used from the REPL)

Plus there are libraries that do multi-threaded processing using pthreads themselves, outside of Julia’s knowledge or control. Is there enough flexibility in the Julia way of controlling the threading setup to allow some escape hatches for these situations?

I think a robust solution you can implement right now is to separate the throughput-oriented parts and the latency-oriented parts into separate Julia processes. It’s a tedious solution to implement manually, but it’s conceivable to create an easy-to-use GUI framework on top of this idea.
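A minimal sketch of what I mean, using the stdlib Distributed (render_tile is a hypothetical stand-in for the throughput-oriented work):

using Distributed
addprocs(1)  # a dedicated process for the throughput-oriented work

@everywhere render_tile(i) = i^2  # stand-in for the heavy computation

# A RemoteChannel lets the latency-oriented main process receive results
results = RemoteChannel(() -> Channel{Int}(32))
@spawnat 2 begin
    for i in 1:10
        put!(results, render_tile(i))
    end
end
for _ in 1:10
    take!(results)  # the main process stays free for GUI/latency work
end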

But, since Julia already has some special-casing for external libraries that do not like being called from Julia’s coroutines (IIUC), maybe we can get something into the base system at some point. OTOH, multi-process GUI tooling does not sound crazy either (cf. Chrome).

For libraries geared toward parallel computation (e.g., linear algebra) rather than concurrency (e.g., GUIs or web servers), I don’t think we can have a composable ecosystem unless we have a central mechanism for distributing computing resources (e.g., CPUs) that prioritizes computation. IIUC, @stevengj took the approach of making FFTW (an external library) scheduler-generic so that it composes well with Julia programs. I think it’s an ideal approach for compute-oriented libraries (although it needs upstream changes).

Use a Channel?
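E.g., a minimal sketch (compute and handle_result are hypothetical stand-ins):

compute(i) = i^2                       # stand-in for the per-item work
handle_result(r) = println("got ", r)  # stand-in for the main-thread handling

results = Channel{Tuple{Int,Int}}(32)
@sync begin
    for i in 1:10
        Threads.@spawn put!(results, (i, compute(i)))
    end
    # This loop runs on the current (main) task before @sync's implicit
    # wait, so results are handled as they arrive, not after the fact.
    for _ in 1:10
        handle_result(take!(results))
    end
end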


I have a hard time understanding how spawning only nthreads() tasks that regularly yield (e.g. by reading from a channel) hinders composability versus spawning a lot of tasks. Could you please explain this?