@tkf could you explain this a little more? I always assumed / hoped that ThreadPools would eventually be made obsolete by Julia’s evolution, but I haven’t seen that happen yet. I’d like to triage what you are saying here to determine if that obsolescence has begun, or if there is some new Julia functionality I need to wrap into the API. Thx
From your description, it looks as if you are trying to build a “task stack”: a queue of tasks that can be farmed out and their results collected, where some of those results may generate new tasks for the stack. Is that right? If so, I have used that structure in the past and can go dig it out. I admit I have not plowed through this entire chain yet, so let me first verify I am even in the ballpark.
@nabla Okay, that wasn’t hard to track down. In the ThreadPools test suite, I have a stack test (using the Composable API) that is similar to what you are describing. The test is replicated below. It works on a stupid-simple outer function with test objects (so you’ll have to rework your task function), but it processes tasks in a stack and allows some of those results to generate new tasks, then shuts down when everything is complete. All without lock yoga (my new favorite phrase).
I am by no means saying this is the best or right way to do this. Only that it works and is intuitive to me, at least.
mutable struct TestObj
    data::Int
end

function fn(x)
    x.data
end

@testset "stack tests" begin
    @testset "QueuePool background" begin
        objs = [TestObj(x) for x in 1:64]
        output = []
        pool = QueuePool(2)
        stack = Channel{TestObj}(1024) do stack
            for item in stack
                put!(fn, pool, item)
            end
            close(pool)
        end
        for item in objs
            put!(stack, item)
        end
        for result in poolresults(pool)
            push!(output, result)
            if result % 3 == 0
                put!(stack, TestObj(11))
            end
            if !isready(stack)
                close(stack)
            end
        end
        @test length(output) == 85
    end
end
I like this idea a lot: all the bookkeeping (pulling and pushing from the channel) is done on one thread, avoiding many concurrency worries. I’ll benchmark it later for performance; however, I can’t get your code to work on my MWE, adapted from your example:
using Test, ThreadPools

function poisson(λ)
    t = rand()
    k = -1
    while k < 20 && t > 0
        k += 1
        t -= λ^k * exp(-λ) / factorial(k)
    end
    return k
end

@testset "ThreadPools farm" begin
    λ = 0.95
    inputs = Float64[]
    outputs = Float64[]
    pool = ThreadPools.QueuePool(2)
    stack = Channel{Float64}(16384) do stack
        for item in stack
            put!(x->(@info "Processing $x on $(Threads.threadid())..."; x), pool, item)
        end
        close(pool)
    end
    for _=1:10
        x = rand()
        push!(inputs, x)
        put!(stack, x)
    end
    for result in poolresults(pool)
        push!(outputs, result)
        for _=1:poisson(λ)
            y = rand()
            push!(inputs, y)
            put!(stack, y)
        end
        if !isready(stack)
            close(stack)
        end
    end
    @test Set(outputs) == Set(inputs)
end
results in
[ Info: Processing 0.492314142808181 on 1...
[snip snip snip]
[ Info: Processing 0.6517998422521871 on 1...
ThreadPools farm: [ Info: Processing 0.9420322969577937 on 1...
Error During Test at REPL[3]:1
Got exception outside of a @test
InvalidStateException("Channel is closed.", :closed)
Am I missing something stupid?
By f of f(S) (appeared in \bigcup_{n\ge0} f^n(S)), did you mean a “derived” mapping
? Just making sure, since I often see that f of f(S) in math to mean the “simply lifted” version S \mapsto \{ f(t) | t \in S \}.
If this is correct, I suppose I can rephrase the problem as finding a (non-trivial) fixed point \hat S \subset T of \hat f where
?
If I understand the problem statement correctly, a simple solution may be:
function fixedpoint(
    f!,
    inits,
    T = eltype(inits);
    ntasks = max(1, Threads.nthreads() - 1),
    buffersize = max(length(inits), 2^20),
)
    inputs = Channel{T}(buffersize)
    outputs = Channel{Vector{T}}(buffersize)
    fp = Set{T}()
    ninputs = 0
    for x in inits
        put!(inputs, x)
        ninputs += 1
    end
    @sync try
        for _ in 1:ntasks
            Threads.@spawn try
                for x in inputs
                    # Note: this can be further optimized by pooling allocated
                    # `ys` inside the centralized `while ninputs > 0` loop.
                    ys = eltype(outputs)(undef, 0)
                    f!(ys, x)
                    push!(outputs, ys)
                end
            finally
                close(outputs)
                close(inputs)
            end
        end
        while ninputs > 0
            ys = take!(outputs)
            ninputs -= 1
            for y in ys
                if !(y in fp)
                    ninputs += 1
                    if ninputs > buffersize
                        error("pending inputs exceeds buffersize")
                    end
                    put!(inputs, y)
                    push!(fp, y)
                end
            end
        end
    finally
        close(outputs)
        close(inputs)
    end
    return fp
end

# MWE
function f!(ys, x)
    d = √(1 - x)
    y1 = round((1 + d) / 3; digits = 10)
    y2 = round((1 - d) / 3; digits = 10)
    push!(ys, y1)
    push!(ys, y2)
end

fp = fixedpoint(f!, [0.25])
The point is to handle duplicate detection in one place so that it is easy to track pending inputs. It also has the extra benefit of keeping the set local to one CPU. Of course, since all the workers are banging on two shared channels, you’d need an expensive enough f! for this to be useful.
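To make the bookkeeping easier to see, here is a minimal serial sketch of the same worklist idea, with no threads and no channels. The names `fixedpoint_serial` and `double_mod7!` are made up for illustration, and the toy function is chosen only because its closure is tiny; it is not meant as a replacement for the parallel version.

```julia
# Serial reference sketch: same duplicate detection, but a plain Vector
# replaces the `inputs` channel and `f!` is called inline.
# `fixedpoint_serial` and `double_mod7!` are illustrative names.
function fixedpoint_serial(f!, inits, T = eltype(inits))
    fp = Set{T}()                   # elements discovered so far
    worklist = collect(T, inits)    # pending inputs
    ys = T[]                        # reused output buffer
    while !isempty(worklist)
        x = pop!(worklist)
        empty!(ys)
        f!(ys, x)
        for y in ys
            if !(y in fp)           # only unseen elements become new work
                push!(fp, y)
                push!(worklist, y)
            end
        end
    end
    return fp
end

# Toy map on integers: x -> 2x mod 7. Starting from 1 it cycles 2, 4, 1.
double_mod7!(ys, x) = push!(ys, (2x) % 7)

fp_small = fixedpoint_serial(double_mod7!, [1])
```

A serial version like this can serve as a baseline for checking the parallel result on small problems.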
An interesting optimization may be to use a persistent data structure tracking the fixed point, share it with the workers, and then filter out the elements on the worker side. This is a valid optimization due to the monotonicity of \hat f (i.e., it’s OK to prune elements using the “past fixed point candidate” that under-estimates the latest view).
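A small sketch of why pruning against a stale snapshot is safe, assuming the central loop still does the final membership check. The helper `prune` and the example sets are hypothetical; a real implementation would also need some way to hand snapshots to the workers, which is not shown here.

```julia
# Hypothetical helper: drop candidates already present in a snapshot of the set.
prune(ys, snapshot) = filter(y -> !(y in snapshot), ys)

stale   = Set([1, 2])       # older snapshot a worker happens to hold
current = Set([1, 2, 4])    # the coordinator's up-to-date set
ys      = [2, 4, 5]         # raw output of one worker call

# Worker side: the stale snapshot removes 2 but lets 4 through.
worker_pruned = prune(ys, stale)

# Coordinator side: the usual check against the current set catches 4.
accepted = prune(worker_pruned, current)
```

Because the stale snapshot is a subset of the current set, the worker can only fail to prune; it never drops an element that is actually new.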
I think it’s still possible to use the feedback skeleton for this, but you’d need a concurrent set, which is hard to implement and hard to make efficient. A problem-specific solution like the one above may not be so bad.
Is it always correct to do this, for arbitrary “stack” usage? Isn’t your test depending on the fact that 11 % 3 != 0?
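For reference, the expected count of 85 in that test can be checked with a few lines of arithmetic, which also shows where the dependence on 11 comes in. The variable names below are only for illustration.

```julia
n_initial = 64                                       # TestObj(1) .. TestObj(64)
n_spawned = count(x -> x % 3 == 0, 1:n_initial)      # each multiple of 3 adds one TestObj(11)
n_total   = n_initial + n_spawned

# The spawned objects all carry 11, and 11 is not a multiple of 3,
# so they never spawn further work.
spawned_respawns = 11 % 3 == 0
```

So the generated tasks never generate tasks of their own, which is a much gentler situation than one where new work can itself produce more work.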
I think this is counter to the point of channels. Channels are for communicating between threads and should alleviate error-prone, low-level locking.
To @tkf’s point - that close condition does not work for @nabla’s case, closing the stack prematurely. I’ll have to take a look at why it works in the test (that is some old code) and see if there is a better general-purpose solution. But a fix here is easy:
@testset "ThreadPools farm" begin
    λ = 0.95
    inputs = Float64[]
    outputs = Float64[]
    pool = ThreadPools.QueuePool(2)
    stack = Channel{Float64}(16384) do stack
        for item in stack
            put!(x->(@info "Processing $x on $(Threads.threadid())..."; x), pool, item)
        end
        close(pool)
    end
    for _=1:10
        x = rand()
        push!(inputs, x)
        put!(stack, x)
    end
    for result in poolresults(pool)
        push!(outputs, result)
        for _=1:poisson(λ)
            y = rand()
            push!(inputs, y)
            put!(stack, y)
        end
        if length(outputs) == length(inputs)
            close(stack)
        end
    end
    @test Set(outputs) == Set(inputs)
end
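The same termination rule (stop when every submitted item has come back) can also be sketched with only Base channels and tasks, in case it helps to see it outside the ThreadPools API. This is a rough sketch, not a ThreadPools feature: `farm` and `children` are hypothetical names, and it assumes the work function does not throw (otherwise the coordinator would wait forever on `take!`).

```julia
# Coordinator/worker farm with an explicit pending counter.
# `farm` and `children` are illustrative names, not part of any package.
function farm(spawn_children, inits; ntasks = 2)
    T = eltype(inits)
    jobs = Channel{T}(Inf)                        # unbounded, so put! never blocks
    results = Channel{Tuple{T,Vector{T}}}(Inf)
    for _ in 1:ntasks
        Threads.@spawn for x in jobs              # loop ends once `jobs` is closed and drained
            put!(results, (x, spawn_children(x)))
        end
    end
    pending = 0
    for x in inits
        put!(jobs, x)
        pending += 1
    end
    done = T[]
    while pending > 0                             # all counting happens on this one task
        x, new_items = take!(results)
        pending -= 1
        push!(done, x)
        for c in new_items
            put!(jobs, c)
            pending += 1
        end
    end
    close(jobs)                                   # lets the workers exit
    return done
end

# Toy work function: each x > 1 produces one follow-up item, x - 1.
children(x) = x > 1 ? [x - 1] : Int[]

processed = farm(children, [3, 2])
```

Here `pending` plays the role of `length(inputs) - length(outputs)` in the test above, so the shutdown decision does not depend on whether the channel happens to be empty at that moment.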