Pre-allocate intermediate arrays for multi-threading

Hi, I need help pre-allocating some inner variables of a for loop to reduce allocations in multi-threaded code. I create several local arrays inside the loop for intermediate calculations. Here's an MWE that pre-allocates those arrays and is parallelised with Distributed. How do I turn this into multi-threaded code without running into data races, while still avoiding repeated allocations?

using Distributed, SharedArrays   # plus addprocs(...) before calling test_fun

function test_fun(N, N_l)
    r_c_vector = rand(N, 3)              # input
    F_r_c_vector = rand(N, 3)            # input
    R_star = 1                           # input
    l_vector = rand(N_l, 3)              # input
    u_Real = SharedArray{Float64}(N, 3)  # output
    # Pre-allocation (NB: @distributed copies these buffers to each worker,
    # so they are not shared between processes)
    dx_temp_1_full = zeros(size(r_c_vector, 1))
    dx_temp_2_full = similar(dx_temp_1_full)
    dx_temp_3_full = similar(dx_temp_1_full)
    dx_temp_full   = similar(dx_temp_1_full)

    @sync @distributed for i = 1:N
        temp_u_prime_1 = 0.0
        temp_u_prime_2 = 0.0
        temp_u_prime_3 = 0.0
        for l in axes(l_vector, 1)
            dx_temp_1_full .= r_c_vector[i, 1] .- r_c_vector[:, 1] .- l_vector[l, 1]
            dx_temp_2_full .= r_c_vector[i, 2] .- r_c_vector[:, 2] .- l_vector[l, 2]
            dx_temp_3_full .= r_c_vector[i, 3] .- r_c_vector[:, 3] .- l_vector[l, 3]
            dx_temp_full   .= sqrt.(dx_temp_1_full.^2 .+ dx_temp_2_full.^2 .+ dx_temp_3_full.^2)
            # Only take points within R_star
            cutoff_ids = findall(dx_temp_full .< R_star)
            if isempty(cutoff_ids)
                continue
            end
            dx_temp   = dx_temp_full[cutoff_ids]
            dx_temp_1 = dx_temp_1_full[cutoff_ids]
            dx_temp_2 = dx_temp_2_full[cutoff_ids]
            dx_temp_3 = dx_temp_3_full[cutoff_ids]

            A11 = exp.(dx_temp.^2 .+ dx_temp_1.^2) # the real expression is more complex
            A12 = exp.(dx_temp_1 .* dx_temp_2)
            A13 = exp.(dx_temp_1 .* dx_temp_3)
            A22 = exp.(dx_temp.^2 .+ dx_temp_2.^2)
            A23 = exp.(dx_temp_2 .* dx_temp_3)
            A33 = exp.(dx_temp.^2 .+ dx_temp_3.^2)

            temp_u_prime_1 += sum(A11 .* F_r_c_vector[cutoff_ids, 1] .+ A12 .* F_r_c_vector[cutoff_ids, 2] .+ A13 .* F_r_c_vector[cutoff_ids, 3])
            temp_u_prime_2 += sum(A12 .* F_r_c_vector[cutoff_ids, 1] .+ A22 .* F_r_c_vector[cutoff_ids, 2] .+ A23 .* F_r_c_vector[cutoff_ids, 3])
            temp_u_prime_3 += sum(A13 .* F_r_c_vector[cutoff_ids, 1] .+ A23 .* F_r_c_vector[cutoff_ids, 2] .+ A33 .* F_r_c_vector[cutoff_ids, 3])
        end
        u_Real[i, :] = [temp_u_prime_1, temp_u_prime_2, temp_u_prime_3]
    end
    return u_Real
end

For multi-threading, I had to drop the pre-allocation because of data races, but that makes it slightly slower than the Distributed case. I have read that multi-threading is supposed to be faster than distributed computing, so I want to make this switch.
PS: I believe I have made the serial version as fast as I can, but if you have any suggestions I'd be grateful for them.
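
For reference, the racy pattern described above looks roughly like the following sketch (names mirror the MWE, the body is heavily abbreviated): a single shared buffer hoisted out of a Threads.@threads loop gets written by several threads at once.

dx_temp_full = zeros(N)                                    # one shared buffer
Threads.@threads for i in 1:N
    dx_temp_full .= r_c_vector[i, 1] .- r_c_vector[:, 1]   # concurrent iterations
    u_Real[i, 1] = sum(dx_temp_full)                       # overwrite each other here
end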

Why are you doing this with Distributed?

One naive approach is to just move these pre-allocations into the parallel for loop, so that they become task-local.

Why am I using Distributed? To parallelise the loop. I am unfamiliar with multi-threading, so I went for a multi-processing approach.

As I mentioned, I have tried moving the pre-allocation inside the loop to use multi-threading. That does the job but doesn’t give any advantage over this pre-allocated Distributed implementation.

Hi, this is the best I could get; I hope the logic is still fine:

function test_fun!(u_Real, r_c_vector, F_r_c_vector, R_star, l_vector, cache_per_thread)
    N = size(r_c_vector, 1)

    Threads.@threads for i in 1:N
        # Each thread reuses its own pre-allocated scratch buffers, indexed by thread id
        tid = Threads.threadid()
        dx_temp_1_full, dx_temp_2_full, dx_temp_3_full, dx_temp_full, cutoff_ids_int = cache_per_thread[tid]

        temp_u_prime_1 = 0.0
        temp_u_prime_2 = 0.0
        temp_u_prime_3 = 0.0

        for l in axes(l_vector, 1)
            cutoff_len = 0
            for j in 1:N
                dx1 = r_c_vector[i, 1] - r_c_vector[j, 1] - l_vector[l, 1]
                dx2 = r_c_vector[i, 2] - r_c_vector[j, 2] - l_vector[l, 2]
                dx3 = r_c_vector[i, 3] - r_c_vector[j, 3] - l_vector[l, 3]
                dx = sqrt(dx1^2 + dx2^2 + dx3^2)

                dx_temp_1_full[j] = dx1
                dx_temp_2_full[j] = dx2
                dx_temp_3_full[j] = dx3
                dx_temp_full[j] = dx

                if dx < R_star
                    cutoff_len += 1
                    cutoff_ids_int[cutoff_len] = j
                end
            end

            if cutoff_len == 0
                continue
            end

            for idx in 1:cutoff_len
                j = cutoff_ids_int[idx]
                dx = dx_temp_full[j]
                dx1 = dx_temp_1_full[j]
                dx2 = dx_temp_2_full[j]
                dx3 = dx_temp_3_full[j]

                A11 = exp(dx^2 + dx1^2)
                A12 = exp(dx1 * dx2)
                A13 = exp(dx1 * dx3)
                A22 = exp(dx^2 + dx2^2)
                A23 = exp(dx2 * dx3)
                A33 = exp(dx^2 + dx3^2)

                F = @view F_r_c_vector[j, :]

                temp_u_prime_1 += A11 * F[1] + A12 * F[2] + A13 * F[3]
                temp_u_prime_2 += A12 * F[1] + A22 * F[2] + A23 * F[3]
                temp_u_prime_3 += A13 * F[1] + A23 * F[2] + A33 * F[3]
            end
        end

        u_Real[i, 1] = temp_u_prime_1
        u_Real[i, 2] = temp_u_prime_2
        u_Real[i, 3] = temp_u_prime_3
    end
end

function create_thread_cache(N)
    nthreads = Threads.nthreads()
    return [(
        zeros(N), # dx_temp_1_full
        zeros(N), # dx_temp_2_full
        zeros(N), # dx_temp_3_full
        zeros(N), # dx_temp_full
        Vector{Int}(undef, N) # cutoff_ids_int
    ) for _ in 1:nthreads]
end

using Base.Threads

N = 100
r_c_vector = rand(N, 3)
F_r_c_vector = rand(N, 3)
R_star = 1.0
l_vector = rand(N, 3)
u_Real = zeros(N, 3)

cache = create_thread_cache(N)

using BenchmarkTools

@btime test_fun!($u_Real, $r_c_vector, $F_r_c_vector, $R_star, $l_vector, $cache)

You will need to start Julia with julia -t auto so that multiple threads are available.

The best approach is to use, or at least follow, the patterns recommended in the OhMyThreads.jl package and, at a lower level, in ChunkSplitters.jl. The docs of both packages provide nice examples.
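
As a rough illustration of the pattern those docs recommend (the buffer name and loop body here are placeholders, and this assumes a recent OhMyThreads version with the @tasks/@local macros):

using OhMyThreads: @tasks

@tasks for i in 1:N
    @local buf = zeros(N)   # task-local buffer, allocated once per task and reused
    # ... compute with `buf` for index i, then store the result in u_Real[i, :]
end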


After reading the "PSA: Thread-local state is no longer recommended" post, I am not sure whether this might run into data races.

But a huge thanks for making me realize that I could just iterate over another loop inside. Coming from MATLAB, I need to drop my habit of trying to vectorize everything. This way I don't even need to work with arrays, and I can also continue to the next point as soon as any dx_i > R_star.
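
As a small sketch of that early-skip idea (the scalar names are hypothetical; it works because dx ≥ |dx1|, |dx2|, |dx3|, so any single component larger than R_star already puts the point outside the cutoff):

dx1 = r_c_vector[i, 1] - r_c_vector[j, 1] - l_vector[l, 1]
abs(dx1) > R_star && continue            # skip as soon as one component is too large
dx2 = r_c_vector[i, 2] - r_c_vector[j, 2] - l_vector[l, 2]
abs(dx2) > R_star && continue
dx3 = r_c_vector[i, 3] - r_c_vector[j, 3] - l_vector[l, 3]
abs(dx3) > R_star && continue
dx = sqrt(dx1^2 + dx2^2 + dx3^2)
dx < R_star || continue
# ... accumulate the A terms for this point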

I could not understand how the boxed variables work in the OhMyThreads package: do you just disable it when you are writing into a new array instead of doing a reduction?
I could, however, make sense of ChunkSplitters and just slapped this on top:

using ChunkSplitters

@inbounds Threads.@threads for inds in index_chunks(r_c_vector[:,1]; n=Threads.nthreads())
    # Pre-allocation here (per chunk, i.e. per task)
    for i in inds
        # main loop
    end
end

I don't know if this is the best way to use the package, but it does the job. This time I could actually just break this into a nested loop like @yolhan_mannes did, but I'll likely have to delve deeper into these packages in the future.

I made a cache for each thread precisely to avoid that, in

function create_thread_cache(N)
    nthreads = Threads.nthreads()
    return [(
        zeros(N), # dx_temp_1_full
        zeros(N), # dx_temp_2_full
        zeros(N), # dx_temp_3_full
        zeros(N), # dx_temp_full
        Vector{Int}(undef, N) # cutoff_ids_int
    ) for _ in 1:nthreads]
end

and I checked that the results in u_Real are stable across repeated runs, so I don't think there are any data races here.

Yes, that's a proper way to do it. Just note that the @inbounds there is not useful, and if you use @views(r_c_vector[:,1]) you avoid allocating the intermediate array for that slice.
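
Concretely, the suggested tweak would look something like this (just a sketch of the loop header, keeping the placeholder comments from the snippet above):

using ChunkSplitters, Base.Threads

Threads.@threads for inds in index_chunks(@views(r_c_vector[:, 1]); n = nthreads())
    # pre-allocation here (one set of buffers per chunk, i.e. per task)
    for i in inds
        # main loop
    end
end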

Preallocating stuff for each thread is not a good way of doing/thinking about this. You should think in terms of Tasks. Have a look at this for context.

Note that @pulk_jain's solution does exactly that: it preallocates for each task and is thus safe.

Oh OK, I didn't know about that one, so either go for static threads (same performance here) or go for task spawning, indeed. In the case I tried, I guess the amount of work given to each thread and the big outer loop mean the issue doesn't show up, but it could.
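
For reference, a minimal sketch of the task-spawning variant (assuming ChunkSplitters, as used above, and allocating the buffers inside each spawned task so they are task-local rather than tied to a thread id):

using ChunkSplitters, Base.Threads

@sync for inds in index_chunks(1:N; n = nthreads())
    Threads.@spawn begin
        buf = zeros(N)            # task-local scratch space, never shared
        for i in inds
            # ... main loop body for particle i, using `buf`
        end
    end
end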

Never mind, you can simplify the code to this point, avoiding any caches:

function test_fun!(u, r, F, R_star::T, l) where T
    N = size(u, 1)
    @inbounds Threads.@threads for i in 1:N
        ri1, ri2, ri3 = @view r[i, :]
        s1 = zero(T)
        s2 = zero(T)
        s3 = zero(T) 
        @inbounds for j in 1:N
            rj1, rj2, rj3 = @view r[j, :]
            rij1 = ri1 - rj1
            rij2 = ri2 - rj2
            rij3 = ri3 - rj3
            Fj1, Fj2, Fj3 = @view F[j, :]
            @inbounds for k in axes(l, 1) # loop over all rows of l (N_l of them in the MWE)
                lk1, lk2, lk3 = @view l[k, :]
                dx1 = rij1 - lk1
                dx2 = rij2 - lk2
                dx3 = rij3 - lk3
                dxx = dx1 * dx1 + dx2 * dx2 + dx3 * dx3
                @inbounds @fastmath if dxx < R_star*R_star
                    ex12 = exp(dx1 * dx2)
                    ex13 = exp(dx1 * dx3)
                    ex23 = exp(dx2 * dx3)
                    s1 += exp(dxx + dx1 * dx1) * Fj1 + ex12 * Fj2 + ex13 * Fj3
                    s2 += ex12 * Fj1 + exp(dxx + dx2 * dx2) * Fj2 + ex23 * Fj3
                    s3 += ex13 * Fj1 + ex23 * Fj2 + exp(dxx + dx3 * dx3) * Fj3
                end
            end
        end
        @inbounds u[i, 1] = s1
        @inbounds u[i, 2] = s2
        @inbounds u[i, 3] = s3
    end
end

Or with another package.

This shouldn’t affect performance meaningfully (if it’s otherwise well-written), but it looks like this code might be a bit simpler if you used SVector from StaticArrays.jl. There seems to be some light linear algebra here that’s tedious to write out by hand but was presumably done that way to avoid allocations. With SVector you can write this with nice linear algebra without causing allocations.

You can use rand(SVector{3, Float64}, N) to make N 3-vectors instead of making a rand(N, 3) that you frequently slice to get those 3-vectors.
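
A minimal sketch of that layout, assuming the 3×3 matrix of exponentials mirrors the A11…A33 terms above (the l offset is left out for brevity):

using StaticArrays

N   = 100
r_c = rand(SVector{3, Float64}, N)      # N positions as stack-allocated 3-vectors
F   = rand(SVector{3, Float64}, N)

dx = r_c[1] - r_c[2]                    # SVector arithmetic, no heap allocation
d2 = sum(abs2, dx)                      # squared distance, i.e. dx_temp^2
A  = @SMatrix [exp(d2 + dx[1]^2)  exp(dx[1]*dx[2])   exp(dx[1]*dx[3]);
               exp(dx[1]*dx[2])   exp(d2 + dx[2]^2)  exp(dx[2]*dx[3]);
               exp(dx[1]*dx[3])   exp(dx[2]*dx[3])   exp(d2 + dx[3]^2)]
u  = A * F[2]                           # 3×3 times 3-vector, still allocation-free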

Now that I see you are computing particle interactions within a cutoff, you probably want to use CellListMap.jl (which is fast and parallel).
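
A hedged sketch of what that could look like (the cutoff and sizes are arbitrary; the periodic images handled manually through l_vector above would instead go through CellListMap's unit-cell options):

using CellListMap, StaticArrays

x = rand(SVector{3, Float64}, 1000)
pairs = neighborlist(x, 0.05)   # all (i, j, d) pairs with d < 0.05, built with parallel cell lists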