Multithreaded compute farm

I searched for a function in Julia that creates a “compute farm” based on a channel: workers take tasks from a channel and put back 0 or more tasks to the same channel. (many mathematical problems seem to be approachable in this way; e.g. exploring a graph which is accessible neighbour-by-neighbour).

I couldn’t find one, so wrote it myself. It seems to perform reasonably well, but since I’m far from expert on multithreading I’d be very happy if someone could review my code and give useful criticism.

The function itself is fairly short, the idea is to create an object ChannelFarm{T} similar to Channel{T} and use it as

ch = ChannelFarm{T}() do x
    # do something with x::T
    return a tuple or vector of 0 or more objects of type T
end
put!(ch,initial object(s))

and will do something with all the objects of type T, either put initially or created within the function.

Here goes my code:

module Farm

using Base.Threads, Test

import Base: put!, take!, wait, close

struct ChannelFarm{T} <: AbstractChannel{T}
    busy::Atomic{UInt}
    done::Threads.Condition
    ch::Channel{T}
    function ChannelFarm{T}(f::Function, size=16384; nthreads=nthreads()) where T
        busy = Atomic{UInt}(nthreads)
        done = Threads.Condition()
        ch = Channel{T}(size)
        for _=1:nthreads
            bind(ch, Threads.@spawn while true
                 atomic_sub!(busy,UInt(1))
                 if busy[] == 0 # last one turns the lights off
                     lock(done)
                     notify(done)
                     unlock(done)
                 end
                 x = take!(ch)
                 atomic_add!(busy,UInt(1))
                 for y=f(x)
                     Base.n_avail(ch)<size || throw("ChannelFarm: channel is full")
                     put!(ch,y)
                 end
             end)
        end
        new(busy,done,ch)
    end
end

put!(chf::ChannelFarm{T},x::T) where T = put!(chf.ch,x)

function wait(chf::ChannelFarm)
    lock(chf.done)
    wait(chf.done)
    unlock(chf.done)
end

close(chf::ChannelFarm) = close(chf.ch)

@testset "ChannelFarm" begin
    λ = 0.9
    inputs = [Float64[] for _=0:nthreads()]
    outputs = [Float64[] for _=1:nthreads()]
    
    ch = ChannelFarm{Float64}() do x
        sleep(x/10)
        push!(outputs[threadid()],x) # record it
        t = rand()
        next = Float64[]
        for k=0:20 # simulate Poisson distribution
            t -= λ^k * exp(-λ) / factorial(k)
            t<0 && break
            y = rand()
            push!(inputs[threadid()],y)
            push!(next,y)
        end
        next
    end
    for t=1:10
        y = rand()
        sleep(y) # give some time to the queues to start
        push!(inputs[nthreads()+1],y)
        put!(ch, y)
    end
    wait(ch)
    close(ch)

    vinputs = Set(vcat(inputs...))
    voutputs = Set(vcat(inputs...))
    @test vinputs == voutputs
end

end

Cool idea!

This should instead be:

last = atomic_sub!(busy, UInt(1))
    if last == 1

Because otherwise between the modify and read operations it’s possible for the value to change (unlikely, but semantically possible).

Instead of:

try to use:

lock(done)
    notify(done)
end

in case notify throws an error (I have no clue if it can, but it’s good practice).

If you have to do this, it’s usually an indication that you’re doing something wrong. You should try to structure put!(ch, y) so that it can be called immediately after construction. Relying on the sleep approach to ensure that your tasks are scheduled will potentially become a source of bugs in the future.

And in your tests:

You probably meant vcat(outputs...) :slightly_smiling_face:

Otherwise this seems generally fine to me! You could also consider using Dagger to distribute this work, assuming you only submit functional programs (no side effects or mutating inputs).

2 Likes

I’m curious where you picked up the term “compute farm” from. In the parallel skeleton literature, (task) farm skeleton refers to something different (FYI, my Concurrency patterns for controlled parallelisms tutorial includes a simple implementation of task farm in Julia.) I think what you are referring to is close to the feedback skeleton (although I don’t know how standard this nomenclature is):

