How to use many (non-sticky) tasks while maximizing local storage reuse?

After reading the WIP blogpost https://github.com/JuliaLang/www.julialang.org/pull/1904, I’m trying to adapt to the new recommended approach to multithreading, now that tasks can migrate between threads. I would like to get some confirmation that I’m understanding the new recommendation correctly.

As I understand it, the new approach is to forego Threads.@threads altogether in favor of a chunk-centric @spawn. Your input data is divided in a number of chunks of equal lengths, and a task is spawned on each of these chunks. You can have many chunks per thread, creating a kind of common task queue. Each task takes care of allocating any storage it needs before iterating over its assigned chunk, instead of the caller doing it beforehand as in the @threads approach. This way threadid() is avoided and task migration is not a problem.
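Concretely, the generic pattern I have in mind looks something like this (just a sketch; make_buffer and process! are placeholder names, not my actual code):

using Base.Threads: @spawn, nthreads

function process_all(inputs; number_of_chunks = 2 * nthreads())
	chunk_size = max(1, length(inputs) ÷ number_of_chunks)
	chunks = Iterators.partition(inputs, chunk_size)
	tasks = map(chunks) do chunk
		@spawn begin
			buffer = make_buffer()                   # per-task storage; no threadid() anywhere
			[process!(buffer, x) for x in chunk]     # sequential work over this chunk
		end
	end
	return reduce(vcat, fetch.(tasks))
end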

There is a twist in my case. My local storage is a complicated Solver() object with potentially large internal preallocated space. I cannot have each task allocating a new copy of Solver(), that is too expensive. One solver copy per thread is ok, but one per task is suboptimal.

This would be my code doing things the Old Way (using @threads :static)

using Base.Threads: @threads, @spawn, nthreads, threadid

function solve(inputs)
	solvers = [Solver() for _ in 1:nthreads()]
	solutions = Vector{Solution}(undef, length(inputs))
	@threads :static for i in eachindex(inputs)  # @threads needs an indexable iteration space, so no enumerate
		solutions[i] = solve!(solvers[threadid()], inputs[i])
	end
	return solutions
end

where each solve!(solver, input) mutates internal solver fields. Here we pay the price of having nthreads() copies of Solver().
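For concreteness, a toy stand-in for what I mean by Solver and solve! (made up for illustration; the real object is much larger):

# The expensive part is the big preallocated workspace; solve! reuses it
# by mutating it in place.
struct Solver
	workspace::Matrix{Float64}   # large buffer, expensive to allocate
end
Solver() = Solver(Matrix{Float64}(undef, 1000, 1000))

function solve!(solver::Solver, input)
	solver.workspace .= input          # overwrite the workspace in place
	return sum(solver.workspace)       # stands in for building a Solution
end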

With the new task-based approach I would need to bundle each chunk with its own copy of the storage. This would be the analogous code

function solve(inputs; number_of_chunks = nthreads())
	chunk_size = max(1, length(inputs) ÷ number_of_chunks)
	chunks = Iterators.partition(inputs, chunk_size)
	solvers = [Solver() for _ in chunks]
	tasks = map(zip(solvers, chunks)) do (solver, chunk)
		@spawn map(input -> solve!(solver, input), chunk)
	end
	solutions = collect(Iterators.flatten(fetch.(tasks)))
	return solutions
end

Question 1: If number_of_chunks = nthreads(), aren’t the two approaches equivalent?

If number_of_chunks > nthreads() we should get some degree of load balancing with the second approach, which is nice. However we also increase the number of Solver() copies, which is not so nice.

Question 2: Is there any way we can get number_of_chunks > nthreads() but still use only nthreads() copies of the Solver() storage that get reused?

I think Question 2 should be possible (and optimal) if we had sticky tasks by default. We could then still use threadid() and get load balancing by spawning lots of sticky tasks that get consumed as threads become idle. So non-sticky tasks seem like a drag in my (probably naive) view.

Question 3: What do we gain from non-sticky tasks? Can we disable them somehow so we can still use threadid() or would that be a bad idea for some reason?

XRef: Multithreading, preallocation of objects, threadid() and task migration


I think you can get all alternatives here: Multithreading with shared memory caches


Thanks! Very useful, I just replied over there too. It would seem that your suggestion of using a Channel holding the desired number of solvers would be ideal… as long as taking the solvers and putting them back into the channel does not have a huge overhead. It seems to be only about 91 ns on my computer.
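For reference, the kind of micro-benchmark I mean (a rough sketch using BenchmarkTools, not the exact code behind the 91 ns figure):

using BenchmarkTools

ch = Channel{Int}(1)
put!(ch, 1)

# time one take!/put! round trip on an uncontended channel
@btime begin
	x = take!($ch)
	put!($ch, x)
end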

For completeness, this would be the type of Channel-based solution suggested by @lmiq in the other thread

function build_solvers(nsolvers = nthreads())
	solvers = Channel{Solver}(nsolvers)
	for _ in 1:nsolvers
		put!(solvers, Solver())
	end
	return solvers
end

function solve(solvers, inputs; number_of_chunks = length(solvers))
	chunk_size = max(1, length(inputs) ÷ number_of_chunks)
	chunks = Iterators.partition(inputs, chunk_size)
	tasks = map(chunks) do chunk
		@spawn begin
			solver = take!(solvers)
			results = map(c -> solve!(solver, c), chunk)
			put!(solvers, solver)
			results
		end
	end
	solutions = collect(Iterators.flatten(fetch.(tasks)))
	return solutions
end

The use of Iterators.partition is not ideal for this task, because it may produce an uneven number of items per chunk, for example:

julia> length.(Iterators.partition(1:10,4))
3-element Vector{Int64}:
 4
 4
 2

which is why I wrote the (simple but convenient) ChunkSplitters package, which will result in:

julia> using ChunkSplitters

julia> length.(map(first,chunks(1:10,3)))
3-element Vector{Int64}:
 4
 3
 3

Then, if you do not want to spawn many tasks, I prefer the following pattern:

using ChunkSplitters 

function solve(solvers, inputs; number_of_chunks = length(solvers))
    @threads for (i_range, i_chunk) in chunks(inputs, number_of_chunks)
        solver = solvers[i_chunk]
        for i in i_range
            input = inputs[i]
            ...
        end
    end
    # reduce results
    return ...
end

This has two good properties: 1) it only spawns as many tasks as the desired number of chunks; 2) it has essentially no overhead relative to a plain @threads loop. (You could do the same with @sync and @spawn, but there would be no advantage here.) To improve load balancing you can simply increase the number of chunks (or use the :scatter chunking option if there is a correlation between the index of the task and its cost).

If you don’t mind spawning many tasks, you can use channels directly, and there is no need for chunking:

function solve(solvers, inputs)
    @sync for input in inputs
        @spawn begin
            solver = take!(solvers)
            ...
            put!(solvers, solver)
        end
    end
    # reduce
    return ...
end

You don’t need chunking because take! on the channel blocks: each spawned task only proceeds once a solver is available. That is better if the tasks are very uneven in cost and if the time required to spawn a task is negligible relative to the time of each task.

ps: Yet, I think there is something about channels that I don’t get exactly right… When I experiment with them I often end up in deadlocked states and I don’t understand exactly why; it seems I can get a stalled take! call with the pattern above. This was the issue: When using channels, errors do not get raised, and computation stalls.
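For the record, one way to avoid that particular stall with the pattern above is to always give the solver back inside a try/finally, so that an error in one task cannot leave the other tasks blocked on take! (a sketch adapted from the code above, not from the linked thread):

@sync for input in inputs
    @spawn begin
        solver = take!(solvers)
        try
            solve!(solver, input)
        finally
            put!(solvers, solver)   # always return the solver, even if solve! throws
        end
    end
end

With the finally in place the remaining tasks can finish, and @sync then rethrows the error from the failed task instead of hanging.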


The two approaches are mostly equivalent, but the problem with using @threads :static is that if there’s anything else multithreaded going on elsewhere in your code that you’re not aware of, it’ll destructively interfere.

E.g. if solve! is actually doing some multithreaded stuff internally, you could end up being reduced to worse than single-threaded speeds if you use @threads :static, but that is not the case with @spawn.

This should be possible if you combine a channel with the chunking approach as put forth here: Multithreading with shared memory caches - #6 by danielwe

For your case, that’d look something like

function solve2(inputs; number_of_chunks = 20 * nthreads())
    solutions = Vector{Solution}(undef, length(inputs))

    chunk_size = max(1, length(inputs) ÷ number_of_chunks)
    chunks = Iterators.partition(enumerate(inputs), chunk_size)

    # shared work queue holding every chunk
    chunk_queue = Channel{eltype(chunks)}(Inf)
    foreach(chunk -> put!(chunk_queue, chunk), chunks)
    close(chunk_queue)

    @sync for _ ∈ 1:nthreads()
        @spawn begin
            solver = Solver()            # one solver per task, not per chunk
            for chunk ∈ chunk_queue      # keep pulling chunks until the queue is drained
                for (i, input) in chunk
                    solutions[i] = solve!(solver, input)
                end
            end
        end
    end
    return solutions
end

In this case, we only ever create nthreads() different tasks, but there are, by default, 20 chunks per task, and each task is pulling chunks to work on from the shared Channel.

Taking from the channel has some overhead due to locks, so we don’t want to do it on every iteration; that’s why we store the work in chunks, so that after taking a chunk we can do a fast sequential loop over it.

There may be a more elegant and more efficient way to write this, I’m not sure, I’m not an experienced user of channels or this specific pattern.

Mostly, we gain composability. One function can do multithreading without worrying about whether some other function is doing multithreading too. We also gain a lot of general flexibility beyond just statically scheduling a for loop, which is all the old scheduler was really capable of.
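As a toy illustration of that composability (hypothetical inner_work/outer_work functions): a function that spawns its own tasks can be called from tasks spawned elsewhere, and neither side needs to know about the other.

using Base.Threads: @spawn

# inner_work spawns tasks of its own...
inner_work(x) = fetch(@spawn x^2) + fetch(@spawn x + 1)

# ...and outer_work spawns tasks that call it; the dynamic scheduler just
# interleaves everything across the available threads.
function outer_work(xs)
    tasks = [@spawn inner_work(x) for x in xs]
    return sum(fetch, tasks)
end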


Oops, I missed that you wrote down a nice channel based solution here. This is slightly different from what I suggested but should end up being somewhat similar. I’d be interested to hear if anyone explores the relative tradeoffs.


There is a small test here: FLoops @init allocate only once - #13 by lmiq

Channels have a small, but noticeable, overhead, which may or may not be important depending on the application (although that is not exactly the same pattern of channel use you have proposed).


Note that @threads (or, explicitly, @threads :dynamic) isn’t the same as @sync @spawn. In particular, perhaps contrary to intuition, the former does not implement proper load balancing whereas the latter does. Check out my comment here.


That comment specifically refers to spawning tasks for a limited number of chunks, defined manually (or using ChunkSplitters). If the number of chunks is equal to the number of available threads, there will be no difference between using @threads or @spawn. To gain some load balancing we need to have more chunks than threads.

But I understand your comment: you mean that when one chunk is finished, @threads won’t hand pending work to the now-idle thread, while @spawn will, right?
