Preallocation and data race in parallelization

Hi all,

I wonder what is the best way to deal with temporary arrays in function calls within a parallelized calculation.

The (very reduced) structure of my code is something like

function fun1()
    p = SA[1,2,3,4]
    q = SA[10,20,30,40]
	temp_arr = zeros(4,4)
    for i=1:3
        for j=(i+1):4
            temp_arr[i,j] = 2*p[i]*p[j] + 2*q[i]*q[j]
        end
    end
end

# large for loop:
for i = 1:1000
	fun1()
end

In order to remove any unnecessary in-time allocation, I preallocate the temporary array in the following way:

temp_arr = zeros(4,4)

function fun2(temp_arr)
    p = SA[1,2,3,4]
    q = SA[10,20,30,40]
    for i=1:3
        for j=(i+1):4
            temp_arr[i,j] = 2*p[i]*p[j] + 2*q[i]*q[j]
        end
    end
end

# large for loop:
for i = 1:1000
	fun2(temp_arr)
end

This worked as intended as I could remove any allocations from my function calls.

Now, in an attempt to speed-up the large for loop via parallelization (@threads for ... ), I started to run into nonsense results (not an “error”). I assume this is because of some data race happening, as the functions use the same preallocated temp_arr which is overwritten and therefore changed each time.

For the above example I managed to come up with a simple solution

function fun3()
    p = SA[1,2,3,4]
    q = SA[10,20,30,40]
	temp_arr3 = 2*p.*transpose(p) .+2*q.*transpose(q)
end

which is automatically an SMatrix and (to my understanding) therefore the compiler knows its size and does not need any in-time allocations. However, in my actual code there are several of those temporary arrays in use, and not always I can come up with such a simple expression as above. Also the other temporary arrays might be larger (i.e. having more dimensions) and therefore might violate the usual rule of thumb to use SArrays only when the number of elements is not more than 50 elements.

In that sense, I would like to learn how to manage such temporary arrays efficiently.

1 Like

Yes, you have a data race. If you need temporary arrays in your parallel code, you can make them task-local. I recently wrote a little bit about it for OhMyThreads.jl.

Alternatively, you could use a Channel and a producer-consumer pattern to pass temporary buffers to parallel tasks. (I’m in the process of expanding the discussion linked above to include this but can’t say when it will go live.) One possible implementation of the latter could be something like this:

provider = Channel() do provider  # provides the temporary arrays in a thread-safe manner
    for x in 1:Threads.nthreads()
        put!(provider, zeros(4,4))
    end
end

@sync for i in 1:1000
    @spawn begin
        temp_arr = take!(provider)
        fun2(temp_arr)
        put!(provider)
    end
end

You could also think about flipping things around like so:

provider = Channel() do provider # provides the data in a thread-safe manner
    for i in 1:1000
        put!(provider, i)
    end
end

ntasks = Threads.nthreads() # a positive number
@sync for _ in 1:ntasks
    @spawn for x in provider
        temp_arr = zeros(4,4) # task-local
        fun2(temp_arr)
    end
end

These ideas could be paired with chunking to reduce the number of channel queries (you would get a version of this with the former example if you used @threads because it internally chunks and creates “only” nthreads() tasks).

Side note: In case you aren’t aware of it, Julia is column-major order, so the order of your loops isn’t “natural”. Doesn’t matter much in this specific case because you’re not performing many iterations, but this can matter a lot in general.

2 Likes

Thank you for your reply!

So, if I understand correctly, for my example I would use it the following way?

using OhMyThreads: TaskLocalValue

function fun4(temp_arr)
    p = SA[1,2,3,4]
    q = SA[10,20,30,40]
    for i=1:3
        for j=(i+1):4
            temp_arr[i,j] = 2*p[i]*p[j] + 2*q[i]*q[j]
        end
    end
end

tls = TaskLocalValue{Matrix{Float64}}(() -> Matrix{Float64}(undef, 4, 4))

# large for loop:
@threads for i = 1:1000
    temp_arr = tls[]
    fun4(temp_arr)
end

Or does it not work for the “simple” @threads?

I appreciate your other suggestions, but they seem a bit more delicate in managing the threads. I probably need more time to study them in detail (and I have to admit, that when it comes to coding I often go with the most straightforward solution I can find…)

Also, on a sidenote, I realized that maybe there is actually a way to rewrite the other temporary arrays in my function in terms of StaticArrays that are not too large, but I will ask for some help in that regard in a separate topic. Then, I should hoepfully not need any extra care in task-local arrays.