— Castro, David, Kevin Hammond, and Susmit Sarkar. 2016. “Farms, Pipes, Streams and Reforestation: Reasoning about Structured Parallel Processes Using Types and Hylomorphisms.” ACM SIGPLAN Notices 51 (9): 4–17. https://doi.org/10.1145/3022670.2951920.

I agree with @jpsamaroo’s comment on the bug in using atomic operation on busy. To avoid a bug like this, it’d be better to use a pre-existing abstraction. This is what C++ calls latch and CountDownLatch (although I’ve seen some dispute on the naming). Anyway, I think you can wrap it into a abstraction called latch or barrier, which makes testing and reviewing easier. FYI, my SyncBarriers.jl library has a barrier implementations that can be used for this. A fuzzy barrier is useful for this purpose since (presumably) the workers don’t have to wait for other workers.

Having said that, I also agree with:

One solution may be to use a simpler solution like below (untested), which doesn’t require the barrier dance at all:

function parallel_feedback(f, inits, T = eltype(inits); ntasks=Threads.nthreads())
    worklist = Channel{T}() do worklist
        for x in inits
            put!(worklist, x)
        end
    end
    @sync for _ in 1:ntasks
        Threads.@spawn try
            f(worklist)
        finally
            close(worklist)
        end
    end
end

It can be used as

parallel_feedback(rand(10)) do worklist
    for x in worklist
        for k in 0:20 # simulate Poisson distribution
            t -= λ^k * exp(-λ) / factorial(k)
            t < 0 && break
            y = rand()
            put!(worklist, y)
            # Note: omitting actual output handling
        end
    end
end

This has an extra benefit that you don’t need to allocate the temporary next vector (cutting GC interaction is important for scaling up threaded program ATM). It also makes the work available right away for other workers (i.e., the loop inside f does not have to complete for “submitting” the work). Furthermore, it is now fairly easy to swap the worklist implementation with a list of work-stealing deque, which is very appealing here since the single worker case degenerate into the usual (single-threaded) worklist-based recursion. This makes this construct robustly perform well when nested inside other parallel programs.

Also, if you want to bound the channel size, you can create a simple wrapper type like
struct BoundedChannel{T}
    ch::Channel{T}
end

function Base.put!(ch::BoundedChannel, x)
    Base.n_avail(ch.ch)<size || error("BoundedChannel: channel is full")
    put!(ch.ch, x)
end

Base.iterate(ch::BoundedChannel, _ = nothing) = iterate(ch.ch)

and use it as f(BoundedChannel(worklist)) instead of f(worklist).
(Edit: n_avail is an internal function)

Some more random comments:

  • ChannelFarm{T}(f::Function, ...): every object in Julia is potentially callable. I don’t think bounding ::Function is a good idea. See https://github.com/JuliaLang/julia/pull/34296 for some discussion.
  • ChannelFarm{T}(...; nthreads=nthreads()): The argument nthreads does not specify the number of threads. It specifies the number of worker tasks. I’d call it ntasks (or maybe nworkers).
4 Likes

One more important thing:

I agree that this kind of “fixed-point solver” comes up in many problems. But something like visiting data structures (like a graph) probably should be handled via a divide-and-conquer pattern, to reduce communications between the tasks (although maintaining what node is visited efficiently probably requires some lock-free data structure). An important ingredient for a high-performance parallel program is to not communicate (often). A queue or channel is all about communication and introducing a bottleneck. So, I think it’s a good idea to exploring how to avoid it if possible.

1 Like

Thanks a lot, that’s very helpful! However, I can’t get your code to work as-is. Copying your suggestion

    for x in worklist
        @info "Processing $x$"; sleep(x/100) # simulate work
        t = rand()
        for k in 0:20 # simulate Poisson distribution
            t -= λ^k * exp(-λ) / factorial(k)
            t < 0 && break
            y = rand()
            put!(worklist, y)
        end
    end
end

The put(worklist,y) call blocks if there’s no available worker.

I tried to fix it by creating a channel with big size (Channel{T}(10^4) in parallel_feedback) but then there’s another problem: the channel gets closed by Julia after the worker processed just one or two xs. I tried to keep it alive with bind(worklist,Threads.@spawn try ...) with no success.

