I have code that assumes that each iteration of a `Threads.@threads` for loop is run on a single thread, i.e., that one can use `Threads.threadid()` at the beginning of the loop body to determine which thread is executing that iteration, and that this will not change during that iteration. Is this an OK assumption going forward?
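For concreteness, here is a minimal sketch of the pattern I mean (`partials` is just a placeholder accumulator):

```julia
using Base.Threads

xs = rand(10^6)
partials = zeros(nthreads())
@threads for x in xs
    # the assumption: threadid() stays fixed for this whole iteration
    partials[threadid()] += x
end
total = sum(partials)
```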
In general yes, but there are corner cases: FAQ · FLoops
The tasks created with `@threads` are sticky, so, yes, they don't migrate between Julia threads. You might want to take a look at: Julia 1.7 says it can switch the thread your task is on. How often does that happen, and how can it be disabled? - #4 by carstenbauer.
(The mentioned corner case doesn't apply to `@threads for`, as the linked documentation mentions itself in the blue note box.)
Thanks, your link provides a definitive answer as to why this is the case.
One less thing to worry about!
I am afraid that you need to worry about this.
It's OK for now, as was pointed out. But it'll break soon in "general" cases, where "general" means you don't know or can't control the yield points.
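For example (a minimal sketch; on Julia 1.7+, tasks created with `Threads.@spawn` are free to migrate at any yield point):

```julia
using Base.Threads

t = @spawn begin
    before = threadid()
    sleep(0.1)       # a yield point: the scheduler may move this task here
    after = threadid()
    (before, after)  # these can differ under dynamic scheduling
end
fetch(t)
```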
Well, but he can just use `@threads :static for`, and then it's safe even in 1.8 or whenever other modes get added, no?
Yeah, that's fine. I'd argue new code "shouldn't" be using it, but it's in the API, so ultimately nothing stops people from using it.
I know and understand that the prevalent opinion is to not rely on static/sticky scheduling but to expect and use the dynamic mechanisms that we have (or will have) to enable composable multithreading. But, IMO, it's definitely a good thing that we have a static scheduling option as part of the API (to me, your comment somewhat sounds like it's a mistake of the past and that you would get rid of the option if you could). Not every code is library code that needs to be composable. In my specific application, I might know exactly what I want or need, so the dynamic scheduler might just introduce unnecessary overhead and opaqueness.

But more importantly, in HPC, in particular when using external tools (written in a / for a pre-task-based multithreading world) and when fine-tuning the performance for a particular cluster, one often needs to know, and be able to reliably control, which OS thread gets which tasks (i.e. workloads). For example, if I want to use hardware performance counters through, say, LIKWID.jl, I need to make sure that a certain computation runs on a certain OS thread (which must be pinned to the correct CPU core). Or if I want to respect the NUMA structure of a system, I need to make sure that certain tasks don't migrate to Julia threads running in a different NUMA domain, etc.

Of course, the majority of people probably shouldn't or don't need to care about these things, and/or the benefits of dynamic task scheduling and composable multithreading just outweigh the disadvantages. But, IMO, we should also have options for more fine-grained control as part of the official API.
(I hope this doesn't come across as a rant.)
I do recognize the need for controlling how tasks are mapped to OS threads. It is a reasonable thing to do for "application" (= non-library) code where you need to squeeze out the last bit of performance, at least for now. The application authors have more knowledge of how the components are organized than the generic `julia` scheduler, and it helps optimize the performance.
But I don't think `@threads :static for i in 1:nthreads()` is the right approach. If you want to ask for "one task for each worker thread," then there should be an API to directly express this. I'd imagine the main new usages of `@threads :static` would actually be ad-hoc implementations of this API. I think there should be a specific API to do it because `nthreads()` may not reflect the number of worker threads you can use; for example, that can be the case after https://github.com/JuliaLang/julia/pull/42302 (although maybe I should clarify that I suggested this behavior). This can also happen with other conceivable changes, like an I/O- or finalizer-specific worker thread in the scheduler. So, I don't think not discouraging `@threads :static for i in 1:nthreads()` is a good idea even in application code. I think it's better to have an API like `Threads.foreachworker(f)` specifically for this.
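(`Threads.foreachworker` does not exist today; as a sketch, an ad-hoc implementation on top of the current API might look like this:)

```julia
using Base.Threads

# Hypothetical API: run f once on each worker thread, passing the thread id.
function foreachworker(f)
    @threads :static for i in 1:nthreads()
        f(threadid())  # with :static, iteration i is pinned to thread i
    end
end

foreachworker(id -> println("running on thread ", id))
```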
(-1)^3 = -1, so okay, `:static` is bad. But what if the underlying thing has a chunking structure, for example, what if the things we're looping over are grouped in a cache or file encoding (blob)? Dynamic scheduling would imply extra overhead because threads are now reading scattered data across multiple chunks, whereas in `:static` the overlap is minimal (it only happens at the boundary).
Yeah, I knew that sentence was stupid, but I wanted to hint that my argument is stricter than "I don't think encouraging `@threads :static for i in 1:nthreads()` is a good idea."
APIs using `:static` scheduling internally are not usable in parallel applications. This is because `:static` scheduling throws when it is started on a non-primary worker thread. This is a rather artificial limitation, but we can't fix it: waiting for all worker threads to be free is simply a bad idea if we want to use it within a parallel program where other components may be using some workers. So, don't use it in packages if you want to make them useful for other people. Otherwise, nothing stops you from micro-optimizing your code.
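For example (a sketch of the failure mode; the exact error text may vary across Julia versions):

```julia
using Base.Threads

# A "library" routine that uses :static internally:
function lib_routine()
    @threads :static for i in 1:nthreads()
        # ... some internal parallel work ...
    end
end

# Calling it from already-parallel code throws, because the :static loop
# may be started on a non-primary worker thread:
@threads for j in 1:nthreads()
    lib_routine()  # ERROR: `@threads :static` cannot be used concurrently or nested
end
```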
It's not clear what kind of effects you are talking about. [1] But, FWIW, I am working on scheduling algorithms and implementations for making the Julia scheduler "aware" of cache in some sense. I don't think we need to throw away cache-friendliness in order to have composable parallelism. But if you have some examples where Julia's dynamic scheduling gets in the way of good performance, please do let me know.

[1] Is it that task migration at a yield point effectively invalidates the CPU cache? But then it's not clear what "overlap" refers to. The worry about overlap could be interpreted as an effect of false sharing due to concurrent writes, but do you have an example where the invalidation of O(nthreads()) cache lines matters? Also, it contradicts the phrase "reading scattered data."
I'm referring to high-level cache, not CPU cache. Think of the data you're iterating over this way:

[data1, data2, data3...][data41, data42, data43, data44][...][..][.....]

Let's call a [] a basket. In `:static`, each thread would get many baskets, with a small fraction of overlap at the boundary; for example, a thread gets data 1-401, but the boundary looks like [...data400][data401, data402, data403][...]. The problem is that each basket can only be read, parsed, and decompressed as a whole, so the sensible thing to do is to process a basket and store it in a thread-local cache.

Now `:dynamic` comes in, and suddenly you have multiple threads accessing the same basket much more often, replacing many local caches just to read a few data points from a basket before throwing it away.
Benchmarking with I/O stuff is hard, and although I can see an increase in allocations, the speed impact seems to be minimal:

```julia
julia> using UnROOT, Base.Threads

julia> const tree = LazyTree("/home/akako/Downloads/Run2012BC_DoubleMuParked_Muons.root", "Events");

julia> function burn(N)
           res = 0.0
           for i = 1:N
               res += cos(sin(i))
           end
           res
       end

julia> function main(tree)
           res = [0 for _ = 1:nthreads()]
           @threads :static for evt in tree
               burn(10^2) # simulate computing
               res[threadid()] += length(evt.Muon_pt)
           end
           res
       end
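julia> # main_dynamic is not shown in the original post; presumably it mirrors
       # main but with dynamic scheduling (`:dynamic`, the default in Julia 1.8+):
       function main_dynamic(tree)
           res = [0 for _ = 1:nthreads()]
           @threads :dynamic for evt in tree
               burn(10^2) # simulate computing
               res[threadid()] += length(evt.Muon_pt)
           end
           res
       end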
julia> @time main(tree);
 12.181016 seconds (21.00 k allocations: 2.663 GiB, 0.01% gc time)

julia> @time main_dynamic(tree)
 13.050303 seconds (21.48 k allocations: 2.917 GiB, 0.03% gc time)
10-element Vector{Int64}:
 15265992
 15204865
 15193408
 14531832
 14541762
 14539929
 14532228
 15236139
 15057163
 15219138
```
I don't know under what real-world load this problem may be amplified, but I can see how it may go wrong.
I strongly suggest avoiding reasoning about worker (OS) threads in Julia programs. It is a very low-level construct and, arguably, it should be avoided in idiomatic Julia programs. The primitive for concurrent execution in Julia is `Task` and not thread. Basically, if you write `threadid()` or `nthreads()` anywhere, that's a big warning sign unless you know what you are doing. (Note: I have already mentioned the exceptions which I consider fine. I'm not repeating them, and my comment here only discusses what should be done in principle.)
There are various ways to specify "local" resources. For example, FLoops.jl provides the `@init` macro to declare basecase-local (approximately task-local) resources. See also Concurrency patterns for controlled parallelisms for various composable idioms for handling local resources.
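A minimal sketch of that pattern (the buffer and the reduction here are just placeholders):

```julia
using FLoops

function sum_with_scratch(xs)
    @floop for x in xs
        @init buf = Vector{Float64}(undef, 16)  # allocated once per basecase
        buf[1] = 2x                             # reuse the scratch buffer freely
        @reduce s = 0.0 + buf[1]
    end
    return s
end

sum_with_scratch(1:1000)  # == 2 * sum(1:1000)
```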
What do you suggest to do then? I don't care about thread-locality per se, but I do care about having a buffer that can be indexed by `threadid()`, because there's no other way to organize the basket cache in a thread-safe way.
When a task (thread; the technicality doesn't matter here) wants to read an event, it checks if the event is in a basket that is already decompressed. We can't afford a lookup in every cache for every event, because you easily have 10^8 * 100 to go through. So what we do is assume that most of the time the iteration is linear (`:static` makes this model very realistic) and check the event number against two numbers (the beginning and end of the basket) to decide if we need to read from disk. This is the reason the cache is "thread-local"; it's just a way to keep track of things.

I really want to find a better and easier-to-implement way to do this. We spent many hours experimenting with thread-safe LRU caches and the like; they are an order of magnitude slower. This IS low-level, I guess.
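Concretely, the check looks something like this (a simplified sketch; `read_basket` is a stand-in for the actual I/O and decompression):

```julia
mutable struct BasketCache
    lo::Int             # first event number covered by the cached basket
    hi::Int             # last event number covered by the cached basket
    data::Vector{UInt8} # the decompressed basket contents
end

function getevent!(cache::BasketCache, evtnum::Integer)
    if !(cache.lo <= evtnum <= cache.hi)  # two comparisons in the common case
        # cache miss: fetch and decompress the basket containing evtnum
        cache.lo, cache.hi, cache.data = read_basket(evtnum)
    end
    return cache.data  # the caller extracts the event from the basket
end
```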
I'd try making the cache local to a task (or to any logically serial region of the code), as I suggested above.
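A sketch of what that could look like, assuming one task per contiguous chunk (`BasketCache` and `process` are the hypothetical pieces from above):

```julia
using Base.Threads: @spawn

function process_all(events; ntasks = Threads.nthreads())
    chunks = Iterators.partition(eachindex(events), cld(length(events), ntasks))
    @sync for chunk in chunks
        @spawn begin
            cache = BasketCache(0, -1, UInt8[])  # owned by this task alone
            for i in chunk
                process(events[i], cache)        # linear scan within the chunk
            end
        end
    end
end
```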
You have as many tasks as the number of iterations, right? In code like:

```julia
@threads for event in a_million_events
    ... # we have a million tasks here?
end
```

But you don't want to read from disk and decompress that many times. You probably have 100 iterations between disk reads, which means this cache cannot be task-local?
No, `@threads` currently spawns only `nthreads()` tasks, and each task processes a contiguous region of the input. In the current implementation, the task tree structure of `:dynamic` and `:static` is identical. The only difference is that the underlying tasks in `:dynamic` can be scheduled on an arbitrary worker thread and are allowed to migrate across worker threads, like the usual tasks created via `Threads.@spawn`.

The details may change in the future, but I believe the number of tasks will always be O(nthreads()) and each task will process one contiguous region (or a few contiguous regions). I'm almost certain this will always be the case since otherwise it'd be essentially equivalent to `@sync for x in xs; @spawn ...; end`.

(This is not apparent from the current docstring Multi-Threading · The Julia Language, so we probably should add some more clarifications.)
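Roughly, the structure is something like this (an illustrative sketch only, not the actual Base implementation, whose chunking differs in details):

```julia
using Base.Threads: @spawn, nthreads

function threads_like(f, xs)
    n, nt = length(xs), nthreads()
    @sync for t in 1:nt
        lo = 1 + div(n * (t - 1), nt)  # contiguous chunk for task t
        hi = div(n * t, nt)
        @spawn for i in lo:hi
            f(xs[i])
        end
    end
end
```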
Oops, sorry, I had a more "dynamic" understanding, which is wrong. OK, yeah, I fully agree that I don't care about (OS) thread locality. I just need the task that has a contiguous region of the input to have a fixed "place" for storing its decompressed chunk… which I'm currently doing with `buffers[threadid()]`, and I don't think there's an alternative, even though it's not correct anymore.
Yeah, I realized that the current docstring is not very clear about this. I'll try to feed our conversation back into a PR to make it more understandable.
You are right. This pattern is invalid with the new `@threads`, since the underlying task can migrate across threads. This is especially the case since (presumably) you are doing I/O in the loop.
If you need a quick workaround, you can do

```julia
@sync @threads for i in 1:nthreads()
    # @async tasks are sticky; spawning one also prevents the enclosing
    # @threads task from migrating off its current thread
    @async handle_i_th_partition(i)
end
```

or

```julia
if Threads.threadid() == 1
    @threads :static for ...
    end
else
    for ... # sequential fallback
    end
end
```

to correctly use the `buffers[threadid()]` pattern. However, as you might have noticed, I'm strongly against this pattern, and I believe there are various ways to tackle this.
If your project is on GitHub, and if you want a less abstract comment from me, feel free to ping me on the actual code. I canāt promise if I can give good feedback in time but Iām curious to look at challenging examples on parallel and concurrent programming in Julia.
Hope you don't mind that I'm taking you up on this offer: it's not clear to me how we're supposed to work around the case of per-iteration pre-allocated buffers (the `buffers[tid]` pattern described above). As an example, this is the very innermost loop of our code (called repeatedly), and it's non-allocating; it's not clear to me how to do this with `@threads :dynamic` or with FLoops: https://github.com/JuliaMolSim/DFTK.jl/blob/master/src/terms/Hamiltonian.jl#L124. Another, simpler example (although this one is called once, so presumably we could use FLoops more easily there): https://github.com/JuliaMolSim/DFTK.jl/blob/master/src/densities.jl#L24.