Note that with these data sizes and relatively little work for each index, you are likely to experience serious memory pressure: cache misses at the very least, and likely TLB misses, which are very costly. The former can be alleviated by collecting indices close to each other by sorting them. The latter can be alleviated by running with larger page sizes; how to do that depends on the OS.
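A minimal sketch of the sorting idea, assuming updates can be buffered as linear-index/value pairs (the helper name is hypothetical):
# Hypothetical sketch: buffer updates as (linear index, value) pairs, then apply
# them in index order so consecutive writes land in nearby memory.
function apply_sorted!(A, idxs::Vector{Int}, vals::Vector{Float32})
    p = sortperm(idxs)              # order the updates by memory location
    @inbounds for k in p
        A[idxs[k]] += vals[k]
    end
    return A
end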
I have set up the code as you have suggested. Going by my memory usage, tasks appear to be assigned to the task_queue, but are never actually carried out by the task_consumer (confirmed by no allocations being made to the Arrays). Do I need to call these in a specific way that isn't in the code you wrote?
I had a bug: it was missing an end. Here's a simplified version that works:
task_queue = Channel{Task}(Inf)
ArrayOfValues = zeros(10)
task_consumer = Threads.@spawn begin
for task in task_queue
schedule(task)
wait(task)
end
end
function make_worker(task_queue)
Threads.@spawn begin
while true
index = rand(1:10)
value = rand()
task = @task ArrayOfValues[index] = value
sleep(5)
put!(task_queue, task)
end
end
end
julia> ArrayOfValues
10-element Vector{Float64}:
0.0
0.7950942448474229
0.7009708954929078
0.957567172439463
0.19631557112533105
0.453290143877633
0.8150300024119421
0.20190612368343674
0.0
0.6006649024766615
julia> ArrayOfValues
10-element Vector{Float64}:
0.6321285176596904
0.2972683412784255
0.3590054369079717
0.7070913651228008
0.16218331525499274
0.5834609447257812
0.5097080697152006
0.2886712728113805
0.9525001298254895
0.13213068752448642
I have got this to work; however, it remains slower than the serial version. I believe the bottleneck is applying all the tasks on the single core. To ensure all tasks have been completed, I have to employ a long sleep. This sleep typically ends up being longer than the serial version takes. In addition, the build-up of tasks leads to large memory consumption, which only gets worse with increased sampling.
The sleep function is obviously inefficient, as it is just a guess as to when all the tasks will have finished. Is there a neater way of waiting for this?
To speed up allocations: in general I don't think @sgaure's suggestion of sorting the indices will help, as they tend not to be densely packed. However, perhaps the fact that I am allocating to a 6D array is hurting things (as Julia is column major); would reshaping it to a 1D vector for the purpose of allocating, and converting the locs to LinearIndices, help?
Is there also a quicker way of allocating the @view slices of the array?
You shouldn't need a sleep in the real version, that was just for demonstration. Your workers don't need to wait for the tasks to complete, they can just put them in the queue. And the queue processing should be really fast if the only thing the tasks are doing is updating the array.
If you're worried about overloading the queue, you can do something like task_queue = Channel{Task}(32), which will only allow 32 elements in the queue; if there are more elements, then put! will wait until one has been processed.
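As a toy illustration of that backpressure (not the real workload):
ch = Channel{Int}(32)                      # at most 32 queued items
producer = Threads.@spawn for i in 1:10_000
    put!(ch, i)                            # blocks whenever 32 items are pending
end
consumer = Threads.@spawn begin
    total = 0
    for x in ch
        total += x                         # stand-in for the array update
    end
    total
end
wait(producer); close(ch); fetch(consumer)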
Can you share a working example that's closer to your real problem? It's hard to tell what the issue is at this point.
I have written some example code that simulates the actual serial and (current version of the) multithreaded code in both performance and structure. It should be a good test bed moving forward:
function Test_Serial()
# allocate arrays, actual code has a pair of arrays rather than just one
Array1Size = (22,20,20,20,20,20)
ArrayOfValues1 = zeros(Float32,Array1Size)
ArrayOfTallys1 = zeros(UInt32,Array1Size)
Array2Size = (20,20,20,20)
ArrayOfValues2 = zeros(Float32,Array2Size)
ArrayOfTallys2 = zeros(UInt32,Array2Size)
numSamples = 8000
#perform MC sampling
Test_Serial_MC!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples)
# some saving of arrays goes on here
end
function Test_Serial_MC!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples)
ValueSample = zeros(Float32,3) # MC samples three points
for i in 1:numSamples
ValueSample[3] = Value()
(loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
ArrayOfValues2[loc1,loc2,loc3,loc4] += ValueSample[3]
ArrayOfTallys2[loc1,loc2,loc3,loc4] += UInt32(1)
if ValueSample[3] != 0
for j in 1:100
ValueSample[2:3] .= (Value(),Value())
(loc5,loc6) = (Loc2(),Loc())
(loc5p,loc6p) = (Loc2(),Loc())
if (1<=loc5<=20)
ArrayOfValues1[loc5+2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
elseif (loc5>20) # over goes in second position
ArrayOfValues1[2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
else # under goes in second position
ArrayOfValues1[1,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
end
@view(ArrayOfTallys1[:,loc6,loc1,loc2,loc3,loc4]) .+= UInt32(1)
if (1<=loc5p<=20)
ArrayOfValues1[loc5p+2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
elseif (loc5p>20) # over goes in second position
ArrayOfValues1[2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
else # under goes in second position
ArrayOfValues1[1,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
end
if loc6 != loc6p
@view(ArrayOfTallys1[:,loc6p,loc1,loc2,loc3,loc4]) .+= UInt32(1)
end
end
else
@view(ArrayOfTallys1[:,:,loc1,loc2,loc3,loc4]) .+= UInt32(1)
end
end
end
function Test_MultiThread()
# allocate arrays, actual code has a pair of arrays rather than just one
Array1Size = (22,20,20,20,20,20)
ArrayOfValues1 = zeros(Float32,Array1Size)
ArrayOfTallys1 = zeros(UInt32,Array1Size)
Array2Size = (20,20,20,20)
ArrayOfValues2 = zeros(Float32,Array2Size)
ArrayOfTallys2 = zeros(UInt32,Array2Size)
numSamples = 1000 # 8 workers each sampling numSamples
# set up channel
task_queue = Channel{Task}(Inf)
# set up consumer
task_consumer = Threads.@spawn begin
for task in task_queue
schedule(task)
wait(task)
end
end
#perform MC sampling
workers = [Test_MultiThread_MC!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples,task_queue) for i in 1:8]
# check if all tasks complete
sleep(10)
println(sum(ArrayOfTallys2)) # should equal 8*numSamples
close(task_queue) # close queue to free up memory?
# some saving of arrays goes on here
end
function Test_MultiThread_MC!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples,task_queue)
Threads.@spawn begin
ValueSample = zeros(Float32,3) # MC samples three points
for i in 1:numSamples
ValueSample[3] = Value()
(loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
task = @task ArrayOfValues2[loc1,loc2,loc3,loc4] += ValueSample[3]
put!(task_queue,task)
task = @task ArrayOfTallys2[loc1,loc2,loc3,loc4] += UInt32(1)
put!(task_queue,task)
if ValueSample[3] != 0
for j in 1:100
ValueSample[2:3] .= (Value(),Value())
(loc5,loc6) = (Loc2(),Loc())
(loc5p,loc6p) = (Loc2(),Loc())
if (1<=loc5<=20)
task = @task ArrayOfValues1[loc5+2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
elseif (loc5>20) # over goes in second position
task = @task ArrayOfValues1[2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
else # under goes in second position
task = @task ArrayOfValues1[1,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
end
put!(task_queue,task)
task = @task @view(ArrayOfTallys1[:,loc6,loc1,loc2,loc3,loc4]) .+= UInt32(1)
put!(task_queue,task)
if (1<=loc5p<=20)
task = @task ArrayOfValues1[loc5p+2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
elseif (loc5p>20) # over goes in second position
task = @task ArrayOfValues1[2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
else # under goes in second position
task = @task ArrayOfValues1[1,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
end
put!(task_queue,task)
if loc6 != loc6p
task = @task @view(ArrayOfTallys1[:,loc6p,loc1,loc2,loc3,loc4]) .+= UInt32(1)
put!(task_queue,task)
end
end
else
task = @task @view(ArrayOfTallys1[:,:,loc1,loc2,loc3,loc4]) .+= UInt32(1)
put!(task_queue,task)
end
end
end # threads
end
function Loc()
return rand(1:20)
end
function Loc1()
return rand(0:20)
end
function Loc2()
return rand(0:21)
end
function Value()
# this takes about 50ns but sleep(0) is too long so below sorting is a hacky fix
list = rand(2)
sort(list)
return rand(0:50)
end
when benchmarked for a total of 8000 samples:
@btime Test_Serial()
248.969 ms (3147209 allocations: 778.44 MiB)
@time Test_MultiThread()
10.332610 seconds (24.95 M allocations: 2.859 GiB, 9.64% gc time)
There is a sleep(10) in the multithreaded code to ensure that all tasks have been completed. My guess is that there must be something in how the tasks are being performed that is slow compared to the serial array editing.
Drop the sleep(10). Replace it with:
wait.(workers)
close(task_queue) # close queue stop consumer
wait(task_consumer)
println(sum(ArrayOfTallys2)) # should equal 8*numSamples
Since the last 4 indices are the same in the inner loop, you can clean up the code by creating a view before the inner loop:
valview2 = @view ArrayOfValues1[:,:,loc1,loc2,loc3,loc4]
When the loc6 has been found inside the loop, you might also create another view, valview1 = @view valview2[:, loc6]. You can do the same with ArrayOfTallys1. The code will be much more readable.
I think the scheduling of a task for every assignment is too much overhead. If you use the Channel approach, it's probably better to send over an array (or the above-mentioned view), an index, and the value as a tuple, like put!(channel, (valview1, loc5+1, ValueSample[1])), and let the consumer do something like
for (arr, ind, val) in channel
arr[ind] += val
end
And one must take care to make it type stable. A better alternative would be to have a channel and consumer for each of the arrays, so just the index-value pair is sent, i.e. a Channel{Tuple{CartesianIndex{6}, Float32}}.
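A minimal sketch of that per-array channel idea, illustrated with the smaller 4-D ArrayOfValues2 for brevity (the consumer closes over the array, so only index-value pairs travel through the channel):
ArrayOfValues2 = zeros(Float32, 20, 20, 20, 20)
# One channel per target array keeps the tuple type concrete and the consumer loop type stable.
value2_queue = Channel{Tuple{CartesianIndex{4},Float32}}(Inf)
value2_consumer = Threads.@spawn for (ind, val) in value2_queue
    ArrayOfValues2[ind] += val             # the consumer is the only writer
end
put!(value2_queue, (CartesianIndex(1, 2, 3, 4), 0.5f0))   # what a worker would send
close(value2_queue); wait(value2_consumer)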
However, I suspect that even the atomic operations on the channel behind the scenes will add too much overhead. When timing with @time in Julia 1.11, it reports around 1 million lock conflicts, so you do not get rid of locks in this way.
To put it bluntly, I suspect that there is too little work being done between each necessary synchronization (be it locking or Atomic something) to benefit from parallelization.
Now, since the inner loop only accesses a 22x20 matrix (indexed by loc5, loc6), it might be beneficial to create a local such matrix in each @spawn-ed task, zero it before the inner loop, and merge it into the large array after the inner loop (protected by a lock). This would give you at least a few hundred instructions between each lock.
I.e. something like this:
# There's a lk = ReentrantLock() common to all threads
@spawn begin
localvalarray = zeros(Float32, size(ArrayOfValues)[1:2])
for i in 1:numSamples
...
fill!(localvalarray, 0)
for j in 1:100
<update localvalarray instead of ArrayOfValues1>
end
@lock lk begin
@view(ArrayOfValues1[:,:, loc1, etc]) .+= localvalarray
end
end
end
You can even let lk be an array of locks, indexed by e.g. loc4, to lock just a section of the array.
Thanks! I didn't realize how fast the workers were, which makes the Channel overhead more significant. If a thread can generate ~30,000 values per second, then the overhead of most forms of parallelism is going to be too high.
If each update is a sum, could you have the workers each generate their own array, then add the arrays together? E.g.
function Test_Serial()
# allocate arrays, actual code has a pair of arrays rather than just one
Array1Size = (22,20,20,20,20,20)
ArrayOfValues1 = zeros(Float32,Array1Size)
ArrayOfTallys1 = zeros(UInt32,Array1Size)
Array2Size = (20,20,20,20)
ArrayOfValues2 = zeros(Float32,Array2Size)
ArrayOfTallys2 = zeros(UInt32,Array2Size)
numSamples = 8000
#perform MC sampling
Test_Serial_MC!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples)
# some saving of arrays goes on here
return ArrayOfValues1
end
function Test_Serial_MC!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples)
ValueSample = zeros(Float32,3) # MC samples three points
for i in 1:numSamples
ValueSample[3] = Value()
(loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
ArrayOfValues2[loc1,loc2,loc3,loc4] += ValueSample[3]
ArrayOfTallys2[loc1,loc2,loc3,loc4] += UInt32(1)
if ValueSample[3] != 0
for j in 1:100
ValueSample[2:3] .= (Value(),Value())
(loc5,loc6) = (Loc2(),Loc())
(loc5p,loc6p) = (Loc2(),Loc())
if (1<=loc5<=20)
ArrayOfValues1[loc5+2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
elseif (loc5>20) # over goes in second position
ArrayOfValues1[2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
else # under goes in second position
ArrayOfValues1[1,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
end
@view(ArrayOfTallys1[:,loc6,loc1,loc2,loc3,loc4]) .+= UInt32(1)
if (1<=loc5p<=20)
ArrayOfValues1[loc5p+2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
elseif (loc5p>20) # over goes in second position
ArrayOfValues1[2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
else # under goes in second position
ArrayOfValues1[1,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
end
if loc6 != loc6p
@view(ArrayOfTallys1[:,loc6p,loc1,loc2,loc3,loc4]) .+= UInt32(1)
end
end
else
@view(ArrayOfTallys1[:,:,loc1,loc2,loc3,loc4]) .+= UInt32(1)
end
end
return ArrayOfValues1
end
arrays = fetch.([Threads.@spawn Test_Serial() for i in 1:8]);
combined_array = sum(arrays);
I am hesitant to take this approach because, if each thread spawns its own set of arrays, I will quickly run out of system memory, particularly as I increase the number of threads.
I have implemented your suggestions, in particular the local 2D arrays, changing the channel definitions, and adding separate channels for each array. The new code is:
function Test_MultiThread_Updated()
# allocate arrays, actual code has a pair of arrays rather than just one
Array1Size = (22,20,20,20,20,20)
ArrayOfValues1 = zeros(Float32,Array1Size)
ArrayOfTallys1 = zeros(UInt32,Array1Size)
Array2Size = (20,20,20,20)
ArrayOfValues2 = zeros(Float32,Array2Size)
ArrayOfTallys2 = zeros(UInt32,Array2Size)
numSamples = 100000 # 8 workers each sampling numSamples
# set up channels
task_queue_Value2 = Channel{Tuple{Array{Float32, 4},CartesianIndex{4},Float32}}(Inf)
task_queue_Tally2 = Channel{Tuple{Array{UInt32, 4},CartesianIndex{4},UInt32}}(Inf)
# set up consumers
task_consumer_Value2 = Threads.@spawn begin
for (arr, ind, val) in task_queue_Value2
arr[ind] += val
end
end
task_consumer_Tally2 = Threads.@spawn begin
for (arr, ind, val) in task_queue_Tally2
arr[ind] += val
end
end
#perform MC sampling
workers = [Test_MultiThread_MC_Updated!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples,task_queue_Value2,task_queue_Tally2) for i in 1:8]
# check if all tasks complete
wait.(workers)
close(task_queue_Value2)
close(task_queue_Tally2)
wait(task_consumer_Value2)
wait(task_consumer_Tally2)
println(sum(ArrayOfTallys2)) # should equal 8*numSamples
println(sum(ArrayOfTallys1))
println(sum(ArrayOfValues2))
println(sum(ArrayOfValues1))
# some saving of arrays goes on here
end
function Test_MultiThread_MC_Updated!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples,task_queue_Value2,task_queue_Tally2)
lk = ReentrantLock()
Threads.@spawn begin
ValueSample = zeros(Float32,3) # MC samples three points
localValue1 = zeros(Float32,size(ArrayOfValues1)[1:2])
localTally1 = zeros(Float32,size(ArrayOfTallys1)[1:2])
for i in 1:numSamples
ValueSample[3] = Value()
(loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
loc1234 = CartesianIndex(loc1,loc2,loc3,loc4)
put!(task_queue_Value2,(ArrayOfValues2,loc1234,ValueSample[3]))
put!(task_queue_Tally2,(ArrayOfTallys2,loc1234,UInt32(1)))
ArrayOfValues1View = @view(ArrayOfValues1[:,:,loc1234])
ArrayOfTallys1View = @view(ArrayOfTallys1[:,:,loc1234])
fill!(localValue1,0f0)
fill!(localTally1,UInt32(0))
if ValueSample[3] != 0
for j in 1:100
ValueSample[2:3] .= (Value(),Value())
(loc5,loc6) = (Loc2(),Loc())
(loc5p,loc6p) = (Loc2(),Loc())
if (1<=loc5<=20)
localValue1[loc5+2,loc6] += ValueSample[1]
elseif (loc5>20) # over goes in second position
localValue1[2,loc6] += ValueSample[1]
else # under goes in second position
localValue1[1,loc6] += ValueSample[1]
end
@view(localTally1[:,loc6]) .+= UInt32(1)
if (1<=loc5p<=20)
localValue1[loc5p+2,loc6p] += ValueSample[2]
elseif (loc5p>20) # over goes in second position
localValue1[2,loc6p] += ValueSample[2]
else # under goes in second position
localValue1[1,loc6p] += ValueSample[2]
end
if loc6 != loc6p
@view(localTally1[:,loc6p]) .+= UInt32(1)
end
end
@lock lk begin
ArrayOfValues1View .+= localValue1
end
else
@view(localTally1[:,:]) .+= UInt32(1)
end
@lock lk begin
ArrayOfTallys1View .+= localTally1
end
end
end # threads
end
with rough timing benchmarks for 80000 samples
Test_Serial()
10.617903 seconds (315.36 M allocations: 24.022 GiB, 11.80% gc time)
Test_MultiThread_Updated()
9.111024 seconds (317.03 M allocations: 24.123 GiB, 45.14% gc time)
This suggestion, I feel, would be ideal, as it is unlikely that different threads will need to access the same sections of the array at the same time, and it could be used instead of the channel approach on all the arrays. In practice, though, I am not sure how to implement it (I've never seen anyone use more than one lock). How would you go about doing it?
You create the locks before you spawn anything, e.g. with
locks = [ReentrantLock() for _ in 1:20]
Then, inside the spawned task, you find loc1 ... loc6 and do the update of the local 22x20 array, and just use the appropriate lock for updating the large array:
@lock locks[loc4] begin
@view(largearray[:, :, loc1, ..., loc4]) .+= localarray
end
The macro @lock does the same as your original try ... finally construction.
If there are still collisions you can even make a matrix of locks:
locks = [ReentrantLock() for _ in 1:20, _ in 1:20]
and use it with two indices
@lock locks[loc3, loc4] begin
...
end
Can you post a fully self-contained, simple code sample?
Fully self-contained means: If I copy-paste the code-block into the REPL, then it runs. You must check that before posting!
All samples I have seen so far are incomplete. For example,
nested task error: UndefVarError: `Value` not defined in `Main`
This means that it is not possible to reasonably contribute to the discussion.
You need to provide a reasonable definition, e.g. Value() = rand() < 0.9 ? rand() : 0.0 if your Value() is supposed to output Float64 with a reasonable chance of exact zeros.
Ideally your sample code would be simplified: No locks, no tasks, no multi-threading. Just a concise problem description.
Ideally, you would profile against your real application to check that timings/bottlenecks are similar enough with the mockup Loc() / Value() implementations that you post here.
For this simple update it is also possible to use a Threads.SpinLock instead of a ReentrantLock. The difference is that if the lock is already held, the ReentrantLock will go to sleep and yield to other tasks, and is woken up as soon as the lock is released. A SpinLock will spin-wait in a tight loop without sleeping or yielding to anything, so there will be less overhead when the lock is released.
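Both lock types are used the same way through @lock, so they are easy to swap and compare. A toy sketch (not the real update loop):
slk = Threads.SpinLock()        # busy-waits; best when the critical section is tiny
rlk = ReentrantLock()           # sleeps and yields while waiting
acc = zeros(Float32, 20)
Threads.@threads for i in 1:100_000
    @lock slk begin             # swap slk for rlk to compare the two lock types
        acc[mod1(i, 20)] += 1f0
    end
end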
function Loc()
return rand(1:20)
end
function Loc1()
return rand(0:20)
end
function Loc2()
return rand(0:21)
end
function Value()
# this takes about 50ns but sleep(0) is too long so below sorting is a hacky fix
list = rand(2)
sort(list)
return rand(0:50)
end
The full code was posted in this post. I have written Loc() and Value() to simulate the actual code such that they have similar performance.
I've managed to get the lock approach to be faster than the serial version:
function Test_MultiThread_Updated_Locks()
# allocate arrays, actual code has a pair of arrays rather than just one
Array1Size = (22,20,20,20,20,20)
ArrayOfValues1 = zeros(Float32,Array1Size)
ArrayOfTallys1 = zeros(UInt32,Array1Size)
Array2Size = (20,20,20,20)
ArrayOfValues2 = zeros(Float32,Array2Size)
ArrayOfTallys2 = zeros(UInt32,Array2Size)
numSamples = 100000 # 8 workers each sampling numSamples
# set up locks
locks = Threads.SpinLock()
# locks = [Threads.SpinLock() for _ in 1:20]
#perform MC sampling
workers = [Test_MultiThread_MC_Updated_Locks!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples,locks) for i in 1:8]
# check if all tasks complete
wait.(workers)
println(sum(ArrayOfTallys2)) # should equal 8*numSamples
#println(sum(ArrayOfTallys1))
#println(sum(ArrayOfValues2))
#println(sum(ArrayOfValues1))
# some saving of arrays goes on here
return nothing
end
function Test_MultiThread_MC_Updated_Locks!(ArrayOfValues1::Array{Float32,6},ArrayOfTallys1::Array{UInt32,6},ArrayOfValues2::Array{Float32,4},ArrayOfTallys2::Array{UInt32,4},numSamples::Int64,locks)
Threads.@spawn begin
ValueSample = zeros(Float32,3) # MC samples three points
localValue1 = zeros(Float32,size(ArrayOfValues1)[1:2])
localTally1 = zeros(Float32,size(ArrayOfTallys1)[1:2])
for i in 1:numSamples
ValueSample[3] = Value()
(loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
loc1234 = CartesianIndex(loc1,loc2,loc3,loc4)
fill!(localValue1,0f0)
fill!(localTally1,UInt32(0))
if ValueSample[3] != 0
for j in 1:100
ValueSample[2:3] .= (Value(),Value())
(loc5,loc6) = (Loc2(),Loc())
(loc5p,loc6p) = (Loc2(),Loc())
if (1<=loc5<=20)
localValue1[loc5+2,loc6] += ValueSample[1]
elseif (loc5>20) # over goes in second position
localValue1[2,loc6] += ValueSample[1]
else # under goes in second position
localValue1[1,loc6] += ValueSample[1]
end
@view(localTally1[:,loc6]) .+= UInt32(1)
if (1<=loc5p<=20)
localValue1[loc5p+2,loc6p] += ValueSample[2]
elseif (loc5p>20) # over goes in second position
localValue1[2,loc6p] += ValueSample[2]
else # under goes in second position
localValue1[1,loc6p] += ValueSample[2]
end
if loc6 != loc6p
@view(localTally1[:,loc6p]) .+= UInt32(1)
end
end
else
@view(localTally1[:,:]) .+= UInt32(1)
end
@lock locks #=locks[loc4]=# begin
ArrayOfValues2[loc1234] += ValueSample[3]
ArrayOfTallys2[loc1234] += UInt32(1)
@view(ArrayOfValues1[:,:,loc1234]) .+= localValue1
@view(ArrayOfTallys1[:,:,loc1234]) .+= localTally1
end
end
end # threads
end
With a single Lock the code timing is now (for 80000 samples):
@time Test_MultiThread_Updated_Locks()
5.910963 seconds (315.28 M allocations: 24.016 GiB, 44.01% gc time)
compared to
@time Test_Serial()
12.071050 seconds (316.89 M allocations: 27.045 GiB, 12.22% gc time)
so about a 2x speed up.
Interestingly, using an array of locks (commented code), locks = [Threads.SpinLock() for _ in 1:20] with @lock locks[loc4] begin ..., doesn't generate any speedup:
@time Test_MultiThread_Updated_Locks()
7.186602 seconds (315.34 M allocations: 24.020 GiB, 53.04% gc time)
This suggests to me that the code is still treating each lock in the array as the same lock. Can the locks themselves be given a unique ID, perhaps via a function argument like SpinLock(ID)?
This suggests to me that the code is still treating each lock in the array as the same lock. Can the locks themselves be given a unique ID, perhaps via a function argument like SpinLock(ID)?
No, they are separate locks, definitely. A SpinLock is just a single Int which is accessed atomically when trying to lock it; there are no references to anything else. Different SpinLocks are wholly unrelated. However, there will still be some memory synchronization going on, which may slow things down.
Also, you may hit some memory bandwidth or latency limit. That is hard to speculate about without looking closer with CPU profiling packages like LIKWID or LinuxPerf, which require hardware knowledge to benefit from.
But where do all the allocations come from?
Can you fix the code, so that we don't use your bugs to improve performance, only for you to find out later that the improvement doesn't apply in real life?
1. ValueSample[1] is never modified. So it is constant zero, and most of your array updates can be optimized out.
2. The construction of Loc2() is bad API. I would wrap it into e.g.
function Loc2X()
r = Loc2()
1 <= r <= 20 ? r+2 : r>20 ? 2 : 1
end
and then simplify the updates to loc5x = Loc2X() followed by ArrayOfValues[loc5x, ...] += .... Can this be done, or is there hidden code that prevents this?
3. ArrayOfTallys1 is only ever updated with ::Colon in the first position. So you could instead write ArrayOfTallys1 = zeros(UInt32,Base.tail(Array1Size)) and then skip the colon / @view / broadcast in most updates. I.e. @view(ArrayOfTallys1[:,loc6,loc1,loc2,loc3,loc4]) .+= UInt32(1) becomes ArrayOfTallys1[loc6,loc1,loc2,loc3,loc4] += UInt32(1), and @view(ArrayOfTallys1[:,:,loc1,loc2,loc3,loc4]) .+= UInt32(1) becomes @view(ArrayOfTallys1[:,loc1,loc2,loc3,loc4]) .+= UInt32(1).
4. Can you please refactor such that ValueSample is not a vector anymore? I.e. use 3 scalars, like v3 = Value(). This is, by the way, how I spotted problem (1) during refactoring of your code.
edit PS:
- Your Value() mockup allocates. I get that you need to do something that takes an adequate amount of time, but can you do something non-allocating instead? This simplifies benchmarking/profiling for all of us. For example, you could import StaticArrays and create and hash a random SVector (you adjust its size until it adequately matches the real runtime). In order to ensure that the compiler doesn't optimize this out, you add a check (hash(rand(SVector{4, Int32})) == 0 && throw("bad luck")).
Thank you for all the excellent comments/suggestions. I have done as you asked and written the test code, which is at the bottom of this post and closely replicates the actual code (hopefully bug free). It includes a serial function Test_Serial(), and two multi-threaded functions based on the comments of all the wonderful people in the thread: Test_MultiThread_Channel() using a channel approach and Test_MultiThread_Locks() using an array-of-locks approach. They benchmark like:
@time Test_Serial()
9.404520 seconds (37 allocations: 281.986 MiB)
@time Test_MultiThread_Channel()
2.381187 seconds (5.60 M allocations: 867.951 MiB, 6.25% gc time)
@time Test_MultiThread_Locks()
1.502663 seconds (115 allocations: 282.006 MiB)
The performance seems to be in line with the 8x speed-up expected from using 8 threads over 1. The channel approach still has quite a large number of allocations in comparison to the other two; I am not sure what is going on there.
I am not sure if there is anywhere left to squeeze out any last bits of performance?
Full code:
using StaticArrays
function Test_Serial()
# allocate arrays, actual code has a pair of arrays rather than just one
Array1Size = (22,20,20,20,20,20)
ArrayOfValues1 = zeros(UInt32,Array1Size)
ArrayOfTallys1 = zeros(UInt32,Base.tail(Array1Size))
Array2Size = (20,20,20,20)
ArrayOfValues2 = zeros(UInt32,Array2Size)
ArrayOfTallys2 = zeros(UInt32,Array2Size)
numSamples = 800000
#perform MC sampling
Test_Serial_MC!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples)
println(sum(ArrayOfTallys2)) # should equal numSamples
println(sum(ArrayOfTallys1))
println(sum(ArrayOfValues2))
println(sum(ArrayOfValues1))
# some saving of arrays goes on here
return nothing
end
function Test_Serial_MC!(ArrayOfValues1::Array{UInt32,6},ArrayOfTallys1::Array{UInt32,5},ArrayOfValues2::Array{UInt32,4},ArrayOfTallys2::Array{UInt32,4},numSamples)
localValue1 = zeros(UInt32,size(ArrayOfValues1)[1:2])
localTally1 = zeros(UInt32,size(ArrayOfTallys1)[1])
for i in 1:numSamples
Value3 = Value()
(loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
loc1234 = CartesianIndex(loc1,loc2,loc3,loc4)
fill!(localTally1,UInt32(0))
if Value3 != UInt32(0)
fill!(localValue1,UInt32(0))
for j in 1:100
(Value1,Value2) = (Value(),Value())
(loc5,loc6) = (Loc2(),Loc())
loc5x = Loc2X(loc5)
(loc5p,loc6p) = (Loc2(),Loc())
loc5xp = Loc2X(loc5p)
localValue1[loc5x,loc6] += Value1
localTally1[loc6] += UInt32(1)
localValue1[loc5xp,loc6] += Value2
if loc6 != loc6p
localTally1[loc6p] += UInt32(1)
end
end
else
localTally1 .+= UInt32(1)
end
ArrayOfValues2[loc1234] += Value3
ArrayOfTallys2[loc1234] += UInt32(1)
@view(ArrayOfValues1[:,:,loc1234]) .+= localValue1
@view(ArrayOfTallys1[:,loc1234]) .+= localTally1
end
end
function Test_MultiThread_Channel()
# allocate arrays, actual code has a pair of arrays rather than just one
Array1Size = (22,20,20,20,20,20)
ArrayOfValues1 = zeros(UInt32,Array1Size)
ArrayOfTallys1 = zeros(UInt32,Base.tail(Array1Size))
Array2Size = (20,20,20,20)
ArrayOfValues2 = zeros(UInt32,Array2Size)
ArrayOfTallys2 = zeros(UInt32,Array2Size)
numSamples = 100000 # 8 workers each sampling numSamples
# set up channel
task_queue = Channel{Task}(Inf)
# set up consumer
task_consumer = Threads.@spawn begin
for task in task_queue
schedule(task)
wait(task)
end
end
#perform MC sampling
workers = [Test_MultiThread_MC_Channel!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples,task_queue) for i in 1:8]
# check if all tasks complete
wait.(workers)
close(task_queue)
wait(task_consumer)
println(sum(ArrayOfTallys2)) # should equal 8*numSamples
println(sum(ArrayOfTallys1))
println(sum(ArrayOfValues2))
println(sum(ArrayOfValues1))
# some saving of arrays goes on here
end
function Test_MultiThread_MC_Channel!(ArrayOfValues1::Array{UInt32,6},ArrayOfTallys1::Array{UInt32,5},ArrayOfValues2::Array{UInt32,4},ArrayOfTallys2::Array{UInt32,4},numSamples,task_queue::Channel{Task})
Threads.@spawn begin
localValue1 = zeros(UInt32,size(ArrayOfValues1)[1:2])
localTally1 = zeros(UInt32,size(ArrayOfTallys1)[1])
for i in 1:numSamples
Value3 = Value()
(loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
loc1234 = CartesianIndex(loc1,loc2,loc3,loc4)
fill!(localTally1,UInt32(0))
if Value3 != UInt32(0)
fill!(localValue1,UInt32(0))
for j in 1:100
(Value1,Value2) = (Value(),Value())
(loc5,loc6) = (Loc2(),Loc())
loc5x = Loc2X(loc5)
(loc5p,loc6p) = (Loc2(),Loc())
loc5xp = Loc2X(loc5p)
localValue1[loc5x,loc6] += Value1
localTally1[loc6] += UInt32(1)
localValue1[loc5xp,loc6] += Value2
if loc6 != loc6p
localTally1[loc6p] += UInt32(1)
end
end
else
localTally1 .+= UInt32(1)
end
task = @task begin
ArrayOfValues2[loc1234] += Value3
ArrayOfTallys2[loc1234] += UInt32(1)
@view(ArrayOfValues1[:,:,loc1234]) .+= localValue1
@view(ArrayOfTallys1[:,loc1234]) .+= localTally1
end
put!(task_queue,task)
end
end # threads
end
function Test_MultiThread_Locks()
# allocate arrays, actual code has a pair of arrays rather than just one
Array1Size = (22,20,20,20,20,20)
ArrayOfValues1 = zeros(UInt32,Array1Size)
ArrayOfTallys1 = zeros(UInt32,Base.tail(Array1Size))
Array2Size = (20,20,20,20)
ArrayOfValues2 = zeros(UInt32,Array2Size)
ArrayOfTallys2 = zeros(UInt32,Array2Size)
numSamples = 100000 # 8 workers each sampling numSamples
# set up locks
#locks = Threads.SpinLock()
locks = [Threads.SpinLock() for _ in 1:20]
#perform MC sampling
workers = [Test_MultiThread_MC_Locks!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples,locks) for i in 1:8]
# check if all tasks complete
wait.(workers)
println(sum(ArrayOfTallys2)) # should equal 8*numSamples
println(sum(ArrayOfTallys1))
println(sum(ArrayOfValues2))
println(sum(ArrayOfValues1))
# some saving of arrays goes on here
return nothing
end
function Test_MultiThread_MC_Locks!(ArrayOfValues1::Array{UInt32,6},ArrayOfTallys1::Array{UInt32,5},ArrayOfValues2::Array{UInt32,4},ArrayOfTallys2::Array{UInt32,4},numSamples,locks)
Threads.@spawn begin
localValue1 = zeros(UInt32,size(ArrayOfValues1)[1:2])
localTally1 = zeros(UInt32,size(ArrayOfTallys1)[1])
for i in 1:numSamples
Value3 = Value()
(loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
loc1234 = CartesianIndex(loc1,loc2,loc3,loc4)
fill!(localTally1,UInt32(0))
if Value3 != UInt32(0)
fill!(localValue1,UInt32(0))
for j in 1:100
(Value1,Value2) = (Value(),Value())
(loc5,loc6) = (Loc2(),Loc())
loc5x = Loc2X(loc5)
(loc5p,loc6p) = (Loc2(),Loc())
loc5xp = Loc2X(loc5p)
localValue1[loc5x,loc6] += Value1
localTally1[loc6] += UInt32(1)
localValue1[loc5xp,loc6] += Value2
if loc6 != loc6p
localTally1[loc6p] += UInt32(1)
end
end
else
localTally1 .+= UInt32(1)
end
@lock locks[loc4] begin
ArrayOfValues2[loc1234] += Value3
ArrayOfTallys2[loc1234] += UInt32(1)
@view(ArrayOfValues1[:,:,loc1234]) .+= localValue1
@view(ArrayOfTallys1[:,loc1234]) .+= localTally1
end
end
end # threads
end
function Loc()
return rand(1:20)
end
function Loc2()
return rand(0:21)
end
function Loc2X(loc)
1 <= loc <= 20 ? loc+2 : loc>20 ? 2 : 1
end
function Value()
# stand-in for ~50ns of real work; hashing a random SVector is non-allocating and cannot be optimized away
(hash(rand(SVector{30, Int32})) == 0 && throw("bad luck"))
a = rand(0:50)
return UInt32(a)
end
You're very bottlenecked on Value() invocations. Just extracting that:
julia> function testSimple(numSamples)
for i=1:numSamples
for k=1:4 Loc() end
Value()
for j=1:100
Value(); Value()
Loc2();Loc(); Loc2(); Loc();
end
end
nothing
end
testSimple (generic function with 1 method)
julia> @btime testSimple(800000)
11.349 s (0 allocations: 0 bytes)
julia> @btime Test_Serial()
800000
153239930
20000237
4000419868
800000
153243671
19988291
3999821035
800000
153240450
20016438
3999947475
11.906 s (44 allocations: 281.99 MiB)
The channel approach still has quite a large number of allocations in comparison to the other two; I am not sure what is going on there.
The @task macro creates a closure under the hood, i.e. a callable struct. This is allocated.
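For instance (a toy illustration, not the real update), every evaluation of @task heap-allocates a Task wrapping a closure over the captured variables:
x = Ref(0)
# Each call builds a fresh closure capturing x and wraps it in a Task object;
# both live on the heap, which is where the channel-of-Tasks allocations come from.
make_update() = @task x[] += 1
t = make_update()
schedule(t); wait(t)
@assert x[] == 1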
A worse issue is that the channel approach looks wrong:
task = @task begin
ArrayOfValues2[loc1234] += Value3
ArrayOfTallys2[loc1234] += UInt32(1)
@view(ArrayOfValues1[:,:,loc1234]) .+= localValue1
@view(ArrayOfTallys1[:,loc1234]) .+= localTally1
end
put!(task_queue,task)
This puts a task to update the values/tallies into the queue. The closure captures Value3, loc1234, ArrayOfValues2, ArrayOfTallys2, localValue1, and localTally1.
The problem is that you then continue to update/reuse localValue1 and localTally1 in the worker thread. So subsequent writes to these values race with the reads in the consumer thread.
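One way around that race, sketched here with the variable names from the worker loop above (the extra copies allocate, so it may not pay off), is to snapshot the local buffers before queueing the task:
# snapshot the per-sample buffers so the worker can keep reusing localValue1/localTally1
valsnap = copy(localValue1)
talsnap = copy(localTally1)
task = @task begin
    ArrayOfValues2[loc1234] += Value3
    ArrayOfTallys2[loc1234] += UInt32(1)
    @view(ArrayOfValues1[:,:,loc1234]) .+= valsnap
    @view(ArrayOfTallys1[:,loc1234])   .+= talsnap
end
put!(task_queue, task)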
I am not sure if there is anywhere left to squeeze out any last bits of performance?
I think there are plenty of things to squeeze out, but you're close to perfect scaling on 8 threads on your machine.
Try:
- Use more threads! One thread per logical core, with SMT / hyperthreading enabled, instead of one thread per physical core. Try it out! It is often hard to predict whether SMT helps or hurts. It can hurt for pure number crunching, where all your floating-point ports are busy all the time; but often you're latency constrained, and then SMT is close to a 2x performance win. By default Julia tries to scale by physical cores, not by logical cores (imo a bad default for many workloads, but Julia's "numerical linear algebra" roots show).
- Use even more threads! e.g. test on a machine with 64 physical cores / 128 logical cores.
You will probably see that both the locking and the channel solutions scale terribly. But if you only care about 8 threads, then your benchmarks have demonstrated that the locking solution scales well enough; no need to spend more time optimizing that.