PSA: Thread-local state is no longer recommended; Common misconceptions about threadid() and nthreads()

It looks like the same logic applies to process-local states with multiprocessing. Am I right?

Edit:

To be more precise, something like this

results = SharedArray{Float64}(nworkers())
pmap(f, collection) # f works on results[myid()]

Only if using threadid() to update a shared buffer. If threadid() were deprecated, there would not be anything wrong with @threads; people would be forced to use the other patterns to index shared arrays (see the sketch below). The problem is that using it is very tempting.
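
For the Distributed case, a minimal sketch of those other patterns (reusing f and collection from the snippet above; this is just an illustration, not something from the blogpost) would be to let pmap collect one result per element, or to fold with a reducing @distributed loop, rather than having each worker write into results[myid()]:

using Distributed

# One result per element; no shared buffer to index at all.
# (Worker ids typically start at 2 anyway, so results[myid()] in the snippet
#  above would not line up with a length-nworkers() SharedArray.)
results = pmap(f, collection)

# Or reduce on the fly with a folding @distributed loop:
total = @distributed (+) for c in collection
    f(c)
end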

3 Likes

Alongside this blogpost, we also tried to improve the docs and guide against the bad patterns. It’d be good to see whether people think the newly updated 1.9.2 docs are clear.

2 Likes

Polyester’s threads are sticky, like @threads :static.
It also supports threadlocal= for defining thread-local state, without needing Threads.threadid() (which you should still avoid).
Unfortunately, it is type unstable and allocates memory.
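
Roughly, the pattern looks like this (a sketch from memory of Polyester’s threadlocal= keyword; batch_sum is just an illustrative name, and the exact semantics are documented in Polyester’s README):

using Polyester

function batch_sum(x)
    # each thread gets its own accumulator, initialized from the given expression
    @batch threadlocal=zero(eltype(x)) for i in eachindex(x)
        threadlocal += x[i]
    end
    # after the loop, `threadlocal` collects one value per thread
    return sum(threadlocal)
end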

I give some ideas for how it can be improved if anyone wants to give it a try (good first issue):

Note that LoopVectorization.jl handles thread-local state without allocations.

We could use something like that approach, at least for many of the common cases.

julia> using LoopVectorization, LinearAlgebra, LinuxPerf, BenchmarkTools

julia> BLAS.set_num_threads(@show(Threads.nthreads()));
Threads.nthreads() = 4

julia> x = rand(2^20);

julia> function dottturbo(x,y)
           s = zero(Base.promote_eltype(x,y))
           @tturbo for i = eachindex(x,y)
               s += x[i]*y[i]
           end
           return s
       end
dottturbo (generic function with 1 method)

julia> foreachf!(f::F, N, args::Vararg{<:Any,A}) where {F,A} = foreach(_ -> Base.donotdelete(f(args...)), Base.OneTo(N))
foreachf! (generic function with 1 method)

julia> @pstats "cpu-cycles,(instructions,branch-instructions,branch-misses),(task-clock,context-switches,cpu-migrations,page-faults),(L1-dcache-load-misses,L1-dcache-loads,L1-icache-load-misses),(dTLB-load-misses,dTLB-loads),(iTLB-load-misses,iTLB-loads)" begin
       foreachf!(dot, 100_000, x, x)
       end
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
╶ cpu-cycles               1.23e+11   60.0%  #  3.6 cycles per ns
┌ instructions             5.10e+10   60.0%  #  0.4 insns per cycle
│ branch-instructions      5.69e+09   60.0%  # 11.2% of insns
└ branch-misses            2.40e+06   60.0%  #  0.0% of branch insns
┌ task-clock               3.40e+10  100.0%  # 34.0 s
│ context-switches         0.00e+00  100.0%
│ cpu-migrations           0.00e+00  100.0%
└ page-faults              0.00e+00  100.0%
┌ L1-dcache-load-misses    1.32e+10   20.0%  # 45.0% of dcache loads
│ L1-dcache-loads          2.93e+10   20.0%
└ L1-icache-load-misses    1.46e+07   20.0%
┌ dTLB-load-misses         5.65e+02   20.0%  #  0.0% of dTLB loads
└ dTLB-loads               2.92e+10   20.0%
┌ iTLB-load-misses         1.08e+05   40.0%  # 30.9% of iTLB loads
└ iTLB-loads               3.49e+05   40.0%
                  aggregated from 4 threads
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

julia> @pstats "cpu-cycles,(instructions,branch-instructions,branch-misses),(task-clock,context-switches,cpu-migrations,page-faults),(L1-dcache-load-misses,L1-dcache-loads,L1-icache-load-misses),(dTLB-load-misses,dTLB-loads),(iTLB-load-misses,iTLB-loads)" begin
       foreachf!(dottturbo, 100_000, x, x)
       end
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
╶ cpu-cycles               1.23e+11   60.0%  #  3.6 cycles per ns
┌ instructions             3.64e+10   60.0%  #  0.3 insns per cycle
│ branch-instructions      3.38e+09   60.0%  #  9.3% of insns
└ branch-misses            1.07e+06   60.0%  #  0.0% of branch insns
┌ task-clock               3.38e+10  100.0%  # 33.8 s
│ context-switches         0.00e+00  100.0%
│ cpu-migrations           0.00e+00  100.0%
└ page-faults              0.00e+00  100.0%
┌ L1-dcache-load-misses    1.31e+10   20.0%  # 49.9% of dcache loads
│ L1-dcache-loads          2.63e+10   20.0%
└ L1-icache-load-misses    8.40e+05   20.0%
┌ dTLB-load-misses         7.50e+01   20.0%  #  0.0% of dTLB loads
└ dTLB-loads               2.63e+10   20.0%
┌ iTLB-load-misses         1.08e+02   40.0%  #  0.4% of iTLB loads
└ iTLB-loads               2.40e+04   40.0%
                  aggregated from 4 threads
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

julia> @btime dot($x, $x)
  83.491 μs (0 allocations: 0 bytes)
349742.18307257013

julia> @btime dottturbo($x, $x)
  82.378 μs (0 allocations: 0 bytes)
349742.18307257013

LV seems to ramp up thread use much more slowly though, which seems to hurt performance if you have access to more threads.
That could also use some more work.
I.e., LV should perhaps consider cache sizes when deciding how many threads to allocate.
But that’d probably be more relevant for micro benchmarks, where you have a chance of holding the memory in cache, while in “real workloads”, it’s less likely the memory will be sitting in the correct core-local caches before the function gets called…
I find it unlikely that real workloads are bottlenecked by code like the above example; they’re hopefully able to parallelize at a more granular level, so they can fuse the dot product with some other operations.

Or, taking the ChunkSplitters’ example:

julia> using LoopVectorization

julia> using ChunkSplitters: chunks

julia> function sum_tturbo(f::F, x) where {F}
           s = zero(eltype(x))
           @tturbo for i = eachindex(x)
               s += f(x[i])
           end
           s
       end
sum_tturbo (generic function with 1 method)

julia> function sum_parallel(f, x; nchunks=Threads.nthreads())
           s = fill(zero(eltype(x)), nchunks)
           Threads.@threads for ichunk in 1:nchunks
               for i in chunks(x, ichunk, nchunks)
                   s[ichunk] += f(x[i])
               end
           end
           return sum(s)
       end
sum_parallel (generic function with 1 method)

julia> x = rand(10^7);

julia> Threads.nthreads()
36

julia> @btime sum(x -> log(x)^7, $x)
  157.715 ms (0 allocations: 0 bytes)
-4.944530243712232e10

julia> @btime sum_parallel(x -> log(x)^7, $x; nchunks=4)
  94.041 ms (206 allocations: 20.30 KiB)
-4.9445302437075905e10

julia> @btime sum_parallel(x -> log(x)^7, $x; nchunks=12)
  70.799 ms (209 allocations: 20.45 KiB)
-4.944530243710959e10

julia> @btime sum_parallel(x -> log(x)^7, $x; nchunks=64)
  29.700 ms (219 allocations: 21.17 KiB)
-4.944530243712026e10

julia> @btime sum_tturbo(x->log(x)^7, $x)
  908.156 μs (0 allocations: 0 bytes)
-4.944530243712195e10

@tturbo is 32x faster with 0 allocations :wink:

4 Likes

I’m confused about where this leaves task_local_storage (“task” as opposed to “thread”). As far as I can tell, it is still a working, non-deprecated API.

task_local_storage isn’t going anywhere. It’s just not particularly useful or nice for implementing reductions.

For an example of something that task_local_storage does well in the context of @threads, here’s some code @GunnarFarneback shared on Zulip:

Threads.@threads for j in axes(x, 2)
    temp = get!(() -> Vector{Int}(undef, N), task_local_storage(), :temp)::Vector{Int}
    # fill in temp with j-dependent data
    for i in axes(x, 1)
        y[i, j] = # some computation with x, i, j, temp
    end
end

No, there are many, many uses of @threads that are totally fine. What is suspect is code that uses threadid() in a multithreaded context, especially code that indexes into some buffer using that function.

Additionally, the general advice is that it’s not a good idea to create a mutable buffer outside of a @threads block and then read from and write to that buffer inside the @threads block. This can be done correctly, but one does need to be careful, because it is an easy thing to get wrong.
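
As a minimal sketch of a pattern that avoids both pitfalls (psum and ntasks are just illustrative names, not from the blogpost): give each chunk of work its own task and let each task return its own partial result, rather than writing into a buffer created outside the loop.

using Base.Threads: @spawn

function psum(f, x; ntasks = Threads.nthreads())
    chunk_len = cld(length(x), ntasks)
    tasks = map(Iterators.partition(eachindex(x), chunk_len)) do r
        @spawn sum(f, view(x, r))   # each task computes and returns its own partial sum
    end
    return sum(fetch, tasks)        # combine per-task results; no shared mutable buffer
end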

Just to clarify why Polyester.jl wasn’t mentioned: it is not really an active participant in Julia’s regular multithreaded scheduling system; it’s a rather separate system for writing multithreaded code, so we decided it wasn’t really within the scope of that blogpost.

We can’t get rid of Base.Threads, but that’s fine. That module can just stay as a set of low-level primitives for multithreading, and then we could start working on a new standard library or sub-module of Base that has good high-level parallel functions like map, reduce, filter, etc.

4 Likes

To be honest I don’t know; I’m not super experienced with Distributed.jl, but maybe someone like @jpsamaroo or @vchuravy could let us know. The scheduling details of processes are pretty different from those of tasks, so I wouldn’t be very surprised one way or the other.

Is it reasonable to have threadid() return a special Integer subtype instead of an Int, and deprecate indexing using that type (or warn against this)? This might increase the visibility of this message, and urge packages to change.

5 Likes

That’s a nice idea I haven’t heard before. It clearly delineates the problematic indexing, whereas indexing via loop index is fine and not affected.
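
Purely as an illustration of what such a type could look like (this is not an actual proposal or real Base code, just a sketch): wrap the id in its own Integer subtype so that buffer[threadid()] stands out and warns, while explicit conversion stays easy.

# Hypothetical sketch:
struct ThreadId <: Integer
    id::Int
end
Base.Int(t::ThreadId) = t.id    # explicit opt-in for code that knows its task is sticky

# Warn when the id is used directly as an array index:
function Base.to_index(A::AbstractArray, t::ThreadId)
    @warn "indexing a buffer by threadid() is discouraged; see the PSA" maxlog=1
    return t.id
end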

Conceptually, yes I’d have to agree :thinking: Whether the parallelism in question is threading or parallel processing, the race conditions that are the problem are a property of the parallel algorithm, not the means of parallelization.

I’m against it. If one has sticky tasks, using threadid() for indexing is just fine.

Anyway, the change would be breaking, but it wouldn’t be bad to have something like threadid(warn=false) to suppress the warning, versus a warning-by-default version.

Within a process myid() is stable, whereas within a task threadid() might not be.

And are all the ids unique? I.e., can two separate processes have the same id? I’d assume not, right?

On the same machine, no. But if you’re using a cluster, then processes on different machines could easily have the same PID, and if they use that PID to identify themselves when handing data back to some central coordinator, there could be conflicts/collisions.

Julia worker processes have unique IDs (independent of which machine they run on).
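
For example (a hypothetical session; the worker ids shown are just whatever addprocs happens to assign):

using Distributed
addprocs(2)                                # ids are assigned by the master process
workers()                                  # e.g. [2, 3]; unique across the whole cluster
remotecall_fetch(myid, first(workers()))   # that worker's Julia id, not its OS PID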

Why is it that I cannot use @turbo in the inner loop, like this?

julia> function sum_parallel(f, x; nchunks=Threads.nthreads())
           s = fill(zero(eltype(x)), nchunks)
           Threads.@threads for (irange, ichunk) in chunks(x, nchunks)
               @turbo for i in irange
                   s[ichunk] += f(x[i])
               end
           end
           return sum(s)
       end
ERROR: LoadError: KeyError: key Symbol("###RHS###6###") not found

irange is a regular UnitRange{Int64}.

It doesn’t like s[ichunk]. If you hoist the load and store out of the loop, it should work.
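
Something along these lines, for instance (an untested sketch of that hoisting; otherwise the same function as above):

function sum_parallel(f::F, x; nchunks=Threads.nthreads()) where {F}
    s = fill(zero(eltype(x)), nchunks)
    Threads.@threads for (irange, ichunk) in chunks(x, nchunks)
        acc = zero(eltype(x))        # hoist: accumulate into a local variable
        @turbo for i in irange
            acc += f(x[i])
        end
        s[ichunk] = acc              # a single store back into this chunk's slot
    end
    return sum(s)
end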

Suppose I have 10 calculations distributed across 5 workers, and right now worker 2 is running task 1. Isn’t it possible that calculation 1 yields and calculation 6 kicks in on worker 2 before task 1 finishes? That would cause a problem even if myid() is stable.

Or is it that @spawnat p expr does not wrap expr in a task, and will never yield before expr finishes?