This is likely unavoidable given what `Value()` is approximating in the actual code, but I will look there to see if I can find any improvements.
Good spot! I rewrote that part of the code (see bottom) to remove the local arrays and make it race free; however, this comes at the cost of having to assign a lot more tasks. Rather than one task per loop it becomes ~100, and the `task_consumer` is no longer able to keep up, leading to a massive slowdown and huge memory usage (maxing out the 64GB on my system) as the `task_queue` fills up. I think that for this test case, the channel approach just has too much overhead to be effective.
I did some scaling benchmarks on my system (i7-12700, 12 Core(s), 20 Logical Processor(s), 64GB RAM):
@time Test_Serial()
num samples = 1000000
11.818466 seconds (21 allocations: 281.985 MiB, 0.22% gc time)
@time Test_MultiThread_Locks()
num samples = 1000000, num Threads = 1
12.109094 seconds (52 allocations: 281.987 MiB)
@time Test_MultiThread_Locks()
num samples = 4000000, num Threads = 4
12.463613 seconds (72 allocations: 281.994 MiB, 0.10% gc time)
@time Test_MultiThread_Locks()
num samples = 8000000, num Threads = 8
13.790676 seconds (100 allocations: 282.005 MiB)
@time Test_MultiThread_Locks()
num samples = 16000000, num Threads = 16
19.694665 seconds (156 allocations: 282.027 MiB)
The scaling looks very good up to about 8 threads. Given that my array of locks is only 20 elements long, going above 8 threads increases the chance of lock conflicts, which is probably what causes the departure from perfect scaling. I guess this could be fixed by adding more locks.
This is the plan for running the actual code!
Again thank you to everyone who has been so helpful on this thread!
Updated `channel` code that is race free:
"""
    Test_MultiThread_Channel(; numSamples = 100000, numWorkers = 8, queueCapacity = 10_000)

Run the channel-based Monte Carlo test and print the sums of the four result arrays.

`numWorkers` producer tasks are started with `Test_MultiThread_MC_Channel!`; each one
enqueues array-update tasks on a shared `Channel{Task}`. A single consumer task takes
those tasks off the channel and runs them one at a time, so the arrays are only ever
mutated from the consumer.

# Keywords
- `numSamples`: number of samples drawn by each worker.
- `numWorkers`: number of producer tasks to start.
- `queueCapacity`: maximum number of pending update tasks held in the channel. The
  channel is bounded so that producers block in `put!` when the consumer falls behind,
  instead of letting the queue (and memory use) grow without limit.

Calling the function with no arguments uses the original sample and worker counts.
"""
function Test_MultiThread_Channel(; numSamples = 100000, numWorkers = 8, queueCapacity = 10_000)
    # allocate arrays, actual code has a pair of arrays rather than just one
    Array1Size = (22, 20, 20, 20, 20, 20)
    ArrayOfValues1 = zeros(UInt32, Array1Size)
    ArrayOfTallys1 = zeros(UInt32, Base.tail(Array1Size))
    Array2Size = (20, 20, 20, 20)
    ArrayOfValues2 = zeros(UInt32, Array2Size)
    ArrayOfTallys2 = zeros(UInt32, Array2Size)

    # Bounded queue: a full channel makes producers wait (backpressure).
    task_queue = Channel{Task}(queueCapacity)

    # Single consumer: runs each queued update to completion before taking the next.
    task_consumer = Threads.@spawn begin
        for task in task_queue
            schedule(task)
            wait(task)
        end
    end
    # If the consumer dies, close the channel with its error so that producers blocked
    # in `put!` are woken up instead of waiting forever on a full queue.
    bind(task_queue, task_consumer)

    # perform MC sampling
    workers = [
        Test_MultiThread_MC_Channel!(
            ArrayOfValues1,
            ArrayOfTallys1,
            ArrayOfValues2,
            ArrayOfTallys2,
            numSamples,
            task_queue,
        ) for _ in 1:numWorkers
    ]

    try
        # wait until every worker has finished producing
        foreach(wait, workers)
    finally
        # Always close the queue, even when a worker failed, so the consumer's loop
        # terminates after draining whatever is still buffered.
        close(task_queue)
    end
    wait(task_consumer)

    println(sum(ArrayOfTallys2)) # should equal numWorkers*numSamples
    println(sum(ArrayOfTallys1))
    println(sum(ArrayOfValues2))
    println(sum(ArrayOfValues1))
    # some saving of arrays goes on here
    return nothing
end
"""
    Test_MultiThread_MC_Channel!(ArrayOfValues1, ArrayOfTallys1, ArrayOfValues2,
                                 ArrayOfTallys2, numSamples, task_queue)

Start a sampling worker with `Threads.@spawn` and return its `Task`.

The worker loops `numSamples` times. It never writes to the arrays itself: every update
is wrapped in an unscheduled `Task` (via `@task`) and handed to `task_queue` with `put!`;
whoever consumes the channel is responsible for running those tasks.

Per sample the worker draws a value with `Value()` and a 4-index location with `Loc()`:
- if the value is zero, one task is queued that adds 1 to the whole tally slice of
  `ArrayOfTallys1` at that location;
- otherwise 100 inner draws are made, each queuing a task that updates the matching
  slices of `ArrayOfValues1` / `ArrayOfTallys1` (plus a second tally task when the two
  drawn column indices differ).

Finally one task is queued that updates `ArrayOfValues2` and `ArrayOfTallys2` at the
sampled location.
"""
function Test_MultiThread_MC_Channel!(
    ArrayOfValues1::Array{UInt32,6},
    ArrayOfTallys1::Array{UInt32,5},
    ArrayOfValues2::Array{UInt32,4},
    ArrayOfTallys2::Array{UInt32,4},
    numSamples,
    task_queue::Channel{Task},
)
    return Threads.@spawn begin
        for _ in 1:numSamples
            outerValue = Value()
            # Arguments are evaluated left to right, so the four draws keep their order.
            site = CartesianIndex(Loc(), Loc(), Loc(), Loc())
            tallySlice = view(ArrayOfTallys1, :, site)

            if outerValue == UInt32(0)
                # Zero value: bump every tally in the slice by one.
                bumpAll = @task tallySlice .+= UInt32(1)
                put!(task_queue, bumpAll)
            else
                valueSlice = view(ArrayOfValues1, :, :, site)
                for _ in 1:100
                    # Draw order matters for the random stream; keep it fixed.
                    firstValue = Value()
                    secondValue = Value()
                    rawRow = Loc2()
                    col = Loc()
                    row = Loc2X(rawRow)
                    rawRowP = Loc2()
                    colP = Loc()
                    rowP = Loc2X(rawRowP)

                    # NOTE(review): the second value is stored at column `col`, not
                    # `colP`, although the tally for `colP` is bumped below - confirm
                    # that this is intended.
                    pairUpdate = @task begin
                        valueSlice[row, col] += firstValue
                        tallySlice[col] += UInt32(1)
                        valueSlice[rowP, col] += secondValue
                    end
                    put!(task_queue, pairUpdate)

                    # Only tally the second column separately when it is a different one.
                    if col != colP
                        tallyP = @task tallySlice[colP] += UInt32(1)
                        put!(task_queue, tallyP)
                    end
                end
            end

            siteUpdate = @task begin
                ArrayOfValues2[site] += outerValue
                ArrayOfTallys2[site] += UInt32(1)
            end
            put!(task_queue, siteUpdate)
        end
    end # threads
end