In principle I am aware of it, but still thank you for reminding me. Ideally, I should come to the point where I write my loops in column-major order as a default. However I have a question in that regard: You mention that for this case it does not matter due to the low number of iterations (I assume you have the double-loop over i and j in mind). Will it become a problem if this function is called many times , e.g. in this larger outer loop? In other words, will the penalty due to wrong loop-order add up again if called many times?

It does and your usage looks correct (you probably want zeros(4,4) instead of Matrix{Float64}(undef, 4,4) though because your fun4 doesn’t overwrite every element of temp_arr).

(If your loop iterations have different computational cost (non-uniform workload) you won’t get load balancing with @threads though.)

Note that if you only want TaskLocalValue you could also just add TaskLocalValue.jl as a dependency rather than the full OhMyThreads.jl.

Generally speaking, yes. You matrix is very small though: Assuming a cache line size of 64 bytes (8 Float64 numbers) your matrix (4*4=16 elements) presumably takes up only two cache lines. The (negative) potential for cache misses is thus limited. But you should generally try to get used to writing loops in the correct order and not rely on “luck” to save you.

Also, perhaps stating the obvious, but you should wrap your threaded loop into a function. In general, try to avoid global scope when you care about performance.

Again, thank you for your reply with many clarifications.

Alright, I will then try to implement it into the actual code that I’m working on. Hopefully I can report back soon.

I dont know how uniform the load-balancing is in my case, but I think for now I will assume that it is the case and see the result. I have a guess that there should be some variation, but (hopefully) not massively. Also I’m not sure how I would determine the load-balancing. Maybe if the overall speedup is far away from a factor n=Threads.nthreads()?

The link to TaskLocalValue.jl is not working for me, I get an error message:

Got a 404 when trying to retrieve package information.

Either this package or version doesn’t exist or we’re still in the process of generating this page.

Indeed, I need to get to the point where the column-major ordering becomes natural. And yes, the threaded loop is within a function, I just extracted it for providing a MWE.

It should be TaskLocalValues.jl (note the final “s”).

1 Like

It’s plural, TaskLocalValues.jl.

No, there can be many reasons for imperfect acceleration, even fundamental ones like Ahmdal’s law. Moreover, in principle, you can get ideal speedup with load balancing as well (assuming perfect conditions).

Apart from thinking about your computation, you could time different iterations (and maybe create a histogram) or - with OhMyThreads.jl - could switch between DynamicScheduler(; nchunks=nthreads()) and, for example, DynamicScheduler(; nchunks=4*nthreads()) to simply test if you observe a speedup.

Alright I tried the implementation with TaskLocalValues, and indeed, there is no more data race happening. Thank you already so far. As this was my original question, I will mark your reply from above as answer.

As a sidenote: It was also possible to use the same for several temporary arrays:

tls = TaskLocalValue{Matrix{Float64}}(() -> zeros(4,4))
@sync @threads for i = 1:1000
    temp_arr1 = tls[]
    temp_arr2 = tls[]
    fun(temp_arr1,temp_arr2)
end

I have, however, some follow-up question regarding your latest remark, as the speedup of currently about 2x is a bit disappointing for 8 threads.

I failed to see how I should apply the DynamicScheduler(; nchunks=4*nthreads()) in my specific case. Must I provide this information to the for-loop?

No, in general, this is unsafe because temp_arr1 and temp_arr2 are the same matrix:

julia> tls = TaskLocalValue{Matrix{Float64}}(() -> zeros(4,4));

julia> a = tls[];

julia> b = tls[];

julia> a === b
true

What did you expect to get, and why? Assuming that you can get ideal speed up (8x for 8 threads) is often overly optimistic, see e.g. Amdahl’s law.

Well, with @threads it’s (currently) not possible to get load balancing. My comment is related to OhMyThreads.jl in which case you would replace the @threads for ... end (btw, you don’t need @sync) with

tforeach(1:1000; scheduler=DynamicScheduler(; nchunks=4*nthreads())) do i
    # loop body goes here
end

Note that the API is still a little bit in flux, because OMT is a very young package.

Interesting, then somehow in my specific case (not the MWE above) this problem does not occur. I will however change my code accordingly to be safe.

Well I was timing only the bare for-loop and not my whole code, so yes, I thought I would get at least somewhat close to it, maybe 6x or 7x. In any case, I have tested again, and could not reproduce the 2x speedup from before, but now its rather 4x. I didnt document the previous runs properly (it was merely a quick test) so maybe I did something wrong there. Still not perfect, as I was hoping for better scaling, especially for running the code on our local workstation, but 4x is an improvement nontheless. Also, I found more bottleneck-places in my code so I guess I will leave it as is for now.

Thanks, I will try that when I come back to further optimize!