I think however that there’s another fundamental problem with your approach, which I had tried to anticipate (inelegantly) with my lock dance: say I have 64 workers (ntasks=nthreads()). Already at the beginning, only 10 of the threads will survive! Worse: suppose that at one moment, the workload gets empty, and just one thread is working. 63 of the threads stop working, since the workload is exhausted. Then the workload picks up again from that single task, and we’re working single-threaded from now on.

I should have been more clear with my typical usage parameters. A few million items will go through the worklist, and each one will take a few milliseconds. Single-threadedly, the runtime should be a few days, and multithreadedly a few hours.

It’s very difficult to divide-and-conquer this problem, since I’m exploring (a small portion) of an infinite graph neighbour-by-neighbour, making local decisions as to in which direction it’s best to continue the exploration.

Dagger seems like a great tool! I see you’re the author… I couldn’t use it, however:

  • in unix.jl:27, Compat.Sys.isapple() does not exist – I changed it to Sys.isapple()
  • after running Dagger.@spawn 1+2 I get
Error in eager scheduler:
ThunkFailedException (Thunk[1](eager_thunk, ()) failure):
UndefVarError: CLOCK_THREAD_CPUTIME_ID not defined

Assuming I can get it to run, though, would the following work?

function f(x)
    # process x
    for _=1:poisson(lambda); Dagger.@spawn f(rand()); end
end
for _=1:10; Dagger.@spawn f(rand()); end

I’m a bit afraid of @spawning millions of tasks without crashing my laptop or, worse, the cluster to which I have access.

1 Like

@jpsamaroo OK, I grepped through the .h files, and adding
const CLOCK_THREAD_CPUTIME_ID = Cint(16)
on apple systems seems to work. However, my brutal implementation does not: after a few thousand calls, it hits

Error in eager scheduler:
Future can be set only once
...
error in running finalizer: ErrorException("task switch not allowed from inside gc finalizer")
jl_error at /usr/local/src/julia/src/rtutils.c:41
jl_switch at /usr/local/src/julia/src/task.c:498
try_yieldto at ./task.jl:737
...

Huh, odd. Would you mind filing an issue with a reproducer, full stacktrace, and your system information?

2 Likes

Ah, yes, the initialization should be outside:

function parallel_feedback(
    f,
    inits,
    T = eltype(inits);
    ntasks = Threads.nthreads(),
    size = Inf, # we'd need something like BoundedChannel above if bounded
)
    worklist = Channel{T}(size)
    @sync begin
        for _ in 1:ntasks
            Threads.@spawn try
                f(worklist)
            finally
                close(worklist)
            end
        end
        try
            for x in inits
                put!(worklist, x)
            end
        catch
            close(worklist)
            rethrow()
        end
    end
end

(Side note: I moved put!(worklist, x) inside @syncso that iterate(inits, _) can run in parallel with other works. It may be useful when inits = Iterators.Map(f, _) where f is somewhat expensive.)

Also, we’d need some termination condition to break out of for x in worklist loop:

let counter = Threads.Atomic{Int}(0)
    parallel_feedback(rand(10)) do worklist
        for x in worklist
            ...
            if Threads.atomic_add!(counter, 1) > 10  # or maybe something more relevant to the problem
                break
            end
        end
    end
end

It’d then terminate the all workers if at least one worker hit the break (which closes worklist) and then other workers try to go to the next iteration of for x in worklist loop.

Let me know if this happens with the above fix. I think all workers will wait at the line for x in worklist line as long as worklist is not closed.

There is already a ThreadPools.jl which I think makes sense to use for these types of problems.

1 Like

I don’t think ThreadPools.jl is relevant here unless, e.g., you want to combine this with a GUI application:

See also the last paragraph of Automatically fusing together several for-loops - #18 by tkf and the discussion.

Create a thread pool with N threads where N is similar to your number of cores, then push the first unit of work to the channel… When you generate new units in the processing, push them to the channel. The threads are all just grabbing work, sending completed work to the output, and pushing any work they generate to the work channel.

It works great for this kind of problem.

It is not possible to do this in Julia, provided what you mean by thread is OS thread (e.g., pthread in Linux). There is no construct in Julia that lets you do this. In particular, ThreadPools.jl does not help you do this.

What Base.Threads lets you do is to schedule Julia task in arbitrary worker thread implemented on top of OS-native threading API. ThreadPools.jl is built on top of this and lets you control the set of worker threads (i.e., pool), mainly for separating latency-oriented and throughput-oriented code.

