I have an expensive function that I need to evaluate for several inputs. To use multithreading, I usually use the following workflow:
inputs = collect(deepcopy(input) for _ = 1 : Threads.nthreads())
Threads.@threads for k in iterable
    id = Threads.threadid()
    change_input!(inputs[id], k)
    expensive_function(inputs[id])
    restore_input!(inputs[id], k)
end
Since the time per iteration can vary, I want to use a dynamic scheduler with @spawn. To do so, I must ensure that a thread does not switch tasks before finishing the entire loop iteration. How can I do that?
It seems to me that I cannot do this without overwriting the wait function. Is that correct?
Thanks in advance
Could you refactor your code to use a pure function for parallel execution?
Then you could do
f(x) = x^2 # some expensive (pure) function
inputs = 1:10
futures = [Threads.@spawn f(x) for x in inputs]
# maybe more code here which does not depend on results
results = fetch.(futures)
You might also consider using ThreadPools.jl, which exposes the @qthreads macro for a dynamically scheduled for loop.
Maybe something like
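using Base.Threads: @spawn

# Each worker task owns one copy of the input and reuses it for every k it receives.
function worker(input, channel)
    while true
        k = take!(channel)
        isnothing(k) && break  # `nothing` is the stop signal
        change_input!(input, k)
        expensive_function(input)
        restore_input!(input, k)
    end
end

# N is the number of worker tasks, e.g. N = Threads.nthreads()
channel = Channel(N)
workers = Task[]
for i = 1:N
    push!(workers, @spawn worker(deepcopy(input), channel))
end
for k in iterable
    put!(channel, k)
end
# One stop signal per worker
for i = 1:N
    put!(channel, nothing)
end
for task in workers
    wait(task)
end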
I thought about calling input_copy = deepcopy(input) inside the for loop and then calling the pure function expensive_function(input_copy). The problem is that input occupies a lot of memory, so I would rather minimize the number of calls to deepcopy.
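One way to keep the number of deepcopy calls fixed while still scheduling dynamically might be to keep a small pool of copies in a Channel and let each spawned task borrow one. The following is only a sketch under assumptions: ToyInput, run_with_pool, and the ncopies keyword are hypothetical names, and the three small functions are toy stand-ins for the real change_input!, expensive_function, and restore_input!.

```julia
using Base.Threads: @spawn, nthreads

# Hypothetical stand-ins for the real input type and functions.
struct ToyInput
    data::Vector{Float64}
end
change_input!(inp::ToyInput, k) = (inp.data[1] += k; inp)
restore_input!(inp::ToyInput, k) = (inp.data[1] -= k; inp)
expensive_function(inp::ToyInput) = sum(inp.data)

function run_with_pool(input, iterable; ncopies = nthreads())
    # Only `ncopies` deep copies are made, however many iterations there are.
    pool = Channel{typeof(input)}(ncopies)
    for _ in 1:ncopies
        put!(pool, deepcopy(input))
    end
    tasks = map(iterable) do k
        @spawn begin
            inp = take!(pool)        # blocks until a copy is free
            try
                change_input!(inp, k)
                expensive_function(inp)
            finally
                restore_input!(inp, k)
                put!(pool, inp)      # hand the copy back for reuse
            end
        end
    end
    return fetch.(tasks)
end

input = ToyInput([1.0, 2.0, 3.0])
results = run_with_pool(input, 1:8)
```

A copy is held by exactly one task between take! and put!, so it does not matter which thread that task happens to run on.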
I didn’t know about ThreadPools. Thanks!
Actually, I think I cannot guarantee that each worker is assigned to its own thread. Maybe this is something where I can use the @tspawnat macro of the ThreadPools package suggested by @biona001.
If your expensive_function does not modify input (and only returns its output), there is no need to copy it.
If it does need to modify the input, maybe there is a way to split input into separate elements (e.g. columns in a matrix) where each instance of expensive_function only operates on a distinct part of it? Then a (distinct) view on input could be passed to each instance of the expensive_function.
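As a rough illustration of the view idea, here is a minimal sketch assuming the data can be stored as a matrix whose columns are independent. The function expensive_function! below is a hypothetical stand-in that mutates its argument in place; it is not the function from the question.

```julia
using Base.Threads: @spawn

# Hypothetical stand-in for an expensive function that mutates its argument:
# it doubles one column in place and returns the column's sum.
function expensive_function!(col::AbstractVector{Float64})
    col .*= 2.0
    return sum(col)
end

A = [1.0 2.0 3.0;
     4.0 5.0 6.0]   # shared matrix; each task touches one column only

# No copies are made: every task gets a view on a distinct column.
tasks = [@spawn expensive_function!(view(A, :, j)) for j in axes(A, 2)]
col_sums = fetch.(tasks)
```

Because the views do not overlap, the tasks never write to the same memory, so no locking is needed.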
Why do you need that guarantee?
For each iteration, I initially modify some of the elements of input (a struct basically composed of Vector{Float64} fields) and then I call expensive_function. Indeed, I could split input into two different variables where only one of them is modified in each iteration. The only problem is that it would take a lot of work to adapt expensive_function for this case.
Indeed, I didn’t pay attention to the fact that in your code each deepcopy(input) is bound to a specific task, rather than to a specific thread.
I’ll definitely test your solution. Thank you!
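To make the task-bound point concrete, here is a small self-contained variant of the worker pattern that also collects results. It is a sketch under assumptions: ToyInput, worker!, and run_workers are hypothetical names, the arithmetic stands in for the real functions, and the job channel is closed instead of sending nothing as a stop signal. A task runs its own code sequentially, so the copy it owns is never touched by two iterations at once, whichever thread the task runs on.

```julia
using Base.Threads: @spawn, nthreads

# Hypothetical toy input, standing in for the large struct from the question.
struct ToyInput
    data::Vector{Float64}
end

function worker!(results, buf::ToyInput, jobs::Channel{Int})
    for k in jobs                    # ends once `jobs` is closed and drained
        buf.data[1] += k             # stands in for change_input!
        results[k] = sum(buf.data)   # stands in for expensive_function
        buf.data[1] -= k             # stands in for restore_input!
    end
    return buf
end

function run_workers(input::ToyInput, n::Int; nworkers = nthreads())
    results = Vector{Float64}(undef, n)
    jobs = Channel{Int}(nworkers)
    # One deep copy per worker task, made inside that task.
    workers = [@spawn worker!(results, deepcopy(input), jobs) for _ in 1:nworkers]
    for k in 1:n
        put!(jobs, k)
    end
    close(jobs)                      # lets the workers finish their loops
    bufs = fetch.(workers)
    return results, bufs
end

results, bufs = run_workers(ToyInput([1.0, 2.0, 3.0]), 10)
```

Each worker writes only to its own results[k] slots, and every buffer comes back restored to its initial state.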