Behavior of `Threads.@threads for` loop

I have code that assumes that each iteration of a Threads.@threads for loop is run on a single thread. I.e., that one can use Threads.threadid() at the beginning of the loop to determine which thread is executing that loop iteration, and that this will not change during that iteration. Is this an OK assumption going forward?

In general yes, but there are corner cases: FAQ · FLoops

1 Like

The tasks created with @threads are sticky, so, yes, they don't migrate between Julia threads. You might want to take a look at: Julia 1.7 says it can switch the thread your task is on. How often does that happen, and how can it be disabled? - #4 by carstenbauer.

(The mentioned corner case doesn't apply to @threads for as the linked documentation mentions itself in the blue note box.)
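
A minimal sketch of that assumption (using :static, which pins each task to its thread; the sleep is just an arbitrary yield point):

using Base.Threads

@threads :static for i in 1:2*nthreads()
    tid = threadid()           # thread at the start of the iteration
    sleep(0.01)                # a yield point; a migratable task could move here
    @assert tid == threadid()  # holds because :static tasks are sticky
end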

2 Likes

Thanks, your link provides a definitive answer as to why this is the case.

One less thing to worry about!

I am afraid that you need to worry about this.

It's OK for now as was pointed out. But it'll break soon in "general" cases

where "general" means you don't know or can't control the yield points.

1 Like

Well, but he can just use @threads :static for and then it's safe even in 1.8 or whenever other modes get added, no?

Yeah, that's fine. I'd argue new code "shouldn't" be using it but it's in the API so ultimately nothing stops people from using it.
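
For example (a sketch; buffers and the loop body are placeholders), the per-thread accumulator pattern from the original question stays valid under :static because the iteration-to-thread mapping is fixed:

using Base.Threads

buffers = [Int[] for _ in 1:nthreads()]  # one buffer per thread
@threads :static for i in 1:1000
    push!(buffers[threadid()], i)  # safe: this iteration never changes threads
end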

I know and understand that the prevalent opinion is to not rely on static / sticky scheduling but to expect and use the dynamic mechanisms that we have (or will have) to enable composable multithreading. But, IMO, it's definitely a good thing that we have a static scheduling option as part of the API (to me, your comment somewhat sounds like it's a mistake of the past and that you would get rid of the option if you could). Not every code is library code that needs to be composable. In my specific application, I might know exactly what I want or need, so the dynamic scheduler might just introduce unnecessary overhead and opaqueness. But more importantly, in HPC, in particular when using external tools (written in a / for a pre-task-based multithreading world) and when fine-tuning performance for a particular cluster, one often needs to know and be able to reliably control which OS thread gets which tasks (i.e. workloads). For example, if I want to use hardware performance counters through, say, LIKWID.jl, I need to make sure that a certain computation runs on a certain OS thread (which must be pinned to the correct CPU core). Or if I want to respect the NUMA structure of a system, I need to make sure that certain tasks don't migrate to Julia threads running in a different NUMA domain, etc. Of course, the majority of people probably shouldn't or don't need to care about these things, and/or the benefits of dynamic task scheduling and composable multithreading just outweigh the disadvantages. But, IMO, we should also have options for more fine-grained control as part of the official API.

(I hope this doesn't come across as a rant :smiley:)

1 Like

I do recognize the need for controlling how tasks are mapped to OS threads. It is a reasonable thing to do for "application" (= non-library) code where you need to squeeze out the last bit of performance, at least for now. The application authors have more knowledge of how the components are organized than the generic Julia scheduler, and it helps optimize the performance.

But I don't think @threads :static for i in 1:nthreads() is the right approach. If you want to ask for "one task for each worker thread," then there should be an API to directly express this. I'd imagine the main new usages of @threads :static would actually be ad-hoc implementations of this API. I think there should be a specific API to do it because nthreads() may not reflect the number of worker threads you can use. For example, that can happen after https://github.com/JuliaLang/julia/pull/42302 (although maybe I should clarify that I suggested this behavior). This can also happen with other conceivable changes like an I/O- or finalizer-specific worker thread in the scheduler. So, I don't think not discouraging @threads :static for i in 1:nthreads() is a good idea even in application code. I think it's better to have an API like Threads.foreachworker(f) specifically for this.
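
(A sketch of what such a hypothetical foreachworker might look like today, implemented ad hoc on top of @threads :static; the name and signature are proposed, not an existing API:)

using Base.Threads

function foreachworker(f)
    @threads :static for i in 1:nthreads()
        f(i)  # f runs exactly once on each worker thread; here i == threadid()
    end
end

foreachworker(i -> println("hello from worker thread ", i))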

2 Likes

(-1)^3 = -1, so, unpacking that triple negative: okay, :static is bad. But what if the underlying thing has chunking structure? For example, what if the things we're looping over are grouped in a cache or file encoding (blob)? Dynamic scheduling would imply extra overhead because threads are now reading scattered data across multiple chunks, where in :static the overlap is minimal (it only happens at boundaries).

Yeah, I knew that sentence was stupid but I wanted to hint that my argument is stricter than "I don't think encouraging @threads :static for i in 1:nthreads() is a good idea."

APIs using :static scheduling internally are not usable in parallel applications. This is because :static scheduling throws when it is started on a non-primary worker thread. This is a rather artificial limitation, but we can't fix it: waiting for all worker threads to be free is just simply a bad idea if we want to use it within a parallel program where other components may be using some workers. So, don't use it in packages if you want to make them useful for other people. Otherwise, nothing stops you from micro-optimizing your code.
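
For instance (a sketch; the exact error text may vary across Julia versions), starting a :static loop from inside an already-running threaded region errors instead of waiting:

using Base.Threads

@threads for i in 1:nthreads()
    @threads :static for j in 1:10  # errors: :static cannot be used concurrently or nested
    end
end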

It's not clear what kind of effects you are talking about. [1] But, FWIW, I am working on scheduling algorithms and implementations for making the Julia scheduler "aware" of cache in some sense. I don't think we need to throw away cache-friendliness in order to have composable parallelism. But if you have some examples where Julia's dynamic scheduling gets in the way of good performance, please do let me know.


  1. Is it about task migration at yield points effectively invalidating the CPU cache? But then it's not clear what "overlap" is referring to. The worry about overlap could be interpreted as an effect of false sharing due to concurrent writes, but do you have an example where invalidation of O(nthreads()) cachelines matters? Also, it contradicts the phrase "reading scattered data". ↩︎

I'm referring to high-level cache, not CPU cache. Think of the data you're iterating over this way:

[data1, data2, data3...][data41, data42, data43, data44][...][..][.....]

Let's call a [] a basket. In :static, each thread would get many baskets, with a small fraction of overlap at the boundaries: for example, a thread gets data 1-401, but the boundary looks like [...data400][data401, data402, data403][...]. The problem is that each basket can only be read, parsed, and decompressed as a whole, so the sensible thing to do is process a basket and store it in a thread-local cache.

Now :dynamic comes in, and suddenly you have multiple threads accessing the same basket much more often, replacing many local caches just to read a few data points in a basket before throwing it away.

Benchmarking with I/O stuff is hard, and although I can see an increase in allocations, the speed impact seems to be minimal:

julia> using UnROOT, Base.Threads

julia> const tree = LazyTree("/home/akako/Downloads/Run2012BC_DoubleMuParked_Muons.root", "Events");

julia> function burn(N)
           res = 0.0
           for i = 1:N
               res+=cos(sin(i))
           end
           res
       end

julia> function main(tree)
           res = [0 for _ = 1:nthreads()]
           @threads :static for evt in tree
               burn(10^2) # simulate computing
               res[threadid()] += length(evt.Muon_pt)
           end
           res
       end
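
julia> # (main_dynamic was not shown in the original post; presumably the same
       # loop as `main` but with the :dynamic schedule)
       function main_dynamic(tree)
           res = [0 for _ = 1:nthreads()]
           @threads :dynamic for evt in tree
               burn(10^2) # simulate computing
               res[threadid()] += length(evt.Muon_pt)
           end
           res
       end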

julia> @time main(tree);
 12.181016 seconds (21.00 k allocations: 2.663 GiB, 0.01% gc time)


julia> @time main_dynamic(tree)
 13.050303 seconds (21.48 k allocations: 2.917 GiB, 0.03% gc time)
10-element Vector{Int64}:
 15265992
 15204865
 15193408
 14531832
 14541762
 14539929
 14532228
 15236139
 15057163
 15219138

I don't know under what real-world load this problem may be amplified, but I can see how it may go wrong.

I strongly suggest avoiding reasoning about worker (OS) threads in Julia programs. It is a very low-level construct, and arguably it should be avoided in idiomatic Julia programs. The primitive for concurrent execution in Julia is Task and not thread. Basically, if you write threadid() or nthreads() anywhere, that's a big warning sign unless you know what you are doing. (Note: I have already mentioned the exceptions which I consider fine. I'm not repeating them, and my comment here only discusses what should be done in principle.)

There are various ways to specify "local" resources. For example, FLoops.jl provides the @init macro to declare basecase-local (approximately task-local) resources. See also Concurrency patterns for controlled parallelisms for various composable idioms for handling local resources.
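
For example, a minimal sketch of the @init pattern (the buffer size and the reduction are made up for illustration):

using FLoops

@floop for i in 1:10^6
    @init buf = Vector{Float64}(undef, 16)  # allocated once per basecase (≈ per task), reused across its iterations
    buf .= i
    @reduce s += sum(buf)
end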

What do you suggest doing, then? I don't care about thread-locality per se, but I do care about having a buffer that can be indexed by threadid(), because there's no other way to organize the basket cache in a thread-safe way.

When a task (thread; the technicality doesn't matter for this) wants to read an event, it checks if the event is in a basket that is already decompressed. We can't afford to do a lookup in every cache for every event, because you easily have 10^8 * 100 of them to go through. So what we do is assume that most of the time the iteration is linear (:static makes this model very realistic) and check the event number against two numbers (the beginning and end of the basket) to decide if we need to read from disk. This is the reason the cache is "thread-local"; it's just a way to keep track of things.
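
In code, that check is roughly the following (a sketch; BasketCache and read_basket! are made-up stand-ins for the UnROOT internals):

mutable struct BasketCache
    firstevent::Int      # first event number covered by the cached basket
    lastevent::Int       # last event number covered by the cached basket
    data::Vector{UInt8}  # the decompressed basket
end

function eventdata!(cache::BasketCache, evtnum)
    # fast path: two integer comparisons per event
    if !(cache.firstevent <= evtnum <= cache.lastevent)
        read_basket!(cache, evtnum)  # slow path: read + decompress the basket containing evtnum
    end
    return cache.data
end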

I really want to find a better and easier-to-implement way to do this. We spent many hours experimenting with thread-safe LRU caches and the like; they are an order of magnitude slower. This IS low-level, I guess.

I'd try making the cache local to a task (or any logically serial region of the code), as I suggested above.
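
A minimal sketch of that using Base's task_local_storage (the :basket_cache key and the buffer size are made up):

using Base.Threads

function local_cache()
    # created lazily, once per task; with @threads that means once per contiguous chunk
    get!(task_local_storage(), :basket_cache) do
        Vector{UInt8}(undef, 2^16)
    end::Vector{UInt8}
end

@threads for i in 1:1000
    buf = local_cache()  # valid even if the task migrates between OS threads
    # ... decompress into / read from buf ...
end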

You have as many tasks as the number of iterations, right? In code like:

@threads for event in a_million_events
   ... # we have a million tasks here?
end

But you don't want to read from disk and decompress that many times; you probably have 100 iterations between disk reads, which means this cache cannot be task-local?

No, @threads currently spawns only nthreads() tasks, and each task processes a contiguous region of the input. In the current implementation, the task tree structure of :dynamic and :static is identical. The only difference is that the underlying tasks in :dynamic can be scheduled on an arbitrary worker thread and are allowed to migrate across worker threads, like the usual tasks created via Threads.@spawn.

The details may change in the future, but I believe the number of tasks will always be O(nthreads()) and each task will process one contiguous region (or a few contiguous regions). I'm almost certain this will always be the case since otherwise it'd be essentially equivalent to @sync for x in xs; @spawn ...; end.
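
This is easy to check (a sketch): count the distinct tasks @threads actually creates:

using Base.Threads

tasks = Set{UInt}()
lk = ReentrantLock()
@threads for i in 1:10^5
    lock(lk) do
        push!(tasks, objectid(current_task()))
    end
end
length(tasks)  # == nthreads(), not 10^5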

(This is not apparent from the current docstring Multi-Threading · The Julia Language and so we probably should add some more clarifications.)

1 Like

Oops, sorry, I had a more "dynamic" understanding, which is wrong. OK, yeah, I fully agree that I don't care about (OS-)thread locality; I just need the task that has a contiguous region of the input to have a fixed "place" for storing its decompressed chunk… which I'm currently doing with buffers[threadid()], and I don't think there's an alternative, even though it's not correct anymore.

Yeah, I realized that the current docstring is not very clear about this. I'll try to feed our conversation back into a PR to make it more understandable.

You are right. This pattern is invalid with the new @threads since the underlying tasks can migrate across threads. This is especially the case since (presumably) you are doing I/O in the loop.

If you want a quick workaround, you can do

@sync @threads for i in 1:nthreads()
    # @async tasks are sticky, and spawning one also pins the enclosing task to its thread
    @async handle_i_th_partition(i)
end

or

if Threads.threadid() == 1
    @threads :static for ...  # OK: :static is started from the primary thread
    end
else
    for ...  # sequential fallback
    end
end

to correctly use the buffers[threadid()] pattern. However, as you might have noticed, I'm strongly against this pattern, and I believe there are various ways to tackle this.
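
For example, one composable alternative is a pool of buffers behind a Channel, so buffer ownership follows the task rather than the thread (a sketch; chunks and process! are stand-ins for the real input and work):

using Base.Threads

process!(buf, chunk) = nothing              # stand-in for the real per-chunk work
chunks = [rand(UInt8, 100) for _ in 1:100]  # stand-in input

pool = Channel{Vector{UInt8}}(nthreads())
for _ in 1:nthreads()
    put!(pool, Vector{UInt8}(undef, 2^16))
end

@threads for chunk in chunks
    buf = take!(pool)    # this task owns buf exclusively until it is put back
    try
        process!(buf, chunk)
    finally
        put!(pool, buf)  # return the buffer so another task can reuse it
    end
end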

If your project is on GitHub, and if you want a less abstract comment from me, feel free to ping me on the actual code. I can't promise I can give good feedback in time, but I'm curious to look at challenging examples of parallel and concurrent programming in Julia.

1 Like

If your project is on GitHub, and if you want a less abstract comment from me, feel free to ping me on the actual code. I can't promise I can give good feedback in time, but I'm curious to look at challenging examples of parallel and concurrent programming in Julia.

Hope you don't mind that I'm taking you up on this offer: it's not clear to me how we're supposed to work around the case of per-iteration pre-allocated buffers (the buffers[tid] pattern described above). As an example, this is the very innermost loop of our code (called repeatedly), and it's non-allocating; it's not clear to me how to do this with @threads :dynamic or with FLoops: https://github.com/JuliaMolSim/DFTK.jl/blob/master/src/terms/Hamiltonian.jl#L124. Another, simpler example (although this one is called once, so presumably we could use FLoops more easily there): https://github.com/JuliaMolSim/DFTK.jl/blob/master/src/densities.jl#L24.

1 Like