Multi-threaded Array building

Dear all,
I have a code of this kind to build an array:

N = 2000
A = zeros(Float64, N, N)
B  # some long (>>10_000) vector of custom struct
for idx in arrayofindex  # lots (>>10_000) of elements
    i, j, v = processing(B[idx])
    A[i, j] += v
end

It is quite long, as processing() takes some time, and I want to multithread it (processing() is thread-safe).
I have read https://discourse.julialang.org/t/editing-an-array-with-multiple-threads/25364 and https://discourse.julialang.org/t/thread-safe-array-building/3275/2 but those answers (which use A[threadid()]) seem outdated.

What is now (with Julia 1.10 or 1.11) the best way to multithread this kind of array-building function?

I just saw that a very similar question, Pool of locks?, appeared while I was writing this. I'll see if the solution given there answers my problem.

Do you want to do a data conversion, from B to A?
If this is a deterministic procedure, why don't you just do it once and then save A?
Is this data conversion procedure the main workload in your overall procedure?

I think Threads.@threads can do what you want.

No, it is not a conversion, and (i,j) can appear multiple times.
I could have used @threads to build an array of (i,j,v) for all idx and then performed the reduction, but in my actual problem I can't really store all the (i,j,v) (it would be a lot of memory).
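
To be explicit, that rejected approach would look roughly like this (a sketch using the names from my first post, and assuming processing returns a Tuple{Int,Int,Float64}); the intermediate triples array is exactly what I cannot afford to store:

triples = Vector{Tuple{Int,Int,Float64}}(undef, length(arrayofindex))
Threads.@threads for k in eachindex(arrayofindex)
    triples[k] = processing(B[arrayofindex[k]])
end
# single-threaded reduction over the materialized triples
A = zeros(Float64, N, N)
for (i, j, v) in triples
    A[i, j] += v
end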

What about

A = [Threads.Atomic{Float64}(0) for _ = 1:N, _ = 1:N]
Threads.@threads for k = eachindex(B)
    local (i, j, v) = processing(B[k])
    Threads.atomic_add!(A[i, j], v)
end

Finally recover A = map(x -> x[], A).

1 Like

Is your array sparse?

No, A is basically an image, with many patterns whose parameters are stored in B.

Thanks, I'll try this and the ones given in Pool of locks?.
I will report a small benchmark as soon as I have the time to perform it.

You could

  1. Use task_local_storage to have a per-task array (rather than per-thread). e.g. No more threadid indexing? [thread-local storage] - #12 by stevengj
  2. Use ChunkSplitters.jl (à la Why has simple threading using `threadid` become so complex in v1.12...? - #28 by foobar_lv2) to create a per-chunk array. (This has the downside of imposing essentially a static parallelization schedule, which might be bad if processing(B[idx]) could take a very different amount of time depending on the argument.)
  3. Use some higher-level primitives like in OhMyThreads.jl or Transducers.jl, e.g. expressing this as a parallel reduction operation (see the sketch after this list).
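
For illustration, option 3 could look roughly like the following (a minimal, untested sketch, assuming the names N, B, processing, and arrayofindex from the original post; tmapreduce is from OhMyThreads and index_chunks from ChunkSplitters):

using OhMyThreads: tmapreduce
using ChunkSplitters: index_chunks

# each task builds its own local N×N accumulator over one chunk of arrayofindex;
# tmapreduce then sums the per-chunk matrices with `+`
A = tmapreduce(+, index_chunks(arrayofindex; n = Threads.nthreads())) do inds
    Alocal = zeros(Float64, N, N)
    for k in inds
        i, j, v = processing(B[arrayofindex[k]])
        Alocal[i, j] += v
    end
    Alocal
end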

Why not use atomics? E.g.

N = 2000
buffers = [Threads.Atomic{Float64}() for _ in 1:N, _ in 1:N]
B # some long (>>10_000) vector of custom struct
OhMyThreads.@tasks for idx in arrayofindex  # lots (>>10_000) of elements
    i, j, v = processing(B[idx])
    Threads.atomic_add!(buffers[i, j], v)
end
A = getindex.(buffers)

The appropriate way of using atomics is

atomic_view = unsafe_wrap(AtomicMemory{Float64}, pointer(A), N*N; own=false)
...
@atomic atomic_view[i + (j-1)*N] += v

However, you won’t be happy with the performance.

For your specific value of N, just give each thread its private copy of A (it's a mere 32 MB) and then add all the copies of A together single-threaded.

Or adjust your sample code to be more representative of your real problem: Give us ballpark estimates for the actual sizes / numbers you care about.

Could you please elaborate on what is inappropriate about my suggestion using Threads.Atomic?

You are building a large array of object references, each one pointing to a separately allocated box containing a single Float64, with atomic access.

I think it is somewhat unfortunate that Julia has chosen the somewhat insane C++ as a model for atomics, instead of the reasonable C; i.e. that atomicity is a property of the object / field / memory location, instead of a property of the access. Heck, even Java supports C-style atomics with VarHandle. (The syntax / ergonomics of VarHandle are somewhat atrocious, but at least the language provides the necessary tools.)

But that ship has sailed unfortunately. Maybe we’ll get a VarHandle analogue in the future.

3 Likes

Atomic is a mutable struct, so each of them is allocated on the heap, which means that your buffers array is an array of pointers. This can cause inefficient use of the cache. Whether this is a performance problem depends on the use.
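
A quick way to see the layout difference (sketch; isbitstype tells you whether elements of that type are stored inline in an array):

julia> isbitstype(Threads.Atomic{Float64})  # mutable struct, so an array of these holds pointers to heap boxes
false

julia> isbitstype(Float64)  # plain Float64s are stored inline in a Matrix{Float64}
true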

In the upcoming v1.12 version, the @atomic macro can be used on array elements.

Sure. In the OP the assumption was that the computations take quite a long time, so I wouldn’t expect this to be a bottleneck. In a very hot loop I wouldn’t have suggested this pattern. Thanks for the clarification.

In the upcoming v1.12 version, the @atomic macro can be used on array elements:

Unfortunately not on current nightly:

julia> v=[1]; m=v.ref.mem; @atomic m[1]
ERROR: ConcurrencyViolationError("memoryrefget: non-atomic memory cannot be accessed atomically")

In my view, this is plain bullshit. Atomic ops should be supported if alignment + size + architecture support them. And :notatomic should be a valid memory order that simply ignores the locks.

So sure, the ConcurrencyViolationError is sensible for v=[(1,2,3)]; m=v.ref.mem; @atomic m[1] because my machine doesn’t have an instruction to atomically load 24 bytes with 8 byte alignment; and there exists no lock that we could use as a fallback since I didn’t allocate space for it by making m an AtomicMemory.

1 Like
N = 2000
A = zeros(Float64, N, N)
B  # some long (>>10_000) vector of custom struct
for idx in arrayofindex  # lots (>>10_000) of elements
    i, j, v = processing(B[idx])
    A[i, j] += v
end

It depends a lot on the details, but assuming I have enough RAM to store multiple copies of A, my intuition would be to parallelize this like so:

using ChunkSplitters
function f(N, B, array_of_index; ntasks=Threads.nthreads())
    As = zeros(Float64, N, N, ntasks) # add extra dimension corresponding to which task is operating
    @sync for (k, chunk_of_indices) in enumerate(chunks(array_of_index; n=ntasks))
        Threads.@spawn begin
            local A = @view As[:, :, k]
            for idx in chunk_of_indices
                i,j,v = processing(B[idx])
                A[i,j] += v
            end
        end
    end
    # Now combine the different dimensions, you may want to also parallelize this step, but probably not
    dropdims(sum(As; dims=3); dims=3)
end

This has the advantage that each task gets its own buffer that it can work on in a lock-free manner, so there’s no per-index overhead in getting and setting indices. However, it also has the disadvantage that it allocates a lot more memory which may introduce its own bottlenecks.

I find this style much easier to reason about and use than atomics or locks, though, and it is typically preferable to creating N^2 independent locks or N^2 independent heap-allocated Atomics.

2 Likes

I have made a small toy model for benchmarking with several ways to implement this multithreading:

using Primes
using Atomix
using OhMyThreads
using ChunkSplitters

function unbalanced_processing1(μx, μy, σ, i)
    isprime(i) && sleep(0.05)
    sleep(0.001)
    u = round(Int, μx)
    v = round(Int, μy)
    y = σ / sqrt((u - μx)^2 + ((v - μy)^2))
    return (u, v, y)
end

function balanced_processing1(μx, μy, σ, i)
    sleep(0.001)
    u = round(Int, μx)
    v = round(Int, μy)
    y = σ / sqrt((u - μx)^2 + ((v - μy)^2))
    return (u, v, y)
end


function simple_loop(f, N, B, arrayofindex)
    A = zeros(Float64, N, N)
    @inbounds for idx in arrayofindex
        u, v, y = f(B[idx]..., idx)
        A[u, v] += y
    end
    return A
end


function atomic_loop(f, N, B, arrayofindex)
    A = zeros(Float64, N, N)
    atomic_view = unsafe_wrap(AtomicMemory{Float64}, pointer(A), N * N; own = false)
    Threads.@threads  :dynamic for idx in arrayofindex
        u, v, y = f(B[idx]..., idx)
        Atomix.@atomic atomic_view[u + (v - 1) * N] += y
    end
    return A
end


function atomic_add_loop(f, N, B, arrayofindex)
    buffers = [Threads.Atomic{Float64}() for _ in 1:N, _ in 1:N]
    Threads.@threads for idx in arrayofindex
        u, v, y = f(B[idx]..., idx)
        Threads.atomic_add!(buffers[u, v], y)
    end
    return getindex.(buffers)
end

function chunked_loop(f, N, B, arrayofindex)
    nchunks = Threads.nthreads()
    buffer = [zeros(Float64, N, N) for _ in 1:nchunks]
    Threads.@threads :dynamic for (ichunk, chunk) in enumerate(chunks(arrayofindex; n = nchunks))
        @inbounds for idx in chunk
            u, v, y = f(B[idx]..., idx)
            buffer[ichunk][u, v] += y
        end
    end
    return sum(buffer)
end

function chunked_loop2(f, N, B, arrayofindex)
    nchunks = Threads.nthreads()
    As = zeros(Float64, N, N, nchunks) # add extra dimension corresponding to which task is operating
    @sync for (k, chunk_of_indices) in enumerate(chunks(arrayofindex; n = nchunks))
        Threads.@spawn begin
            @inbounds for idx in chunk_of_indices
                u, v, y = f(B[idx]..., idx)
                As[u, v, k] += y
            end
        end
    end
    # Now combine the different dimensions, you may want to also parallelize this step, but probably not
    return dropdims(sum(As; dims = 3); dims = 3)
end

function channel_loop(f, N, B, arrayofindex)
    nbuffers = Threads.nthreads()
    chnl = Channel{Matrix{Float64}}(nbuffers)
    foreach(1:nbuffers) do _
        put!(chnl, zeros(Float64, N, N))
    end
    tmap(arrayofindex) do idx
        u, v, y = f(B[idx]..., idx)
        As = take!(chnl)
        As[u, v] += y
        put!(chnl, As)
    end
    return mapreduce(.+, 1:nbuffers) do _
        return take!(chnl)
    end
end

Here is the output of @btime

K = 100
N = 2048
M = 1_000_000

B = [(rand() * (N - 20) + 10, rand() * (N - 20) + 10, randn()) for i in 1:M]
arrayofindex = [rand(1:M) for i in 1:K]

@btime simple_loop(balanced_processing1, N, B, arrayofindex);

@btime simple_loop(unbalanced_processing1, N, B, arrayofindex);

...

For K = 100:

Method           Unbalanced Time   Unbalanced Allocs   Unbalanced Mem   Balanced Time   Balanced Allocs   Balanced Mem
simple_loop      584.909 ms        2,252               32.05 MiB        232.154 ms      2,224             32.05 MiB
channel_loop     393.912 ms        698                 992.03 MiB       307.413 ms      668               992.03 MiB
atomic_loop      120.286 ms        520                 32.02 MiB        23.987 ms       491               32.02 MiB
atomic_add_loop  166.522 ms        4,194,825           128.02 MiB       57.694 ms       4,194,796         128.02 MiB
chunked_loop     564.911 ms        683                 992.03 MiB       304.707 ms      584               992.02 MiB
chunked_loop2    456.940 ms        545                 544.02 MiB       355.152 ms      515               544.02 MiB

For K = 10_000:

Method           Unbalanced Time   Unbalanced Allocs   Unbalanced Mem   Balanced Time   Balanced Allocs   Balanced Mem
simple_loop      63.656 s          228,734             37.37 MiB        23.596 s        220,559           37.00 MiB
channel_loop     5.904 s           43,810              993.72 MiB       3.482 s         40,656            993.63 MiB
atomic_loop      5.807 s           43,616              33.33 MiB        2.922 s         40,460            33.23 MiB
atomic_add_loop  6.213 s           4,237,914           129.33 MiB       2.822 s         4,234,757         129.24 MiB
chunked_loop     8.773 s           45,510              993.39 MiB       3.266 s         40,550            993.24 MiB
chunked_loop2    6.749 s           43,640              545.33 MiB       3.533 s         40,492            545.24 MiB

It seems that @atomic as proposed by @foobar_lv2 is a good solution and I'm "happy with the performance".

I did not find an easy way to use Transducers or task_local_storage