Multithreading with dynamic scheduler

I have an expensive function that I need to evaluate for several inputs. To use multithreading, I usually use the following workflow:

inputs = collect(deepcopy(input) for _ = 1:Threads.nthreads())
Threads.@threads for k in iterable
    id = Threads.threadid()
    change_input!(inputs[id], k)
    # ... evaluate the expensive function on inputs[id] ...
    restore_input!(inputs[id], k)
end

Since the time per iteration can vary, I want to use a dynamic scheduler with @spawn. To do so, I must ensure that a thread does not switch tasks before finishing an entire loop iteration. How can I do that?

It seems to me that I cannot do this without overriding the wait function. Is that correct?

Thanks in advance

Could you refactor your code to use a pure function for parallel execution?
Then you could do

f(x) = x^2 # some expensive (pure) function
inputs = 1:10
futures = [Threads.@spawn f(x) for x in inputs]
# maybe more code here which does not depend on results
results = fetch.(futures)

You might also consider using ThreadPools.jl, which exposes the @qthreads macro for a dynamically scheduled for loop.
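For illustration, a minimal sketch of what a @qthreads loop could look like here (the function f and the range are placeholders, not from the thread; ThreadPools.jl must be installed):

```julia
using ThreadPools

f(x) = (sleep(0.01 * x); x^2)   # stand-in for an expensive function with varying cost
results = Vector{Int}(undef, 10)
ThreadPools.@qthreads for k in 1:10
    results[k] = f(k)           # iterations are handed to threads dynamically
end
```

Unlike Threads.@threads, which pre-assigns a fixed chunk of iterations to each thread, @qthreads feeds iterations to threads as they become free, which helps when iteration times differ.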


Maybe something like

function worker(input, channel)
    while true
        k = take!(channel)
        isnothing(k) && break
        change_input!(input, k)
        restore_input!(input, k)
    end
end

channel = Channel(N)
workers = []
for i = 1:N
    push!(workers, Threads.@spawn worker(deepcopy(input), channel))
end

for k in iterable
    put!(channel, k)
end

for i = 1:N
    put!(channel, nothing)
end

for worker in workers
    wait(worker)
end
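To make the pattern concrete, here is a minimal self-contained sketch of the channel/worker approach; the expensive step, input, and sizes are placeholders, not from the thread:

```julia
using Base.Threads: @spawn

expensive_step(x, k) = sum(x) + k   # stand-in for the expensive work

function worker(input, channel, results)
    while true
        k = take!(channel)
        isnothing(k) && break       # nothing is the stop sentinel
        results[k] = expensive_step(input, k)
    end
end

N = Threads.nthreads()
input = rand(1000)                  # placeholder input
iterable = 1:20
results = Vector{Float64}(undef, length(iterable))

channel = Channel{Union{Int,Nothing}}(N)
workers = [@spawn worker(deepcopy(input), channel, results) for _ in 1:N]

for k in iterable
    put!(channel, k)                # blocks when the buffer is full
end
for _ in 1:N
    put!(channel, nothing)          # one sentinel per worker
end
foreach(wait, workers)
```

Note that each worker task owns its own deepcopy(input) for its entire lifetime, so only N copies are made regardless of the number of iterations.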

I thought about calling input_copy = deepcopy(input) inside the for loop and then calling the pure function expensive_function(input_copy). The problem is that input occupies a lot of memory, so I would rather minimize the number of calls to deepcopy.

I didn’t know about ThreadPools. Thanks!

Actually, I think I cannot guarantee that each worker stays on a specific thread. Maybe this is something where I can use the @tspawnat macro of the ThreadPools package suggested by @biona001.
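If thread pinning turns out to be necessary, @tspawnat could be used roughly like this (a sketch with placeholder work, assuming ThreadPools.jl is installed):

```julia
using ThreadPools

input = rand(1000)   # placeholder input
# @tspawnat i ... pins each spawned task to thread i,
# so each copy of the input stays on one thread.
tasks = [ThreadPools.@tspawnat i sum(deepcopy(input)) for i in 1:Threads.nthreads()]
results = fetch.(tasks)
```

That said, as noted later in the thread, binding each copy to a *task* (as in the channel/worker solution) already avoids the data race, so pinning to threads may not be needed at all.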

If your expensive_function does not modify input (and only returns its output), there is no need to copy it.
If it does need to modify the input, maybe there is a way to split input into separate elements (e.g. columns in a matrix) where each instance of expensive_function only operates on a distinct part of it? Then a (distinct) view on input could be passed to each instance of the expensive_function.
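A sketch of the view-based idea, with a sum of squares standing in for expensive_function (names and sizes are placeholders):

```julia
A = rand(8, 4)   # placeholder for a large input split into columns
tasks = [Threads.@spawn begin
             col = view(A, :, j)   # distinct slice per task, no copy of A
             col .^= 2             # in-place modification of this slice only
             sum(col)
         end for j in axes(A, 2)]
results = fetch.(tasks)
```

Because each task touches a disjoint part of A, the in-place updates do not race with each other and no deepcopy is needed.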

Why do you need that guarantee?

For each iteration, I initially modify some of the elements of input (a struct basically composed of Vector{Float64} fields) and then I call expensive_function. Indeed, I could split input into two different variables where only one of them is modified in each iteration. The only problem is that it would take a lot of work to adapt expensive_function for this case.

Indeed, I didn’t pay attention to the fact that in your code each deepcopy(input) is bound to a specific task, rather than to a specific thread.

I’ll definitely test your solution. Thank you!