I have made a small toy model for benchmarking several ways to implement this multithreading:
using Primes
using Atomix
using OhMyThreads
using ChunkSplitters
function unbalanced_processing1(μx, μy, Ļ, i)
    # Prime-numbered indices pay an extra 50 ms, creating an unbalanced load.
    if isprime(i)
        sleep(0.05)
    end
    sleep(0.001)  # baseline 1 ms simulated cost for every item
    u, v = round(Int, μx), round(Int, μy)
    # Inverse-distance weight from the continuous point to its rounded grid cell.
    offset = sqrt((u - μx)^2 + (v - μy)^2)
    return (u, v, Ļ / offset)
end
function balanced_processing1(μx, μy, Ļ, i)
    sleep(0.001)  # fixed 1 ms simulated workload per item (load is uniform)
    u, v = round(Int, μx), round(Int, μy)
    # Inverse-distance weight from the continuous point to its rounded grid cell.
    offset = sqrt((u - μx)^2 + (v - μy)^2)
    return (u, v, Ļ / offset)
end
"""
    simple_loop(f, N, B, arrayofindex)

Serial baseline: for each `idx` in `arrayofindex`, call `f(B[idx]..., idx)` to
obtain a target cell `(u, v)` and a weight `y`, and accumulate `y` into an
`N×N` matrix at that cell. Returns the accumulated matrix.
"""
function simple_loop(f, N, B, arrayofindex)
    A = zeros(Float64, N, N)
    for idx in arrayofindex
        u, v, y = f(B[idx]..., idx)
        # Bounds checks deliberately kept: (u, v) come from the user-supplied
        # `f` and are not provably inside 1:N, so the original `@inbounds`
        # risked silent memory corruption on out-of-range output.
        A[u, v] += y
    end
    return A
end
# Parallel accumulation into one shared N×N matrix using per-cell atomic
# read-modify-write, so concurrent updates to the same cell never race.
function atomic_loop(f, N, B, arrayofindex)
    A = zeros(Float64, N, N)
    # Reinterpret A's storage as a flat atomic memory region so Atomix.@atomic
    # can do an atomic += on individual elements.
    # NOTE(review): pointer(A) is taken without GC.@preserve; A is still
    # referenced by the return below, but wrapping the loop in
    # `GC.@preserve A` would make the lifetime explicit — confirm.
    atomic_view = unsafe_wrap(AtomicMemory{Float64}, pointer(A), N * N; own = false)
    Threads.@threads :dynamic for idx in arrayofindex
        u, v, y = f(B[idx]..., idx)
        # Column-major linear index of A[u, v] in the flat view.
        Atomix.@atomic atomic_view[u + (v - 1) * N] += y
    end
    return A
end
# Parallel accumulation with one lock-free Threads.Atomic accumulator per
# output cell; Atomic{Float64}() starts at 0.0.
function atomic_add_loop(f, N, B, arrayofindex)
    cells = [Threads.Atomic{Float64}() for _ in 1:N, _ in 1:N]
    Threads.@threads for idx in arrayofindex
        u, v, y = f(B[idx]..., idx)
        Threads.atomic_add!(cells[u, v], y)
    end
    # Unwrap each atomic into a plain Float64 matrix of the same shape.
    return [c[] for c in cells]
end
# Chunked parallelism: each chunk of indices accumulates into its own private
# N×N matrix, so the hot loop needs no synchronization; partials are merged
# serially at the end.
function chunked_loop(f, N, B, arrayofindex)
    ntasks = Threads.nthreads()
    partials = [zeros(Float64, N, N) for _ in 1:ntasks]
    Threads.@threads :dynamic for (ichunk, inds) in enumerate(index_chunks(arrayofindex; n = ntasks))
        acc = partials[ichunk]
        @inbounds for idx in inds
            u, v, y = f(B[idx]..., idx)
            acc[u, v] += y
        end
    end
    # Merge the per-chunk partial results.
    return reduce(+, partials)
