Thanks @Mason and @foobar_lv2 for your advice. I have slightly modified my benchmark, replacing the `sleep()` with `randn(512, 512)` and increasing the number of tasks:
function atomic_loop(f, N, B, arrayofindex)
    A = zeros(Float64, N, N)
    # Reinterpret A's storage as an AtomicMemory so Atomix can perform
    # lock-free atomic accumulation directly into the result matrix.
    atomic_view = unsafe_wrap(AtomicMemory{Float64}, pointer(A), N * N; own = false)
    # GC.@preserve keeps A rooted while tasks write through the raw pointer;
    # per the Base docs, the source array must be kept alive for as long as
    # the pointer obtained from it is in use.
    GC.@preserve A begin
        tforeach(arrayofindex; scheduler = :greedy, ntasks = Threads.nthreads() * 2) do idx
            u, v, y = f(B[idx]..., idx)
            # Linear index into the column-major N×N matrix: (u, v) ↦ u + (v - 1) * N.
            Atomix.@atomic atomic_view[u + (v - 1) * N] += y
        end
    end
    return A
end
function atomic_add_loop(f, N, B, arrayofindex)
    # One lock-free accumulator per output cell; Atomic{Float64}() starts at 0.0.
    cells = [Threads.Atomic{Float64}() for _ in 1:N, _ in 1:N]
    tforeach(arrayofindex; scheduler = :greedy, ntasks = Threads.nthreads() * 2) do idx
        u, v, y = f(B[idx]..., idx)
        Threads.atomic_add!(cells[u, v], y)
    end
    # Unwrap every atomic into a plain Matrix{Float64}.
    return map(c -> c[], cells)
end
function chunked_loop(f, N, B, arrayofindex)
    # One private N×N accumulator per chunk, merged after the loop, so the
    # hot path needs no synchronization at all.
    ntasks = Threads.nthreads() * 2
    locals = [zeros(Float64, N, N) for _ in 1:ntasks]
    OhMyThreads.@tasks for (chunkid, chunk) in enumerate(index_chunks(arrayofindex; n = ntasks))
        @inbounds for idx in chunk
            u, v, y = f(B[idx]..., idx)
            locals[chunkid][u, v] += y
        end
    end
    # Merge the per-chunk partial sums into the final matrix.
    return sum(locals)
end
function chunked_loop2(f, N, B, arrayofindex)
    ntasks = Threads.nthreads() * 2
    # Third dimension = one private N×N slice per spawned task; the slices
    # never overlap, so the tasks can write without synchronization.
    partials = zeros(Float64, N, N, ntasks)
    @sync for (slice, inds) in enumerate(chunks(arrayofindex; n = ntasks))
        Threads.@spawn begin
            @inbounds for idx in inds
                u, v, y = f(B[idx]..., idx)
                partials[u, v, slice] += y
            end
        end
    end
    # Collapse the task dimension; this reduction could itself be
    # parallelized, but it is cheap compared with the main loop.
    return dropdims(sum(partials; dims = 3); dims = 3)
end
function channel_loop(f, N, B, arrayofindex)
    npool = Threads.nthreads() * 2
    # The Channel acts as a pool of reusable accumulator matrices: a task
    # checks one out, updates one cell, and returns it, so no two tasks
    # ever touch the same buffer concurrently.
    pool = Channel{Matrix{Float64}}(npool)
    tforeach(1:npool) do _
        put!(pool, zeros(Float64, N, N))
    end
    tforeach(arrayofindex; scheduler = :greedy, ntasks = npool) do idx
        u, v, y = f(B[idx]..., idx)
        acc = take!(pool)
        acc[u, v] += y
        put!(pool, acc)
    end
    # Drain the pool and sum the partial matrices into the final result.
    return tmapreduce(+, 1:npool) do _
        return take!(pool)
    end
end
| Method | Unbalanced (K=10_000) | Balanced (K=10_000) |
| --- | --- | --- |
| simple_loop | 13.588 s | 3.473 s |
| channel_loop | 4.060 s | 1.485 s |
| atomic_loop | 3.704 s | 930.646 ms |
| atomic_add_loop | 4.726 s | 1.476 s |
| chunked_loop | 7.781 s | 2.119 s |
| chunked_loop2 | 5.148 s | 1.823 s |
It seems `@atomic` is the right way to go for my purpose. The relative factors between the methods may have changed, probably due to the allocations and GC caused by the extra `randn()`.
I think it is close enough to my actual problem, where K is several thousand and `processing()` is pure computation without any file or network access. It involves optimization routines that take around 5 ms each, except that sometimes a routine may take much longer or even fail (caught via `try`/`catch`).
I believe such a benchmark could be valuable, as there are topics in Julia, like multithreading, where there are many feasible approaches but not much insight.