Multithreaded compute farm

@tkf could you explain this a little more? I always assumed / hoped that ThreadPools would eventually be made obsolete by Julia’s evolution, but I haven’t seen that happen yet. I’d like to triage what you are saying here to determine if that obsolescence has begun, or if there is some new Julia functionality I need to wrap into the API. Thx

From your description, it looks as if you are trying to generate a “task stack”: a queue of tasks that can be farmed out and their results collected, where some of those results may generate new tasks for the stack. Is that right? If so, I have used that structure in the past and can go dig it out. I admit I have not plowed through this entire chain yet, so let me first verify I am even in the ballpark. :wink:

@nabla Okay, that wasn’t hard to track down. In the ThreadPools test suite, I have a stack test (using the Composable API) that is similar to what you are describing. The test is replicated below - it works on a stupid-simple outer function with test objects (so you’ll have to rework your task function), but it processes tasks in a stack and allows some of those results to generate new tasks, then shuts down when everything is complete. All without lock yoga (my new favorite phrase :slight_smile: ).

I am by no means saying this is the best or right way to do this. Only that it works and is intuitive to me, at least.

mutable struct TestObj
    data::Int
end

function fn(x)
    x.data
end

@testset "stack tests" begin
    @testset "QueuePool background" begin

        objs = [TestObj(x) for x in 1:64]
        output = []
        pool = QueuePool(2)

        stack = Channel{TestObj}(1024) do stack
            for item in stack
                put!(fn, pool, item)
            end
            close(pool)
        end

        for item in objs
            put!(stack, item)
        end

        for result in poolresults(pool)
            push!(output, result)
            if result % 3 == 0
                put!(stack, TestObj(11))
            end
            if !isready(stack)
                close(stack)
            end
        end

        @test length(output) == 85

    end
end

I like this idea a lot: all the bookkeeping (pulling from and pushing to the channel) is done on one thread, which avoids many concurrency worries. I’ll benchmark it later for performance; however, I can’t get your code to work on my MWE, adapted from your example:

using Test, ThreadPools

function poisson(λ)
    t = rand()
    k = -1
    while k < 20 && t > 0
        k += 1
        t -= λ^k * exp(-λ) / factorial(k)
    end
    return k
end

@testset "ThreadPools farm" begin
    λ = 0.95
    inputs = Float64[]
    outputs = Float64[]

    pool = ThreadPools.QueuePool(2)

    stack = Channel{Float64}(16384) do stack
        for item in stack
            put!(x->(@info "Processing $x on $(Threads.threadid())..."; x), pool, item)
        end
        close(pool)
    end

    for _=1:10
        x = rand()
        push!(inputs, x)
        put!(stack, x)
    end

    for result in poolresults(pool)
        push!(outputs, result)
        for _=1:poisson(λ)
            y = rand()
            push!(inputs, y)
            put!(stack, y)
        end
        
        if !isready(stack)
            close(stack)
        end
    end
    
    @test Set(outputs) == Set(inputs)
end

results in

[ Info: Processing 0.492314142808181 on 1...
[snip snip snip]
[ Info: Processing 0.6517998422521871 on 1...
ThreadPools farm: [ Info: Processing 0.9420322969577937 on 1...
Error During Test at REPL[3]:1
  Got exception outside of a @test
  InvalidStateException("Channel is closed.", :closed)

Am I missing something stupid?

By the f of f(S) (which appeared in \bigcup_{n\ge0} f^n(S)), did you mean a “derived” mapping

\begin{aligned} f' &: 2^T \to 2^T \\ &: S \mapsto \bigcup_{t \in S} f(t) \end{aligned}

? Just making sure, since I often see f(S) used in math to mean the “simply lifted” version S \mapsto \{ f(t) \mid t \in S \}.

If this is correct, I suppose I can rephrase the problem as finding a (non-trivial) fixed point \hat S \subset T of \hat f, where

\begin{aligned} \hat f &: 2^T \to 2^T \\ &: S \mapsto S \cup f'(S) \end{aligned}

?
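For reference, before worrying about threads, the \hat f iteration is just a serial worklist loop. Here is a minimal sketch, where `expand` is a hypothetical stand-in for f (it maps one element to the collection of elements it generates):

```julia
# Serial worklist version of the fixed-point iteration S ↦ S ∪ f'(S).
# `expand(x)` returns the elements generated from `x`; iteration stops
# once no new elements appear.
function serial_fixedpoint(expand, inits)
    fp = Set(inits)
    pending = collect(inits)
    while !isempty(pending)
        x = pop!(pending)
        for y in expand(x)
            if !(y in fp)      # only genuinely new elements re-enter the worklist
                push!(fp, y)
                push!(pending, y)
            end
        end
    end
    return fp
end

# Example: repeated integer halving closes after a few steps.
serial_fixedpoint(x -> (x ÷ 2,), [8])  # Set containing 8, 4, 2, 1, 0
```

The parallel versions discussed below all have the same shape; the difficulty is only in tracking `pending` and `fp` safely across workers.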

If I understand the problem statement correctly, a simple solution may be:

function fixedpoint(
    f!,
    inits,
    T = eltype(inits);
    ntasks = max(1, Threads.nthreads() - 1),
    buffersize = max(length(inits), 2^20),
)
    inputs = Channel{T}(buffersize)
    outputs = Channel{Vector{T}}(buffersize)
    fp = Set{T}()

    ninputs = 0
    for x in inits
        push!(fp, x)  # seed the set so re-derived initial elements are pruned
        put!(inputs, x)
        ninputs += 1
    end
    @sync try
        for _ in 1:ntasks
            Threads.@spawn try
                for x in inputs
                    # Note: this can be further optimized by pooling allocated
                    # `ys` inside the centralized `while ninputs > 0` loop.
                    ys = eltype(outputs)(undef, 0)
                    f!(ys, x)
                    push!(outputs, ys)
                end
            finally
                close(outputs)
                close(inputs)
            end
        end

        while ninputs > 0
            ys = take!(outputs)
            ninputs -= 1
            for y in ys
                if !(y in fp)
                    ninputs += 1
                    if ninputs > buffersize
                        error("pending inputs exceeds buffersize")
                    end
                    put!(inputs, y)
                    push!(fp, y)
                end
            end
        end
    finally
        close(outputs)
        close(inputs)
    end

    return fp
end


# MWE
function f!(ys, x)
    d = √(1 - x)
    y1 = round((1 + d) / 3; digits = 10)
    y2 = round((1 - d) / 3; digits = 10)
    push!(ys, y1)
    push!(ys, y2)
end

fp = fixedpoint(f!, [0.25])

The point is to handle the duplicate detection in one place so that it is easy to track pending inputs. It also has the extra benefit of keeping the set local to one CPU. Of course, since all the workers are banging on two shared channels, you’d need an expensive enough f! for this to be useful.
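The pooling optimization mentioned in the code comment could look something like the following sketch (the names `make_buffer_pool` etc. are hypothetical, not part of any API): the pool is just another `Channel` that recycles the `ys` vectors instead of allocating a fresh one per input.

```julia
# Sketch: pool the `ys` buffers through an extra Channel so workers reuse
# vectors instead of allocating one per input.
function make_buffer_pool(T, n)
    pool = Channel{Vector{T}}(n)
    for _ in 1:n
        put!(pool, T[])   # pre-fill the pool with empty buffers
    end
    return pool
end

pool = make_buffer_pool(Float64, 4)

# Worker side: borrow a buffer instead of allocating...
buf = take!(pool)
push!(buf, 1.0)

# ...and the centralized loop returns it after consuming the results:
empty!(buf)
put!(pool, buf)
```

Whether this wins anything depends on how large the `ys` vectors get relative to the channel traffic.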

An interesting optimization may be to use a persistent data structure to track the fixed point, share it with the workers, and filter out elements on the worker side. This is a valid optimization due to the monotonicity of \hat f (i.e., it’s OK to prune elements using a “past fixed-point candidate” that under-estimates the latest view).

I think it’s still possible to use the feedback skeleton for this, but you’d need a concurrent set, which is hard to implement and hard to make efficient. A problem-specific solution like the one above may not be so bad.

Is it always correct to do this, for arbitrary “stack” usage? Doesn’t your test depend on the fact that 11 % 3 != 0?

I think this is counter to the point of channels. Channels are for communicating between threads and should alleviate the need for error-prone, low-level locking.
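As a toy illustration of that style (hypothetical `channel_sum`, not from any package): instead of guarding a shared accumulator with a lock, let each worker send its partial result over a `Channel` and have a single consumer combine them.

```julia
# One consumer owns the accumulation; workers only communicate over a
# Channel, so no explicit lock is needed.
function channel_sum(values; ntasks = 4)
    results = Channel{Int}(length(values))
    @sync for chunk in Iterators.partition(values, cld(length(values), ntasks))
        Threads.@spawn put!(results, sum(chunk))  # each task sums one chunk
    end
    close(results)           # no more producers; consumer drains what's left
    return sum(results)      # iterate the closed channel to combine partials
end

channel_sum(1:100)  # 5050
```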

To @tkf’s point - that close condition does not work for @nabla’s case, closing the stack prematurely. I’ll have to take a look at why it works in the test (that is some old code) and see if there is a better general-purpose solution. But a fix here is easy:

@testset "ThreadPools farm" begin
    λ = 0.95
    inputs = Float64[]
    outputs = Float64[]

    pool = ThreadPools.QueuePool(2)

    stack = Channel{Float64}(16384) do stack
        for item in stack
            put!(x->(@info "Processing $x on $(Threads.threadid())..."; x), pool, item)
        end
        close(pool)
    end

    for _=1:10
        x = rand()
        push!(inputs, x)
        put!(stack, x)
    end

    for result in poolresults(pool)
        push!(outputs, result)
        for _=1:poisson(λ)
            y = rand()
            push!(inputs, y)
            put!(stack, y)
        end

        if length(outputs) == length(inputs)
            close(stack)
        end
    end

    @test Set(outputs) == Set(inputs)
end