Multithreaded compute farm

Dagger seems like a great tool! I see you’re the author… I couldn’t use it, however:

  • in unix.jl:27, Compat.Sys.isapple() does not exist – I changed it to Sys.isapple()
  • after running Dagger.@spawn 1+2 I get
Error in eager scheduler:
ThunkFailedException (Thunk[1](eager_thunk, ()) failure):
UndefVarError: CLOCK_THREAD_CPUTIME_ID not defined

Assuming I can get it to run, though, would the following work?

function f(x)
    # process x
    for _=1:poisson(lambda); Dagger.@spawn f(rand()); end
end
for _=1:10; Dagger.@spawn f(rand()); end

I’m a bit afraid of @spawning millions of tasks without crashing my laptop or, worse, the cluster to which I have access.

1 Like

@jpsamaroo OK, I grepped through the .h files, and adding
const CLOCK_THREAD_CPUTIME_ID = Cint(16)
on Apple systems seems to work. However, my brute-force implementation does not: after a few thousand calls, it hits

Error in eager scheduler:
Future can be set only once
...
error in running finalizer: ErrorException("task switch not allowed from inside gc finalizer")
jl_error at /usr/local/src/julia/src/rtutils.c:41
jl_switch at /usr/local/src/julia/src/task.c:498
try_yieldto at ./task.jl:737
...

Huh, odd. Would you mind filing an issue with a reproducer, full stacktrace, and your system information?

2 Likes

Ah, yes, the initialization should be outside:

function parallel_feedback(
    f,
    inits,
    T = eltype(inits);
    ntasks = Threads.nthreads(),
    size = Inf, # we'd need something like BoundedChannel above if bounded
)
    worklist = Channel{T}(size)
    @sync begin
        for _ in 1:ntasks
            Threads.@spawn try
                f(worklist)
            finally
                close(worklist)
            end
        end
        try
            for x in inits
                put!(worklist, x)
            end
        catch
            close(worklist)
            rethrow()
        end
    end
end

(Side note: I moved put!(worklist, x) inside @sync so that iterate(inits, _) can run in parallel with other work. It may be useful when inits = Iterators.map(f, _) where f is somewhat expensive.)

Also, we’d need some termination condition to break out of the for x in worklist loop:

let counter = Threads.Atomic{Int}(0)
    parallel_feedback(rand(10)) do worklist
        for x in worklist
            ...
            if Threads.atomic_add!(counter, 1) > 10  # or maybe something more relevant to the problem
                break
            end
        end
    end
end

All the workers would then terminate: once at least one worker hits the break (which closes worklist), the other workers exit when they try to go to the next iteration of their for x in worklist loop.

Let me know if this happens with the above fix. I think all workers will wait at the for x in worklist line as long as worklist is not closed.

There is already a ThreadPools.jl which I think makes sense to use for these types of problems.

1 Like

I don’t think ThreadPools.jl is relevant here unless, e.g., you want to combine this with a GUI application:

See also the last paragraph of Automatically fusing together several for-loops - #18 by tkf and the discussion.

Create a thread pool with N threads where N is similar to your number of cores, then push the first unit of work to the channel… When you generate new units in the processing, push them to the channel. The threads are all just grabbing work, sending completed work to the output, and pushing any work they generate to the work channel.

It works great for this kind of problem.

It is not possible to do this in Julia, provided that by "thread" you mean an OS thread (e.g., a pthread on Linux). There is no construct in Julia that lets you do this. In particular, ThreadPools.jl does not help you do this.

What Base.Threads lets you do is schedule Julia tasks on worker threads that are implemented on top of the OS-native threading API. ThreadPools.jl is built on top of this and lets you control the set of worker threads (i.e., the pool), mainly for separating latency-oriented and throughput-oriented code.
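
For example, as a minimal illustration of that task model: a Task created with Threads.@spawn is just a value you can fetch, and the runtime decides which worker thread runs it:

# assumes Julia was started with more than one thread (julia --threads N)
t = Threads.@spawn sum(rand(1000))
fetch(t)  # blocks until the task finishes and returns its result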

I think you’d need to understand the characteristics of ThreadPools.jl. Otherwise, using it blindly just introduces overhead.

2 Likes

Although the motivation for writing it may have been separating latency- and throughput-oriented code, it works great for handling situations where the amount of time each computational task takes varies widely. So, for example, if you want to run a stochastic simulation until something occurs in the simulation and then output some property of the state at that time, the time to run that simulation might vary from, say, 10 ms to 30 hours. ThreadPools works great to ensure that as soon as a task completes, the newly freed thread can grab the next chunk of work.

As far as I can tell, ThreadPools DOES in fact control which Julia worker thread, and hence which OS thread, a task runs on. I’m not sure if that’s because it has access to some very low-level stuff in Julia, or not. I could be mistaken. But see for example this discussion of how bthreads keeps the main thread free: ThreadPools.jl · ThreadPool Documentation

Also, there’s the @tspawnat macro: "The package also exposes a lower-level @tspawnat macro that mimics the Base.Threads.@spawn macro, but allows direct thread assignment for users who want to develop their own scheduling."
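
For instance, a tiny sketch (assuming Julia was started with at least two threads):

using ThreadPools

t = @tspawnat 2 Threads.threadid()  # pin this task to thread 2
fetch(t)  # == 2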

In any case, I believe ThreadPools was absolutely built for the original problem description of pushing work chunks into a queue, and then letting threads dequeue them and process them, and potentially push more chunks into the queue.

Julia tasks created by Threads.@spawn already do this. There’s no need to use ThreadPools.jl for only this purpose.

My understanding is that @spawn sends tasks to some thread, and then they’re stuck on that thread, with no migration (unless that changed recently). So if you spawn two long tasks and they wind up on the same thread, they’ll run serially. ThreadPools gets rid of that problem.

This will not be true in Julia 1.7: Refetch PTLS via Task struct (enable task migration) by vtjnash · Pull Request #40715 · JuliaLang/julia · GitHub

But this is irrelevant to this discussion. Unless a coroutine-like construct is implemented, it is impossible to work around this restriction inside user code, since it is a property of the runtime system. The last time I checked, ThreadPools.jl did not implement coroutines.

3 Likes

You keep insisting that thread pools cannot do what I’ve already done with thread pools. I don’t understand what you’re talking about.

Create a channel, and create a thread pool with 4 threads. Push 4 numbers into the channel. In the thread pool, run a function that reads from the channel and calculates how many normal random numbers you have to generate before you get a value bigger than what it read from the channel… These tasks will run on separate threads and do their work just fine… (Use local RNGs.)
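
Concretely, that setup might look like the following sketch, written here with plain Channels and Threads.@spawn (a pool API could equally drive the spawn loop; the names and thresholds are illustrative):

using Random

work = Channel{Float64}(4)
results = Channel{Tuple{Float64,Int}}(4)
for _ in 1:4
    Threads.@spawn begin
        rng = Random.MersenneTwister()  # per-task RNG, as suggested above
        for threshold in work
            n = 1
            while randn(rng) <= threshold  # count draws until one exceeds it
                n += 1
            end
            put!(results, (threshold, n))
        end
    end
end
for x in (0.5, 1.0, 2.0, 3.0)
    put!(work, x)
end
close(work)  # lets the worker loops end once the channel drains
for _ in 1:4
    @show take!(results)
end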

If ThreadPools.jl fits your need, I think it’s great, and it’s not my intention to discourage it at all. All I want to do is properly characterize what it actually is (or was, when I last checked), to help you and other readers understand its use case. I also didn’t say it’s not possible to use or implement a "thread pool." I was trying to explain that you don’t need ThreadPools.jl for this, and that ThreadPools.jl introduces overhead since it tries to disable what the Julia runtime does (e.g., task migration across OS threads, in Julia 1.7 and later). There are much simpler and composable ways to implement task pools. In particular, the code in the OP already does it.

2 Likes

I seem to have started a heated exchange, so please let me summarize my understanding.

  • ThreadPools.jl is great in that it creates a channel-like object onto which new tasks can be pushed (at will, in the form of a function and arguments).
  • @tkf suggested a direct implementation that’s so short there’s no need to look for a package.
  • Dagger.jl lets you build up, step by step, a directed acyclic graph of computations. Each current leaf can itself extend the DAG (actually a tree) and wait for its completion. I’m a bit nervous about the overhead of creating gazillions of tasks, perhaps in parallel, but in principle it should work cleanly.

One thing that the first two suggestions don’t address is stopping when all the work is finished. I must have been a bit unclear, so let me try to rephrase the problem mathematically.

We have a map f\colon T \to 2^T with t\in f(t) for all t, and an initial subset S\subseteq T. We are looking for the fixed point \bigcup_{n\ge0} f^n(S). In words, f is a map taking an object t, doing something with it, and creating 0 or more new objects on which to keep working.

Thinking about a channel abstraction, we start nthreads() workers who take items from the channel, do some work on them (probably modifying a global state), and push new items back on the channel. When all workers are idle and the channel is empty, we close the channel and return control to the user.

It’s especially for this stopping condition that I had to do some unpleasant lock yoga.
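
One lock-free way to get exactly that stopping condition is to count outstanding items. Below is a minimal sketch (compute_farm and the process callback are illustrative names, not from this thread): the counter is incremented before every put! and decremented only after an item has been fully processed, so reaching zero really does mean "channel empty and all workers idle":

function compute_farm(process, inits, T = eltype(inits); ntasks = Threads.nthreads())
    worklist = Channel{T}(Inf)
    pending = Threads.Atomic{Int}(0)   # items put! but not yet fully processed
    for x in inits
        Threads.atomic_add!(pending, 1)
        put!(worklist, x)
    end
    pending[] == 0 && close(worklist)  # degenerate case: nothing to do
    @sync for _ in 1:ntasks
        Threads.@spawn for x in worklist
            for y in process(x)        # `process` returns 0 or more new items
                Threads.atomic_add!(pending, 1)
                put!(worklist, y)
            end
            # this item is done; if it was the last outstanding one, shut down
            Threads.atomic_sub!(pending, 1) == 1 && close(worklist)
        end
    end
end

For the branching workload in the OP, something like compute_farm(x -> [rand() for _ in 1:poisson(λ)], rand(10)) would then return once no item generates new work.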

@tkf could you explain this a little more? I always assumed / hoped that ThreadPools would eventually be made obsolete by Julia’s evolution, but I haven’t seen that happen yet. I’d like to triage what you are saying here to determine if that obsolescence has begun, or if there is some new Julia functionality I need to wrap into the API. Thx

1 Like

From your description, it looks as if you are trying to generate a "task stack": a queue of tasks that can be farmed out and results collected, but where some tasks may generate new tasks for the stack. Is that right? If so, I have used that structure in the past and can go dig it out. I admit I have not plowed through this entire chain yet, so let me first verify I am even in the ballpark. :wink:

@nabla Okay, that wasn’t hard to track down. In the ThreadPools test suite, I have a stack test (using the Composable API) that is similar to what you are describing. The test is replicated below - it works on a stupid-simple outer function with test objects (so you’ll have to rework your task function), but it processes tasks in a stack and allows some of those results to generate new tasks, then shuts down when everything is complete. All without lock yoga (my new favorite phrase :slight_smile: ).

I am by no means saying this is the best or right way to do this. Only that it works and is intuitive to me, at least.

using Test, ThreadPools

mutable struct TestObj
    data :: Int
end

function fn(x)
    x.data
end

@testset "stack tests" begin
    @testset "QueuePool background" begin

        objs = [TestObj(x) for x in 1:64]
        output = []
        pool = QueuePool(2)

        stack = Channel{TestObj}(1024) do stack
            for item in stack
                put!(fn, pool, item)
            end
            close(pool)
        end

        for item in objs
            put!(stack, item)
        end

        for result in poolresults(pool)
            push!(output, result)
            if result % 3 == 0
                put!(stack, TestObj(11))
            end
            if !isready(stack)
                close(stack)
            end
        end

        @test length(output) == 85

    end
end

1 Like

I like this idea a lot: all the bookkeeping (pulling from and pushing to the channel) is done on one thread, avoiding many concurrency worries. I’ll benchmark it later for performance; however, I can’t get your code to work on my MWE, adapted from your example:

using Test, ThreadPools

function poisson(λ)
    t = rand()
    k = -1
    while k<20 && t > 0; k += 1; t -= λ^k * exp(-λ) / factorial(k); end
    return k
end

@testset "ThreadPools farm" begin
    λ = 0.95
    inputs = Float64[]
    outputs = Float64[]

    pool = ThreadPools.QueuePool(2)

    stack = Channel{Float64}(16384) do stack
        for item in stack
            put!(x->(@info "Processing $x on $(Threads.threadid())..."; x), pool, item)
        end
        close(pool)
    end

    for _=1:10
        x = rand()
        push!(inputs, x)
        put!(stack, x)
    end

    for result in poolresults(pool)
        push!(outputs, result)
        for _=1:poisson(λ)
            y = rand()
            push!(inputs, y)
            put!(stack, y)
        end
        
        if !isready(stack)
            close(stack)
        end
    end
    
    @test Set(outputs) == Set(inputs)
end

results in

[ Info: Processing 0.492314142808181 on 1...
[snip snip snip]
[ Info: Processing 0.6517998422521871 on 1...
ThreadPools farm: [ Info: Processing 0.9420322969577937 on 1...
Error During Test at REPL[3]:1
  Got exception outside of a @test
  InvalidStateException("Channel is closed.", :closed)

Am I missing something stupid?

By the f in f(S) (which appears in \bigcup_{n\ge0} f^n(S)), did you mean a "derived" mapping

\begin{aligned} f' &: 2^T \to 2^T \\ &: S \mapsto \bigcup_{t \in S} f(t) \end{aligned}

? Just making sure, since in math I often see f(S) used to mean the "simply lifted" version S \mapsto \{ f(t) \mid t \in S \}.

If this is correct, I suppose I can rephrase the problem as finding a (non-trivial) fixed point \hat S \subset T of \hat f, where

\begin{aligned} \hat f &: 2^T \to 2^T \\ &: S \mapsto S \cup f'(S) \end{aligned}

?

If I understand the problem statement correctly, a simple solution may be:

function fixedpoint(
    f!,
    inits,
    T = eltype(inits);
    ntasks = max(1, Threads.nthreads() - 1),
    buffersize = max(length(inits), 2^20),
)
    inputs = Channel{T}(buffersize)
    outputs = Channel{Vector{T}}(buffersize)
    fp = Set{T}()

    ninputs = 0
    for x in inits
        put!(inputs, x)
        ninputs += 1
    end
    @sync try
        for _ in 1:ntasks
            Threads.@spawn try
                for x in inputs
                    # Note: this can be further optimized by pooling allocated
                    # `ys` inside the centralized `while ninputs > 0` loop.
                    ys = eltype(outputs)(undef, 0)
                    f!(ys, x)
                    push!(outputs, ys)
                end
            finally
                close(outputs)
                close(inputs)
            end
        end

        while ninputs > 0
            ys = take!(outputs)
            ninputs -= 1
            for y in ys
                if !(y in fp)
                    ninputs += 1
                    if ninputs > buffersize
                        error("pending inputs exceeds buffersize")
                    end
                    put!(inputs, y)
                    push!(fp, y)
                end
            end
        end
    finally
        close(outputs)
        close(inputs)
    end

    return fp
end


# MWE
function f!(ys, x)
    d = √(1 - x)
    y1 = round((1 + d) / 3; digits = 10)
    y2 = round((1 - d) / 3; digits = 10)
    push!(ys, y1)
    push!(ys, y2)
end

fp = fixedpoint(f!, [0.25])

The point is to handle the duplication detection in one place so that it is easy to track pending inputs. It also has the extra benefit of keeping the set local to one CPU. Of course, since all the workers are banging on two shared channels, you’d need an expensive enough f! for this to be useful.

An interesting optimization may be to use a persistent data structure to track the fixed point, share it with the workers, and then filter out the elements on the worker side. This is a valid optimization due to the monotonicity of \hat f (i.e., it’s OK to prune elements using a "past fixed point candidate" that under-estimates the latest view).

I think it’s still possible to use the feedback skeleton for this, but you’d need a concurrent set, which is hard to implement and hard to make efficient. A problem-specific solution like the above may not be so bad.
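
As a rough sketch of why: the naive concurrent set is just a Set guarded by a lock (LockedSet and add! are made-up names here). It is easy to get right, but every worker then serializes on the same lock, which is exactly the bottleneck the centralized loop above avoids:

struct LockedSet{T}
    lock::ReentrantLock
    set::Set{T}
end
LockedSet{T}() where {T} = LockedSet{T}(ReentrantLock(), Set{T}())

# test-and-insert in one critical section; returns true iff x was new
function add!(s::LockedSet, x)
    lock(s.lock) do
        x in s.set ? false : (push!(s.set, x); true)
    end
end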

Is it always correct to do this, for arbitrary "stack" usage? Doesn’t your test depend on the fact that 11 % 3 != 0?