I think you’d need to understand the characteristics of ThreadPools.jl. Otherwise, using it blindly just introduces overhead.

2 Likes

Although the motivation for writing it may have been separating latency and throughput oriented code, it works great for handling situations where the amount of time each computational task takes is widely varying. So for example if you want to do a stochastic simulation until something occurs in the simulation and then output some property of the state at that time… the amount of time it takes to run that simulation might vary from say 10ms to 30 hours. ThreadPools works great to ensure that as soon as a task completes, that newly freed thread can grab the next chunk of work.

As far as I can tell ThreadPools DOES in fact control which julia worker thread, and hence OS thread, the task runs on. I’m not sure if that’s because it has access to some very low level stuff in Julia, or not. I could be mistaken. But see for example this discussion of how bthreads keeps the main thread free: ThreadPools.jl · ThreadPool Documentation

Also there’s the macro it has @tspawnat “The package also exposes a lower-level @tspawnat macro that mimics the Base.Threads.@spawn macro, but allows direct thread assignment for users who want to develop their own scheduling.”

In any case, I believe ThreadPools was absolutely built for the original problem description of pushing work chunks into a queue, and then letting threads dequeue them and process them, and potentially push more chunks into the queue.

Julia tasks created by Threads.@spawn does already. There’s no need to use ThreadPools.jl for only this purpose.

My understanding is that @spawn sends tasks to some thread, and then they’re stuck on that thread, with no migration. (Unless that changed recently). So if you spawn two long tasks and they wind up on the same thread, they’ll run in serial. ThreadPools gets rid of that problem.

This will not be true in Julia 1.7: Refetch PTLS via Task struct (enable task migration) by vtjnash · Pull Request #40715 · JuliaLang/julia · GitHub

But this is irrelevant to this discussion. Unless a coroutine-like construct is implemented, it is impossible to work around this restriction inside user code since this is the property of the runtime system. The last time I checked it, ThreadPools.jl did not implement coroutine.

3 Likes

You keep insisting that thread pools cannot do what I’ve already done with theead pools I don’t understand what you’re talking about.

Create a channel, create a thread pool with 4 threads. Push 4 numbers into the channel. In the thread pools run a function that reads from the channel and calculates how many normal random numbers you have to generate before you get a value bigger than what it read from the channel… These tasks will run on separate threads, and do their work just fine… (Use local RNGs)

If ThreadPools.jl fits your need, I think it’s great and it’s not my intention to discourage it at all. All I want to do is to properly characterize what it actually is (or was, when I checked the last time) for helping you and other readers to understand the use case of it. I also didn’t say it’s not possible to use or implement “thread pool.” I was trying to explain that you don’t need ThreadPools.jl for this and ThreadPools.jl introduces overheads since it tries to disable what Julia runtime does (e.g., task migration across OS threads, in Julia 1.7 and later). There are much simpler and composable ways to implement task pools. In particular, the code in the OP already does it.

2 Likes

I seem to have started a heated exchange, so please let me summarize my understanding.

  • ThreadPools.jl is great in that it creates a channel-like object on which new tasks can be pushed (at wish, in the form of a function and arguments).
  • @tkf suggested a direct implementation, that’s so short that there’s no need to look for a package
  • Dagger.jl lets you create step-by-step a directed acyclic graph of computations. Each current leaf can, itself, extend the dag (actually a tree) and wait for its completion. I’m a bit nervous of the overhead of creating gazillions of tasks, perhaps in parallel, but in principle it should work cleanly.

One thing that the first two suggestions don’t address is to stop when all the work is finished. I must have been a bit unclear, so let me try to rephrase it mathematically.

We have a map f\colon T \to 2^T with t\in f(t) for all t, and an initial subset S\subseteq T. We are looking for the fixed point \bigcup_{n\ge0} f^n(S). In words, f is a map taking an object t, doing something with it, and creating 0 or more new objects on which to keep working.

Thinking about a channel abstraction, we start nthreads() workers who take items from the channel, do some work on them (probably modifying a global state), and push new items back on the channel. When all workers are idle and the channel is empty, we close the channel and return control to the user.

It’s especially for this stopping condition that I had to do some unpleasant lock yoga.