Hi, I have the following code section, a loop with millions of iterations, using multi-threading.
```julia
global result_list = Vector{Union{Nothing,SparseVector{Float64,Int64}}}(nothing, tot_num)
Threads.@threads for element in element_list
    ind = element_to_ind_Dict[element] # get the index
    @show ind
    result = function_of_element(element) # the main calculation
    if result === nothing
        continue
    end
    @show result
    global result_list
    result_list[ind] = result
end
```
I know that in some iterations `function_of_element()` takes much longer than in others. Although there are millions of iterations waiting to be computed, it still looks like I am using only one thread most of the time.
`@threads for` simply divides the serially ordered work into contiguous chunks and assigns those chunks to `nthreads()` tasks. So if chunk N contains most of the expensive iterations, its task will keep running long after the others are done. A fairer alternative is to start up `nthreads()` worker tasks which take work items from a `Channel`. If you know in advance which iterations are most costly, they should probably go into the queue first.
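A sketch of that Channel-based worker pool, reusing the names from the question (`function_of_element`, `element_to_ind_Dict`, `element_list`, and `tot_num` are assumed to be defined as above; writing to distinct indices of `result_list` from different tasks is safe):

```julia
using SparseArrays

function calc_with_channel(element_list, element_to_ind_Dict, tot_num)
    result_list = Vector{Union{Nothing,SparseVector{Float64,Int64}}}(nothing, tot_num)
    # Fill a buffered channel with all work items, then close it so that
    # iterating the channel terminates once it is drained.
    jobs = Channel{eltype(element_list)}(length(element_list))
    foreach(e -> put!(jobs, e), element_list)
    close(jobs)
    @sync for _ in 1:Threads.nthreads()
        Threads.@spawn for element in jobs  # each worker pulls the next item as soon as it is free
            ind = element_to_ind_Dict[element]
            result = function_of_element(element)
            result === nothing && continue
            result_list[ind] = result
        end
    end
    return result_list
end
```

Because each worker grabs a new item only when it finishes the previous one, a few expensive iterations can no longer pin all the remaining work to a single thread.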
“Dynamic scheduling” in this context means that once a task has been defined, different portions may execute on different threads (sequentially, unless it has sub-tasks). The logic for building tasks in base/threadingconstructs.jl seems to have been the same for quite a while.
The current situation is apparently an “implementation detail”, and therefore (!) not well documented. The manual explicitly says that the @threads body “must not make any assumptions about the distribution of iterations to tasks …,” which leads some of us to prefer other constructs.
```julia
using ThreadsX
using SparseArrays  # for SparseVector

function calc_result_list(element_list, element_to_ind_Dict, tot_num)
    result_list = Vector{Union{Nothing,SparseVector{Float64,Int64}}}(nothing, tot_num)
    ThreadsX.foreach(element_list, basesize=1) do element
        ind = element_to_ind_Dict[element] # get the index
        @show ind
        result = function_of_element(element) # the main calculation
        if result === nothing
            return
        end
        @show result
        result_list[ind] = result
    end
    return result_list
end
```
ThreadsX allows you to pass a `basesize`. Using `basesize=1` means each task works on one iteration at a time, rather than on large chunks.
The above implementation also avoids globals.
Re: other constructs
In addition to Chris Elrod’s example above, there are some good ones in the Transducers/FLoops packages and no end of (potentially dangerous) things one can do oneself with @spawn.
The task structure of `@threads :dynamic` and `@threads :static` is actually the same, i.e. both create O(nthreads()) tasks corresponding to contiguous regions of the given (potentially large) iteration range. The only difference is that tasks can migrate between threads (“are non-sticky”) in the former case while they can’t in the latter. Compare this to `@sync for ... @spawn ...`, which creates one task per loop iteration and gives a form of load balancing through Julia’s task scheduler. In pictures:
Note that neither `:static` nor `:dynamic` gives load balancing (as `@spawn` does). Also note that the task→thread mapping isn’t fixed for `:dynamic` but is for `:static`. However, when we sort by workload we see that eventually they do the same thing. So, to summarize: which Julia thread executes which chunk is decided dynamically for `:dynamic`, but the chunks themselves are the same as for `:static`.
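For completeness, here is the one-task-per-iteration `@spawn` pattern applied to the original problem (a sketch, reusing the names from the question; reasonable when each iteration is expensive enough to amortize the per-task overhead):

```julia
using SparseArrays

# One task per iteration: the scheduler load-balances tasks across threads,
# at the cost of spawning overhead for every single iteration.
function calc_with_spawn(element_list, element_to_ind_Dict, tot_num)
    result_list = Vector{Union{Nothing,SparseVector{Float64,Int64}}}(nothing, tot_num)
    @sync for element in element_list
        Threads.@spawn begin
            ind = element_to_ind_Dict[element]
            result = function_of_element(element)
            result !== nothing && (result_list[ind] = result)
        end
    end
    return result_list
end
```

With millions of cheap iterations this spawns too many tasks; in that regime, chunking with a tuned `basesize` (as in the ThreadsX example above) is usually the better trade-off.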
(Pluto notebook: load_balancing.jl (45.8 KB) - Be aware though that I use hacky/unsafe threadid() pattern here for simplicity.)
You might want to check out these comments by @tkf: