Multi-threaded Array building

I strongly suspect your results here are not representative, since the sleep calls don't actually consume CPU resources. If I were you, I would stick some random matrix multiplications in your benchmark functions to check whether these results still hold when the CPU is actually doing work.
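Something like this, for example (an untested sketch; the function name and matrix size are arbitrary):

# stand-in workload that burns real CPU time, unlike sleep()
function busy_work()
    M = randn(256, 256)
    return sum(M * M)   # real FLOPs keep the core genuinely busy
end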

But maybe the atomic case is still faster then; Atomix can be pretty good.

By the way, for the unbalanced cases you'll get a big performance boost if you just spawn more tasks; try something like 2 * Threads.nthreads() or even more.
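With OhMyThreads that's just the ntasks keyword, e.g. (a sketch; work_items and process are placeholders for your real code):

using OhMyThreads: tforeach

tforeach(work_items; ntasks = 2 * Threads.nthreads()) do item
    process(item)   # placeholder for the real per-item work
end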

Assuming your benchmark is representative of your real workload, this kind of empiricism is the right approach. :+1:

Just don’t forget that the right approach depends crucially on the behavior of your processing functions, the statistics of which indices are updated, etc.

You are now modeling a function that needs context switches, syscalls, etc., but no CPU time – like, e.g., accessing a small file. For that kind of workload you should oversubscribe threads – the number of tasks for chunk-splitting should now reflect not the number of OS threads / CPU cores, but rather the real resource that is being used (e.g. the queue depth of your SSD). Experiment with that until you figure out a good number.
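For example, something along these lines to scan a few oversubscription levels (a sketch; items and read_small_file are placeholders):

using OhMyThreads: tforeach

for n in Threads.nthreads() .* (1, 2, 4, 8, 16)
    t = @elapsed tforeach(items; scheduler = :greedy, ntasks = n) do item
        read_small_file(item)   # placeholder for the IO-bound work
    end
    println("ntasks = $n: $(round(t; digits = 3)) s")
end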

This crucially depends on how expensive the generation of each index triple (i, j, v) is. The atomic float add to an effectively random entry in the matrix, from many threads concurrently, has the potential to be slow, but this is a mere rounding error compared to something that takes an entire millisecond (4 million cycles, your sleep interval).

(Slow because of bad cache locality; because your CPU probably doesn't support atomic float ops and has to emulate them via a CAS loop; and because of potentially lots of cache-line bouncing between cores.)
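For intuition, the emulation looks roughly like this (a sketch built on Base's Threads.atomic_cas!):

# sketch: how an atomic Float64 `+=` is typically emulated with compare-and-swap
function cas_add!(x::Threads.Atomic{Float64}, y::Float64)
    old = x[]
    while true
        seen = Threads.atomic_cas!(x, old, old + y)
        seen === old && return   # nobody interfered; our add landed
        old = seen               # another thread won the race; retry from its value
    end
end

Every failed CAS has to re-read and re-add, which is where the time goes under contention.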

Thanks @Mason and @foobar_lv2 for your advice. I have slightly modified my benchmark to replace the sleep() call with a randn(512, 512) and to increase the number of tasks:
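(For reference, the functions below expect inputs along these lines — placeholder definitions to make the snippets self-contained, not my exact ones:)

N = 512                                      # size of the output matrix
arrayofindex = 1:10_000                      # K work items
B = [(rand(1:N), rand(1:N)) for _ in arrayofindex]   # hypothetical per-index inputs

# hypothetical processing function; randn(512, 512) is the stand-in workload
function f(u, v, idx)
    randn(512, 512)
    return u, v, rand()
end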


using OhMyThreads  # provides tforeach, tmapreduce, @tasks, chunks, index_chunks
using Atomix

function atomic_loop(f, N, B, arrayofindex)
    A = zeros(Float64, N, N)
    # View A's storage as atomic memory so entries can be updated lock-free
    # from many tasks at once (AtomicMemory requires Julia >= 1.11)
    atomic_view = unsafe_wrap(AtomicMemory{Float64}, pointer(A), N * N; own = false)
    GC.@preserve A begin
        tforeach(arrayofindex; scheduler = :greedy, ntasks = Threads.nthreads() * 2) do idx
            u, v, y = f(B[idx]..., idx)
            Atomix.@atomic atomic_view[u + (v - 1) * N] += y  # linear index into column-major A
        end
    end
    return A
end


function atomic_add_loop(f, N, B, arrayofindex)
    # One Threads.Atomic{Float64} per matrix entry, each initialized to 0.0
    buffers = [Threads.Atomic{Float64}() for _ in 1:N, _ in 1:N]
    tforeach(arrayofindex; scheduler = :greedy, ntasks = Threads.nthreads() * 2) do idx
        u, v, y = f(B[idx]..., idx)
        Threads.atomic_add!(buffers[u, v], y)
    end
    return getindex.(buffers)  # unwrap the atomics into a plain Matrix{Float64}
end

function chunked_loop(f, N, B, arrayofindex)
    nchunks = Threads.nthreads() * 2
    # One private accumulator matrix per chunk, merged at the end
    buffer = [zeros(Float64, N, N) for _ in 1:nchunks]
    # Threads.@threads :dynamic for (ichunk, inds) in enumerate(index_chunks(arrayofindex; n = nchunks))
    OhMyThreads.@tasks for (ichunk, inds) in enumerate(index_chunks(arrayofindex; n = nchunks))
        @inbounds for idx in inds
            u, v, y = f(B[idx]..., idx)
            buffer[ichunk][u, v] += y
        end
    end
    return sum(buffer)
end

function chunked_loop2(f, N, B, arrayofindex)
    nchunks = Threads.nthreads() * 2

    As = zeros(Float64, N, N, nchunks) # add extra dimension corresponding to which task is operating
    @sync for (k, chunk_of_indices) in enumerate(chunks(arrayofindex; n = nchunks))
        Threads.@spawn begin
            @inbounds for idx in chunk_of_indices
                u, v, y = f(B[idx]..., idx)
                As[u, v, k] += y
            end
        end
    end
    # Combine along the task dimension; this reduction could itself be parallelized, but it's probably not worth it
    return dropdims(sum(As; dims = 3); dims = 3)
end

function channel_loop(f, N, B, arrayofindex)
    nbuffers = Threads.nthreads() * 2
    # Pool of pre-allocated accumulator matrices, handed out through a Channel
    chnl = Channel{Matrix{Float64}}(nbuffers)
    tforeach(1:nbuffers) do _
        put!(chnl, zeros(Float64, N, N))
    end
    tforeach(arrayofindex; scheduler = :greedy, ntasks = nbuffers) do idx
        u, v, y = f(B[idx]..., idx)
        As = take!(chnl)   # borrow a buffer (blocks if none is free)
        As[u, v] += y
        put!(chnl, As)     # return it to the pool
    end
    # Drain the pool and sum up the partial results
    return tmapreduce(.+, 1:nbuffers) do _
        return take!(chnl)
    end
end
| Method | Unbalanced (K = 10_000) | Balanced (K = 10_000) |
|---|---|---|
| simple_loop | 13.588 s | 3.473 s |
| channel_loop | 4.060 s | 1.485 s |
| atomic_loop | 3.704 s | 930.646 ms |
| atomic_add_loop | 4.726 s | 1.476 s |
| chunked_loop | 7.781 s | 2.119 s |
| chunked_loop2 | 5.148 s | 1.823 s |

It seems @atomic is the right way to go for my purpose. The relative factors between the methods have probably changed because of the allocations and GC triggered by the extra randn().

I think this is close enough to my actual problem, where K is several thousand and processing() is pure computation without file/network access; it involves optimization routines that take around 5 ms each, except that sometimes one may take much longer or even fail (caught via try/catch).

I believe such a benchmark could be valuable, as there are topics in Julia, like multithreading, where there are many feasible approaches but not much guidance on how they compare.