It looks like the same logic applies to process-local states with multiprocessing. Am I right?
Edit:
To be more precise, something like this
results = SharedArray{Float64}(nworkers())
pmap(f, collection) # f works on results[myid()]
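For concreteness, a runnable version of that sketch might look like the following. This is a hedged sketch, not the questioner's actual code: `accumulate_local!` is a made-up placeholder, and it assumes the Distributed and SharedArrays stdlibs with the default worker ids (2, 3, …).

```julia
using Distributed
addprocs(2)                       # spawn two local worker processes
@everywhere using SharedArrays

# One accumulator slot per worker; a SharedArray is visible to all
# local processes, so each worker only ever writes to "its" slot.
results = SharedArray{Float64}(nworkers())

# Placeholder work function: workers have ids 2, 3, ..., so shift
# myid() to get a 1-based slot index (assumes default worker ids).
@everywhere accumulate_local!(results, x) = (results[myid() - 1] += x; nothing)

pmap(x -> accumulate_local!(results, x), 1:10)
total = sum(results)              # combine the per-worker partial sums
```

Note this relies on `myid()` being stable per process, and on no two tasks running the work function concurrently on the same worker, which is exactly the question raised further down the thread.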
Only if using threadid() to update a shared buffer. If threadid() were deprecated, there would be nothing wrong with @threads; people would be forced to use the other patterns to index shared arrays. The problem is that using it is very tempting.
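To make the contrast concrete, here is a minimal sketch (the function name is made up, not from the blogpost) of the tempting-but-broken threadid() pattern next to a chunk-indexed alternative:

```julia
using Base.Threads

# Risky pattern (can silently lose updates, since a task may yield and
# resume on a different thread):
#   acc = zeros(nthreads())
#   @threads for x in data
#       acc[threadid()] += f(x)   # threadid() may change mid-iteration!
#   end
#
# Safe pattern: index by chunk, not by threadid().
function chunked_sum(f, x; nchunks = nthreads())
    acc = zeros(nchunks)
    @threads for ichunk in 1:nchunks
        lo = 1 + div((ichunk - 1) * length(x), nchunks)
        hi = div(ichunk * length(x), nchunks)
        s = 0.0
        for i in lo:hi
            s += f(x[i])          # accumulate in a task-local variable
        end
        acc[ichunk] = s           # each task owns exactly one slot
    end
    return sum(acc)
end
```

The key point is that `ichunk` comes from the loop induction variable, which is fixed for the lifetime of the task, whereas `threadid()` is not.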
Alongside this blogpost we also tried to improve the docs and warn against the bad patterns. It'd be good to see whether people think the newly updated 1.9.2 docs are clear.
Polyester's threads are sticky, like @threads :static
.
It also supports threadlocal=
for defining thread-local state, without needing Threads.threadid()
(which you should still avoid).
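A hedged sketch of how that feature is used, as I understand the Polyester README (I have not verified the exact semantics here): each thread gets its own copy of `threadlocal`, and after the loop `threadlocal` is the vector of per-thread values.

```julia
using Polyester

# Sum a vector with per-thread accumulators via @batch threadlocal=.
function batch_sum(x)
    @batch threadlocal = zero(eltype(x)) for i in eachindex(x)
        threadlocal += x[i]       # updates this thread's own copy
    end
    return sum(threadlocal)       # threadlocal is now a Vector of partials
end
```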
Unfortunately, it is type unstable and allocates memory.
I give some ideas for how it can be improved if anyone wants to give it a try (good first issue):
Note that LoopVectorization.jl handles threadlocal state without allocations.
We could use something like that approach, at least for many of the common cases.
julia> using LoopVectorization, LinearAlgebra, LinuxPerf
julia> BLAS.set_num_threads(@show(Threads.nthreads()));
Threads.nthreads() = 4
julia> x = rand(2^20);
julia> function dottturbo(x,y)
s = zero(Base.promote_eltype(x,y))
@tturbo for i = eachindex(x,y)
s += x[i]*y[i]
end
return s
end
dottturbo (generic function with 1 method)
julia> foreachf!(f::F, N, args::Vararg{<:Any,A}) where {F,A} = foreach(_ -> Base.donotdelete(f(args...)), Base.OneTo(N))
foreachf! (generic function with 1 method)
julia> @pstats "cpu-cycles,(instructions,branch-instructions,branch-misses),(task-clock,context-switches,cpu-migrations,page-faults),(L1-dcache-load-misses,L1-dcache-loads,L1-icache-load-misses),(dTLB-load-misses,dTLB-loads),(iTLB-load-misses,iTLB-loads)" begin
foreachf!(dot, 100_000, x, x)
end
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
╶ cpu-cycles               1.23e+11  60.0%  #  3.6 cycles per ns
┌ instructions             5.10e+10  60.0%  #  0.4 insns per cycle
│ branch-instructions      5.69e+09  60.0%  # 11.2% of insns
└ branch-misses            2.40e+06  60.0%  #  0.0% of branch insns
┌ task-clock               3.40e+10 100.0%  # 34.0 s
│ context-switches         0.00e+00 100.0%
│ cpu-migrations           0.00e+00 100.0%
└ page-faults              0.00e+00 100.0%
┌ L1-dcache-load-misses    1.32e+10  20.0%  # 45.0% of dcache loads
│ L1-dcache-loads          2.93e+10  20.0%
└ L1-icache-load-misses    1.46e+07  20.0%
┌ dTLB-load-misses         5.65e+02  20.0%  #  0.0% of dTLB loads
└ dTLB-loads               2.92e+10  20.0%
┌ iTLB-load-misses         1.08e+05  40.0%  # 30.9% of iTLB loads
└ iTLB-loads               3.49e+05  40.0%
                 aggregated from 4 threads
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
julia> @pstats "cpu-cycles,(instructions,branch-instructions,branch-misses),(task-clock,context-switches,cpu-migrations,page-faults),(L1-dcache-load-misses,L1-dcache-loads,L1-icache-load-misses),(dTLB-load-misses,dTLB-loads),(iTLB-load-misses,iTLB-loads)" begin
foreachf!(dottturbo, 100_000, x, x)
end
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
╶ cpu-cycles               1.23e+11  60.0%  #  3.6 cycles per ns
┌ instructions             3.64e+10  60.0%  #  0.3 insns per cycle
│ branch-instructions      3.38e+09  60.0%  #  9.3% of insns
└ branch-misses            1.07e+06  60.0%  #  0.0% of branch insns
┌ task-clock               3.38e+10 100.0%  # 33.8 s
│ context-switches         0.00e+00 100.0%
│ cpu-migrations           0.00e+00 100.0%
└ page-faults              0.00e+00 100.0%
┌ L1-dcache-load-misses    1.31e+10  20.0%  # 49.9% of dcache loads
│ L1-dcache-loads          2.63e+10  20.0%
└ L1-icache-load-misses    8.40e+05  20.0%
┌ dTLB-load-misses         7.50e+01  20.0%  #  0.0% of dTLB loads
└ dTLB-loads               2.63e+10  20.0%
┌ iTLB-load-misses         1.08e+02  40.0%  #  0.4% of iTLB loads
└ iTLB-loads               2.40e+04  40.0%
                 aggregated from 4 threads
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
julia> @btime dot($x, $x)
  83.491 μs (0 allocations: 0 bytes)
349742.18307257013
julia> @btime dottturbo($x, $x)
  82.378 μs (0 allocations: 0 bytes)
349742.18307257013
LV seems to ramp up thread use much more slowly, though, which seems to hurt performance if you have access to more threads.
That could also use some more work.
I.e., LV should perhaps consider cache sizes when deciding how many threads to allocate.
But that'd probably be more relevant for micro benchmarks, where you have a chance of holding the memory in cache, while in "real workloads" it's less likely the memory will be sitting in the correct core-local caches before the function gets called…
I find it unlikely that real workloads are bottlenecked by code like the above example; they're hopefully able to parallelize on a more granular level, so they can fuse the dot product with some other operations.
Or, taking the ChunkSplitters example:
julia> using LoopVectorization
julia> using ChunkSplitters: chunks
julia> function sum_tturbo(f::F, x) where {F}
s = zero(eltype(x))
@tturbo for i = eachindex(x)
s += f(x[i])
end
s
end
sum_tturbo (generic function with 1 method)
julia> function sum_parallel(f, x; nchunks=Threads.nthreads())
s = fill(zero(eltype(x)), nchunks)
Threads.@threads for ichunk in 1:nchunks
for i in chunks(x, ichunk, nchunks)
s[ichunk] += f(x[i])
end
end
return sum(s)
end
sum_parallel (generic function with 1 method)
julia> x = rand(10^7);
julia> Threads.nthreads()
36
julia> @btime sum(x -> log(x)^7, $x)
157.715 ms (0 allocations: 0 bytes)
-4.944530243712232e10
julia> @btime sum_parallel(x -> log(x)^7, $x; nchunks=4)
94.041 ms (206 allocations: 20.30 KiB)
-4.9445302437075905e10
julia> @btime sum_parallel(x -> log(x)^7, $x; nchunks=12)
70.799 ms (209 allocations: 20.45 KiB)
-4.944530243710959e10
julia> @btime sum_parallel(x -> log(x)^7, $x; nchunks=64)
29.700 ms (219 allocations: 21.17 KiB)
-4.944530243712026e10
julia> @btime sum_tturbo(x->log(x)^7, $x)
  908.156 μs (0 allocations: 0 bytes)
-4.944530243712195e10
@tturbo is 32x faster, with 0 allocations.
I'm confused where this leaves task_local_storage
, "task" as opposed to "thread". As far as I can tell, that is still working, non-deprecated API.
task_local_storage
isn't going anywhere. It's just not particularly useful or nice for implementing reductions.
For an example of something that task_local_storage
does well in the context of @threads
, here's some code @GunnarFarneback shared on Zulip:
Threads.@threads for j in axes(x, 2)
temp = get!(() -> Vector{Int}(undef, N), task_local_storage(), :temp)::Vector{Int}
# fill in temp with j-dependent data
for i in axes(x, 1)
y[i, j] = # some computation with x, i, j, temp
end
end
No, there are many, many uses of @threads
that are totally fine. What is suspect is code which is using threadid()
in a multithreaded context, especially code which is indexing into some buffer using that function.
Additionally, the general advice is that it's not a good idea to create a mutable buffer outside of a @threads
block and then read from and write to that buffer inside the @threads
block. This can be done correctly, but one does need to be careful when doing so, because it is an easy thing to get wrong.
Just to clarify why Polyester.jl wasn't mentioned: it is not really an active participant in Julia's regular multithreaded scheduling system; it's really a rather separate system for writing multithreaded code, so we decided it wasn't within the scope of that blogpost.
We can't get rid of Base.Threads
, but that's fine. That module can just stay as a set of low-level primitives for multithreading, and then we could start working on a new standard library or sub-module of Base that has good high-level parallel functions like map
, reduce
, filter
, etc.
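As a sketch of what such a high-level function could look like (tmap is a hypothetical name, built only on the existing Threads.@spawn primitive):

```julia
using Base.Threads: @spawn

# Hypothetical high-level parallel map: spawn one task per element and
# collect the results in order. A real library would chunk the work
# instead of spawning length(xs) tasks.
tmap(f, xs::AbstractVector) = fetch.([@spawn f(x) for x in xs])
```

The point is that users never touch threadid(): the scheduler owns thread placement, and results flow back through the tasks themselves.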
To be honest I don't know; I'm not super experienced with Distributed.jl, but maybe someone like @jpsamaroo or @vchuravy could let us know. The scheduling details of processes are pretty different from tasks, so I wouldn't be very surprised one way or the other.
Is it reasonable to have threadid()
return a special Integer
subtype instead of an Int
, and deprecate indexing with that type (or warn against it)? This might increase the visibility of this message, and urge packages to change.
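A rough sketch of what that special-integer idea could look like (entirely hypothetical, not real Base code):

```julia
# A thin Integer wrapper that a future threadid() could return.
struct ThreadId <: Integer
    id::Int
end
Base.Int(t::ThreadId) = t.id

# Array indexing funnels through Base.to_index, so that is a natural
# hook where a deprecation warning (or error) against the
# buffer[threadid()] pattern could live:
function Base.to_index(t::ThreadId)
    @warn "indexing a buffer by threadid() is discouraged" maxlog = 1
    return t.id
end
```

With this, `buf[ThreadId(2)]` would still work but emit a warning, while arithmetic uses of the id (via `Int`) stay silent.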
That's a nice idea I haven't heard before. It clearly delineates the problematic indexing, whereas indexing via a loop index is fine and not affected.
Conceptually, yes, I'd have to agree. Whether the parallelism in question is threading or parallel processing, the race conditions that are the problem are a property of the parallel algorithm, not the means of parallelization.
I'm against it. If one has sticky tasks, using threadid()
for indexing is just fine.
Anyway, the change would be breaking, but it wouldn't be bad to have something like threadid(warn=false)
to suppress the warning vs. a default warning version.
Within a process myid()
is stable, whereas within a task threadid()
might not be.
And are all the ids unique? I.e., can two separate processes have the same id? I'd assume not, right?
On the same machine, no. If you're using a cluster, then processes on different machines could easily have the same PID, and if they use that PID to identify themselves when handing data back to some central coordinator, there could be conflicts/collisions.
Julia worker processes have unique IDs (independent of which machine they run on).
Why is it that I cannot use @turbo
in the inner loop, like this?
julia> function sum_parallel(f, x; nchunks=Threads.nthreads())
s = fill(zero(eltype(x)), nchunks)
Threads.@threads for (irange, ichunk) in chunks(x, nchunks)
@turbo for i in irange
s[ichunk] += f(x[i])
end
end
return sum(s)
end
ERROR: LoadError: KeyError: key Symbol("###RHS###6###") not found
irange
is a regular UnitRange{Int64}
.
It doesn't like s[ichunk]
. If you hoist the load and store out of the loop, it should work.
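In other words, something like this hoisted variant (a sketch, assuming the same LoopVectorization and ChunkSplitters setup as in the question above):

```julia
using LoopVectorization
using ChunkSplitters: chunks

function sum_parallel_turbo(f::F, x; nchunks = Threads.nthreads()) where {F}
    s = fill(zero(eltype(x)), nchunks)
    Threads.@threads for (irange, ichunk) in chunks(x, nchunks)
        acc = zero(eltype(x))   # hoisted: plain scalar, no s[ichunk] load
        @turbo for i in irange
            acc += f(x[i])
        end
        s[ichunk] = acc         # hoisted: store once, after the loop
    end
    return sum(s)
end
```

This keeps the indexed access to `s` outside the `@turbo` body, which is what the error was complaining about.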
Suppose I have 10 calculations distributed over 5 workers, and now worker 2 is running task 1. Isn't it possible that calculation 1 yields and calculation 6 kicks in on worker 2 before task 1 finishes? That would cause a problem even if myid()
is stable.
Or is it that @spawnat p expr
does not wrap expr
in a task and will never yield before expr
finishes?