Behavior of `Threads.@threads for` loop

I have code that assumes that each iteration of a Threads.@threads for loop is run on a single thread, i.e., that one can call Threads.threadid() at the beginning of the loop body to determine which thread is executing that iteration, and that this will not change during the iteration. Is this an OK assumption going forward?

In general yes, but there are corner cases; see the FAQ in the FLoops.jl documentation.


The tasks created with @threads are sticky, so, yes, they don’t migrate between Julia threads. You might want to take a look at the thread “Julia 1.7 says it can switch the thread your task is on. How often does that happen, and how can it be disabled?” (post #4 by carstenbauer).

(The mentioned corner case doesn’t apply to @threads for as the linked documentation mentions itself in the blue note box.)
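A quick way to check the sticky behavior yourself is to record Threads.threadid() at the start and at the end of every iteration, with a yield() in between. This is a minimal sketch assuming Julia 1.5 or later (where the :static schedule exists); record_tids is just an illustrative name. With :static, the two arrays should always match, because those tasks are sticky.

```julia
using Base.Threads

# Record threadid() at the start and end of each iteration; the yield() in the
# middle gives the scheduler an opportunity to switch tasks.
function record_tids(n)
    starts = zeros(Int, n)
    ends = zeros(Int, n)
    @threads :static for i in 1:n
        starts[i] = threadid()
        yield()
        ends[i] = threadid()
    end
    return starts, ends
end

starts, ends = record_tids(1_000)
println(starts == ends)  # prints true: :static tasks are sticky
```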


Thanks, your link provides a definitive answer as to why this is the case.

One less thing to worry about!

I am afraid that you do need to worry about this.

It’s OK for now, as was pointed out. But it’ll break soon in “general” cases, where “general” means you don’t know or can’t control the yield points.
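To see whether a yield point actually moves an iteration to another thread, one can count iterations whose threadid() changed across a sleep. This is a sketch assuming Julia 1.8 or later (where :dynamic is accepted and its tasks may migrate); count_migrations is a hypothetical helper, and the count depends on the thread count and the load, so it may well be 0 on a given run.

```julia
using Base.Threads

# Count iterations whose thread id changed across a yield point.
# Assumes Julia 1.8+, where :dynamic tasks are allowed to migrate.
function count_migrations(n)
    migrated = Atomic{Int}(0)
    @threads :dynamic for i in 1:n
        tid_before = threadid()
        sleep(0.001)  # a yield point, like I/O we don't control in "general" code
        if threadid() != tid_before
            atomic_add!(migrated, 1)
        end
    end
    return migrated[]
end

m = count_migrations(200)
println("iterations that changed thread: ", m)
```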


Well, but he can just use @threads :static for and then it’s safe even in 1.8 or whenever other modes get added, no?

Yeah, that’s fine. I’d argue new code “shouldn’t” be using it but it’s in the API so ultimately nothing stops people from using it.

I know and understand that the prevalent opinion is to not rely on static / sticky scheduling but to expect and use the dynamic mechanisms that we have (or will have) to enable composable multithreading. But, IMO, it’s definitely a good thing that we have a static scheduling option as part of the API (to me, your comment somewhat sounds like it’s a mistake of the past and that you would get rid of the option if you could). Not all code is library code that needs to be composable. In my specific application, I might know exactly what I want or need, so the dynamic scheduler might just introduce unnecessary overhead and opaqueness. But more importantly, in HPC, in particular when using external tools (written in or for a pre-task-based multithreading world) and when fine-tuning performance for a particular cluster, one often needs to know and be able to reliably control which OS thread gets which tasks (i.e. workloads). For example, if I want to use hardware performance counters through, say, LIKWID.jl, I need to make sure that a certain computation runs on a certain OS thread (which must be pinned to the correct CPU core). Or if I want to respect the NUMA structure of a system, I need to make sure that certain tasks don’t migrate to Julia threads running in a different NUMA domain, etc. Of course, the majority of people probably shouldn’t or don’t need to care about these things, and/or the benefits of dynamic task scheduling and composable multithreading simply outweigh the disadvantages. But, IMO, we should also have options for more fine-grained control as part of the official API.

(I hope this doesn’t come across as a rant :smiley:)


I do recognize the need for controlling how tasks are mapped to OS threads. It is a reasonable thing to do for “application” (= non-library) code where you need to squeeze out the last bit of performance, at least for now. The application authors have more knowledge on how the components are organized than the generic julia scheduler and it helps optimize the performance.

But I don’t think @threads :static for i in 1:nthreads() is the right approach. If you want to ask for “one task for each worker thread,” then there should be an API to directly express this. I’d imagine the main new usages of @threads :static would actually be ad-hoc implementations of this API. I think there should be a specific API for it because nthreads() may not reflect the number of worker threads you can use. For example, that can happen after https://github.com/JuliaLang/julia/pull/42302 (although maybe I should clarify that I suggested this behavior). It can also happen with other conceivable changes, like I/O- or finalizer-specific worker threads in the scheduler. So, I don’t think not discouraging @threads :static for i in 1:nthreads() is a good idea even in application code. I think it’s better to have an API like Threads.foreachworker(f) specifically for this.
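For illustration only, here is what a naive version of the proposed API could look like. foreachworker is hypothetical (no such function exists in Base), and building it on @threads :static, as done here, is exactly the ad-hoc approach criticized in the post: it still trusts nthreads() and cannot be nested.

```julia
using Base.Threads

# HYPOTHETICAL: `foreachworker` is NOT an existing Base API. This naive sketch
# is built on @threads :static, so it inherits that macro's restrictions
# (no nesting) and trusts nthreads() as the worker count.
function foreachworker(f)
    @threads :static for _ in 1:nthreads()
        f()  # called once per iteration, i.e. nthreads() times in total
    end
    return nothing
end

calls = Atomic{Int}(0)
foreachworker() do
    atomic_add!(calls, 1)
end
println(calls[] == nthreads())  # prints true
```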


(-1)^3 = -1, so okay, :static is bad. But what if the underlying data has a chunked structure? For example, what if the things we’re looping over are grouped in a cache or in a file encoding (blobs)? Dynamic scheduling would imply extra overhead because threads would now read data scattered across multiple chunks, whereas with :static the overlap is minimal (it only happens at the boundaries).

Yeah, I knew that sentence was stupid but I wanted to hint that my argument is stricter than “I don’t think encouraging @threads :static for i in 1:nthreads() is a good idea.”

APIs that use :static scheduling internally are not usable in parallel applications. This is because :static scheduling throws when it is started on a non-primary worker thread. This is a rather artificial limitation, but we can’t fix it: waiting for all worker threads to be free is simply a bad idea if we want to use it within a parallel program where other components may be using some workers. So, don’t use it in packages if you want to make them useful for other people. Otherwise, nothing stops you from micro-optimizing your code.
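The composability problem can be reproduced by nesting a :static loop inside an ordinary @threads loop. This is a minimal sketch; the exact error text differs between Julia versions, and the inner error surfaces from the outer loop wrapped in a TaskFailedException or CompositeException, so the example only checks that something was thrown. nested_static! is an illustrative name.

```julia
using Base.Threads

# Outer loop: an ordinary @threads loop. Inner loop: @threads :static, which
# refuses to start while another threaded region is active.
function nested_static!(out)
    n = size(out, 1)
    @threads for i in 1:n
        @threads :static for j in 1:n
            out[i, j] = i * j
        end
    end
    return out
end

failed = try
    nested_static!(zeros(Int, 4, 4))
    false
catch err
    # The inner error is rethrown by the outer loop, wrapped in a
    # TaskFailedException/CompositeException depending on the Julia version.
    true
end
println(failed)  # prints true
```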

It’s not clear what kind of effects you are talking about [1]. But, FWIW, I am working on scheduling algorithms and implementations for making the Julia scheduler “aware” of caches in some sense. I don’t think we need to throw away cache-friendliness in order to have composable parallelism. But if you have examples where Julia’s dynamic scheduling gets in the way of good performance, please do let me know.


  1. Is it about task migration at yield points effectively invalidating the CPU cache? But then it’s not clear what “overlap” refers to. The worry about overlap could be interpreted as the effect of false sharing due to concurrent writes, but do you have an example where invalidating O(nthreads()) cache lines matters? Also, that contradicts the phrase “reading scattered data”.

I’m referring to high-level cache, not CPU cache. Think of the data you’re iterating over this way:

[data1, data2, data3...][data41, data42, data43, data44][...][..][.....]

Let’s call each [] a basket. With :static, each thread gets many whole baskets and shares only a small fraction with its neighbor at the boundary. For example, a thread gets data 1-401, but the boundary looks like [...data400][data401, data402, data403][...]. The problem is that each basket can only be read, parsed, and decompressed as a whole, so the sensible thing to do is to process a basket and store it in a thread-local cache.

Now :dynamic comes in, and suddenly multiple threads access the same basket much more often, replacing their local caches many times just to read a few entries from a basket before throwing it away.
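The basket argument can be made concrete with a toy count of how many baskets each worker has to decompress if it keeps its own basket cache. This sketch assumes fixed-size baskets of basket_size events (real ROOT baskets vary in size) and models the bad case as a round-robin assignment of events to workers; that interleaving is a worst-case model, not necessarily what @threads :dynamic does. basket_of and baskets_read are illustrative names.

```julia
# Map an event index to its basket number (fixed-size baskets: an assumption).
basket_of(i, basket_size) = (i - 1) ÷ basket_size + 1

# Total basket decompressions if each worker keeps its own basket cache:
# assignments[w] is the list of event indices handled by worker w.
function baskets_read(assignments, basket_size)
    return sum(length(unique(basket_of.(idx, basket_size))) for idx in assignments)
end

n, basket_size, nworkers = 1_000, 100, 4

# Contiguous chunks, one per worker (the layout :static uses)
chunk = cld(n, nworkers)
contiguous = [collect((w - 1) * chunk + 1:min(w * chunk, n)) for w in 1:nworkers]

# Round-robin assignment: a worst-case model of fine-grained interleaving
interleaved = [collect(w:nworkers:n) for w in 1:nworkers]

println(baskets_read(contiguous, basket_size))   # prints 12
println(baskets_read(interleaved, basket_size))  # prints 40
```

With 10 baskets in total, contiguous chunks cost only 2 extra decompressions (at the chunk boundaries), while full interleaving makes every worker decompress every basket.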

Benchmarking I/O-heavy code is hard, and although I see an increase in allocations, the speed impact seems to be minimal:

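julia> using UnROOT, Base.Threads

julia> const tree = LazyTree("/home/akako/Downloads/Run2012BC_DoubleMuParked_Muons.root", "Events");

julia> function burn(N)  # CPU-bound busy work
           res = 0.0
           for i = 1:N
               res += cos(sin(i))
           end
           res
       end

julia> function main(tree)
           res = [0 for _ = 1:nthreads()]
           @threads :static for evt in tree
               burn(10^2) # simulate computing
               res[threadid()] += length(evt.Muon_pt)  # OK here: :static tasks stay on one thread
           end
           res
       end

julia> function main_dynamic(tree)  # presumably the same as main, but with :dynamic (Julia 1.8+)
           res = [0 for _ = 1:nthreads()]
           @threads :dynamic for evt in tree
               burn(10^2) # simulate computing
               res[threadid()] += length(evt.Muon_pt)  # racy if a task migrates at a yield point
           end
           res
       end

julia> @time main(tree);
 12.181016 seconds (21.00 k allocations: 2.663 GiB, 0.01% gc time)

julia> @time main_dynamic(tree)
 13.050303 seconds (21.48 k allocations: 2.917 GiB, 0.03% gc time)
10-element Vector{Int64}:
 15265992
 15204865
 15193408
 14531832
 14541762
 14539929
 14532228
 15236139
 15057163
 15219138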

I don’t know under what real-world workloads this problem would be amplified, but I can see how it might go wrong.

I strongly suggest avoiding reasoning about worker (OS) threads in Julia programs. It is a very low-level construct and arguably should be avoided in idiomatic Julia programs. The primitive for concurrent execution in Julia is Task, not thread. Basically, if you write threadid() or nthreads() anywhere, that’s a big warning sign unless you know what you are doing. (Note: I have already mentioned the exceptions that I consider fine. I’m not repeating them here; this comment only discusses what should be done in principle.)

There are various ways to specify “local” resources. For example, FLoops.jl provides the @init macro to declare basecase-local (approximately task-local) resources. See also “Concurrency patterns for controlled parallelisms” for various composable idioms for handling local resources.
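One composable alternative to buffers[threadid()] is to split the input into contiguous chunks yourself, spawn one task per chunk, and allocate the scratch buffer inside the task, so the buffer belongs to the task rather than to a thread. This is a plain-Base sketch in the spirit of those idioms, not the FLoops.jl @init API itself; work! and chunked_sum are hypothetical names.

```julia
using Base.Threads

# Hypothetical per-element work that needs scratch space.
function work!(buf, x)
    buf .= x .* (1:length(buf))  # fill the scratch buffer
    return sum(buf)
end

function chunked_sum(xs; ntasks = nthreads(), buflen = 8)
    chunklen = max(1, cld(length(xs), ntasks))
    # One task per contiguous chunk of indices
    tasks = map(Iterators.partition(eachindex(xs), chunklen)) do idxs
        @spawn begin
            buf = zeros(Float64, buflen)  # task-local: only this task touches it
            acc = 0.0
            for i in idxs
                acc += work!(buf, xs[i])
            end
            acc
        end
    end
    return sum(fetch, tasks; init = 0.0)
end

xs = rand(10_000)
s = chunked_sum(xs)
```

Because the buffer is owned by the task, it stays correct even if the task migrates between threads.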

What do you suggest doing, then? I don’t care about thread-locality per se, but I do care about having a buffer that can be indexed by threadid(), because there’s no other way to organize the basket cache in a thread-safe way.

When a task (or thread; the technicality doesn’t matter for this) wants to read an event, it checks whether the event is in a basket that has already been decompressed. We can’t afford to look up every cache for every event, because you easily have 10^8 * 100 to go through. So what we do is assume that most of the time the iteration is linear (:static makes this model very realistic) and check the event number against two numbers (the beginning and end of the basket) to decide whether we need to read from disk. This is the reason the cache is “thread-local”; it’s just a way to keep track of things.
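The range check described above can be sketched as a small per-worker cache holding one decompressed basket plus its first and last event numbers. Everything here is hypothetical: BasketCache, load_basket!, getevent! and the fixed BASKET_SIZE are illustrative, and “decompression” is simulated by filling a vector.

```julia
# HYPOTHETICAL sketch: one cached basket per worker, checked by two comparisons.
mutable struct BasketCache
    first::Int
    last::Int
    data::Vector{Float64}
    loads::Int  # how many times we had to "read and decompress"
end
BasketCache() = BasketCache(1, 0, Float64[], 0)  # empty range: nothing cached

const BASKET_SIZE = 100  # assumption: fixed-size baskets

function load_basket!(c::BasketCache, i::Int)
    b = (i - 1) ÷ BASKET_SIZE
    c.first = b * BASKET_SIZE + 1
    c.last = c.first + BASKET_SIZE - 1
    c.data = Float64.(c.first:c.last)  # stand-in for disk read + decompression
    c.loads += 1
    return c
end

function getevent!(c::BasketCache, i::Int)
    # Two integer comparisons instead of a general cache lookup
    (c.first <= i <= c.last) || load_basket!(c, i)
    return c.data[i - c.first + 1]
end

c = BasketCache()
total = sum(getevent!(c, i) for i in 251:500)  # one contiguous chunk
println(c.loads)  # prints 3
```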

I really want to find a better and easier-to-implement way to do this. We spent many hours experimenting with thread-safe LRU caches and the like, and they are an order of magnitude slower. This IS low-level, I guess.

I’d try making cache local to task (or any logically serial region of the code), as I suggested above.

You have as many tasks as there are iterations, right? In code like:

@threads for event in a_million_events
   ... # we have a million tasks here?
end

But you don’t want to read from disk and decompress that many times; you probably have around 100 iterations between disk reads, which means this cache cannot be task-local?

No, @threads currently spawns only nthreads() tasks, and each task processes a contiguous region of the input. In the current implementation, the task tree structure of :dynamic and :static is identical. The only difference is that the underlying tasks in :dynamic can be scheduled on an arbitrary worker thread and are allowed to migrate across worker threads, like the usual tasks created via Threads.@spawn.

The details may change in the future, but I believe the number of tasks will always be O(nthreads()) and each task will process one contiguous region (or a few contiguous regions). I’m almost certain this will always be the case, since otherwise it’d be essentially equivalent to @sync for x in xs; @spawn ...; end.
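The task layout can be inspected by recording current_task() for each iteration and grouping the iteration indices by task. This reflects the implementation as described in this post, not a documented guarantee, so it may change; tasks_per_iteration is a hypothetical helper.

```julia
using Base.Threads

# Record which Task executed each iteration of a default-scheduled @threads loop.
function tasks_per_iteration(n)
    owner = Vector{Task}(undef, n)
    @threads for i in 1:n
        owner[i] = current_task()
    end
    return owner
end

owner = tasks_per_iteration(1_000)

# Group iteration indices by the task that ran them (pushed in ascending order).
groups = Dict{Task,Vector{Int}}()
for (i, t) in enumerate(owner)
    push!(get!(Vector{Int}, groups, t), i)
end

ntasks = length(groups)
contiguous = all(idx -> idx == first(idx):last(idx), values(groups))
println("tasks: ", ntasks, ", each on one contiguous range: ", contiguous)
```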

(This is not apparent from the current docstring in the Multi-Threading section of the Julia manual, so we should probably add some clarifications.)


Oops, sorry, I had a more “dynamic” understanding, which is wrong. OK, yeah, I fully agree that I don’t care about (OS) thread locality. I just need the task that handles a contiguous region of the input to have a fixed “place” for storing its decompressed chunk. I’m currently using buffers[threadid()] for this, and I don’t think there’s an alternative, even though it’s no longer correct.

Yeah, I realized that the current docstring is not very clear about this. I’ll try to feedback our conversation to a PR to make it more understandable.

You are right. This pattern is invalid with the new @threads, since the underlying task can migrate across threads. This is especially the case since (presumably) you are doing I/O in the loop.

If you need a quick workaround, you can do

@sync @threads for i in 1:nthreads()
    @async handle_i_th_partition(i)
end

or

if Threads.threadid() == 1
    @threads :static for ...
    end
else
    for ...  # sequential fallback
    end
end

to correctly use the buffers[threadid()] pattern. However, as you might have noticed, I’m strongly against this pattern and I believe there are various ways to tackle this.

If your project is on GitHub, and if you want a less abstract comment from me, feel free to ping me on the actual code. I can’t promise if I can give good feedback in time but I’m curious to look at challenging examples on parallel and concurrent programming in Julia.


Hope you don’t mind, I’m taking you up on this offer: it’s not clear to me how we’re supposed to work around the case of per-iteration pre-allocated buffers (the buffers[tid] pattern described above). As an example, this is the very innermost loop of our code (called repeatedly); it is non-allocating, and it’s not clear to me how to do this with @threads :dynamic or with FLoops: Hamiltonian.jl in JuliaMolSim/DFTK.jl (master branch) on GitHub. Another, simpler example (although this one is called once, so presumably we could use FLoops more easily there): densities.jl in JuliaMolSim/DFTK.jl (master branch) on GitHub.
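One way to keep a repeatedly called inner loop allocation-light without buffers[threadid()] is to preallocate one buffer per chunk once, outside the hot loop, and have exactly one task process each chunk. This is a sketch under that assumption; ChunkedWorkspace and apply_all! are hypothetical names, and each call still allocates the spawned Task objects, so it is not strictly non-allocating.

```julia
using Base.Threads

# Buffers are allocated once and indexed by chunk number, not by threadid().
struct ChunkedWorkspace
    buffers::Vector{Vector{Float64}}
end
ChunkedWorkspace(nchunks, buflen) =
    ChunkedWorkspace([zeros(Float64, buflen) for _ in 1:nchunks])

function apply_all!(out, xs, ws::ChunkedWorkspace)
    nchunks = length(ws.buffers)
    chunklen = max(1, cld(length(xs), nchunks))  # yields at most nchunks chunks
    @sync for (k, idxs) in enumerate(Iterators.partition(eachindex(xs), chunklen))
        @spawn begin
            buf = ws.buffers[k]  # owned by this chunk's task only
            for i in idxs
                buf .= xs[i]     # stand-in for real work using scratch space
                out[i] = sum(buf)
            end
        end
    end
    return out
end

xs = collect(1.0:100.0)
out = similar(xs)
ws = ChunkedWorkspace(nthreads(), 16)
for _ in 1:3  # repeated calls reuse the same buffers
    apply_all!(out, xs, ws)
end
```

Since each chunk is handled by exactly one task, the buffer is never shared, regardless of which thread that task runs on or migrates to.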