end
# Chunked parallelism via explicit task spawning: a 3-D array gives every
# spawned task its own private N×N slab (third dimension = task id), so the
# inner loop is synchronization-free.
function chunked_loop2(f, N, B, arrayofindex)
    ntasks = Threads.nthreads()
    slabs = zeros(Float64, N, N, ntasks)
    @sync for (tid, inds) in enumerate(chunks(arrayofindex; n = ntasks))
        Threads.@spawn begin
            @inbounds for idx in inds
                u, v, y = f(B[idx]..., idx)
                slabs[u, v, tid] += y
            end
        end
    end
    # Collapse the task dimension; the serial reduction here is cheap relative
    # to the parallel loop, so it is usually not worth parallelizing.
    return dropdims(sum(slabs; dims = 3); dims = 3)
end
# Channel-based pooling: a fixed pool of accumulator matrices circulates
# through a Channel; each parallel task checks one out, updates a single cell,
# and returns it, then the pool is drained and combined elementwise.
function channel_loop(f, N, B, arrayofindex)
    npool = Threads.nthreads()
    pool = Channel{Matrix{Float64}}(npool)
    for _ in 1:npool
        put!(pool, zeros(Float64, N, N))
    end
    tmap(arrayofindex) do idx
        u, v, y = f(B[idx]..., idx)
        acc = take!(pool)
        acc[u, v] += y
        put!(pool, acc)
    end
    # Drain the pool and sum the partial accumulators.
    combined = take!(pool)
    for _ in 2:npool
        combined = combined .+ take!(pool)
    end
    return combined
end
Here is the output of `@btime`:
# Benchmark setup: K indices sampled (with replacement) from M precomputed
# parameter tuples, accumulated into an N×N matrix.
K = 100
N = 2048
M = 1_000_000
# B[i] = (μx, μy, Ļ): uniform positions in [10, N-10) so rounded cells stay
# inside the matrix, plus a standard-normal weight.
B = [(rand() * (N - 20) + 10, rand() * (N - 20) + 10, randn()) for i in 1:M]
arrayofindex = [rand(1:M) for i in 1:K]
@btime simple_loop(balanced_processing1, N, B, arrayofindex);
@btime simple_loop(unbalanced_processing1, N, B, arrayofindex);
...
For K = 100:

| Method | Unbalanced Time | Unbalanced Allocs | Unbalanced Mem | Balanced Time | Balanced Allocs | Balanced Mem |
| --- | --- | --- | --- | --- | --- | --- |
| simple_loop | 584.909 ms | 2,252 | 32.05 MiB | 232.154 ms | 2,224 | 32.05 MiB |
| channel_loop | 393.912 ms | 698 | 992.03 MiB | 307.413 ms | 668 | 992.03 MiB |
| atomic_loop | 120.286 ms | 520 | 32.02 MiB | 23.987 ms | 491 | 32.02 MiB |
| atomic_add_loop | 166.522 ms | 4,194,825 | 128.02 MiB | 57.694 ms | 4,194,796 | 128.02 MiB |
| chunked_loop | 564.911 ms | 683 | 992.03 MiB | 304.707 ms | 584 | 992.02 MiB |
| chunked_loop2 | 456.940 ms | 545 | 544.02 MiB | 355.152 ms | 515 | 544.02 MiB |
For K = 10_000:

| Method | Unbalanced Time | Unbalanced Allocs | Unbalanced Mem | Balanced Time | Balanced Allocs | Balanced Mem |
| --- | --- | --- | --- | --- | --- | --- |
| simple_loop | 63.656 s | 228,734 | 37.37 MiB | 23.596 s | 220,559 | 37.00 MiB |
| channel_loop | 5.904 s | 43,810 | 993.72 MiB | 3.482 s | 40,656 | 993.63 MiB |
| atomic_loop | 5.807 s | 43,616 | 33.33 MiB | 2.922 s | 40,460 | 33.23 MiB |
| atomic_add_loop | 6.213 s | 4,237,914 | 129.33 MiB | 2.822 s | 4,234,757 | 129.24 MiB |
| chunked_loop | 8.773 s | 45,510 | 993.39 MiB | 3.266 s | 40,550 | 993.24 MiB |
| chunked_loop2 | 6.749 s | 43,640 | 545.33 MiB | 3.533 s | 40,492 | 545.24 MiB |
It seems that `@atomic`, as proposed by @foobar_lv2, is a good solution, and I'm "happy with the performance".