Fast Multi-Threaded Array Editing without data races?

Note that with these data sizes and relatively little work per index, you are likely to experience serious memory pressure: at the very least cache misses, and probably also TLB misses, which are very costly. The former can be alleviated by grouping nearby indices together, e.g. by sorting them. The latter can be alleviated by running with larger page sizes; how to do that depends on the OS.
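
For what it’s worth, here is a minimal, self-contained sketch of the index-sorting idea; the array size and the batch of updates are made up purely for illustration:

# toy example: apply a batch of (linear index, value) updates in memory order
A = zeros(Float64, 10^7)
updates = [(rand(1:length(A)), rand()) for _ in 1:10^5]

sort!(updates; by = first)   # group nearby indices so consecutive writes touch nearby memory

for (i, v) in updates
    A[i] += v
end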

I have set up the code as you have suggested. Going by my memory usage, tasks appear to be assigned to the task_queue, but are never actually carried out by the task_consumer (confirmed by no values being written to the arrays). Do I need to call these in a specific way that isn’t in the code you wrote?

I had a bug: it was missing an end. Here’s a simplified version that works:

task_queue = Channel{Task}(Inf)
ArrayOfValues = zeros(10)
task_consumer = Threads.@spawn begin
    for task in task_queue
        schedule(task)
        wait(task)
    end
end

function make_worker(task_queue)
    Threads.@spawn begin
        while true
            index = rand(1:10)
            value = rand()
            task = @task ArrayOfValues[index] = value
            sleep(5)
            put!(task_queue, task)
        end
    end
end

julia> ArrayOfValues
10-element Vector{Float64}:
 0.0
 0.7950942448474229
 0.7009708954929078
 0.957567172439463
 0.19631557112533105
 0.453290143877633
 0.8150300024119421
 0.20190612368343674
 0.0
 0.6006649024766615

julia> ArrayOfValues
10-element Vector{Float64}:
 0.6321285176596904
 0.2972683412784255
 0.3590054369079717
 0.7070913651228008
 0.16218331525499274
 0.5834609447257812
 0.5097080697152006
 0.2886712728113805
 0.9525001298254895
 0.13213068752448642

I have got this to work; however, it remains slower than the serial version. I believe the bottleneck is applying all the tasks on a single core. To ensure all tasks have been completed, I have to employ a long sleep, which typically ends up taking longer than the serial version does. In addition, the build-up of tasks leads to large memory consumption, which only gets worse with increased sampling.

The sleep is obviously inefficient, as it is just a guess at when all the tasks will have finished. Is there a neater way of waiting for this?

To speed up the array writes: in general I don’t think @sgaure’s suggestion of sorting the indices will help, as they tend not to be densely packed. However, perhaps the fact that I am writing into a 6D array is hurting things (since Julia is column major). Would reshaping it to a 1D vector for the purpose of the writes, and converting the locs with LinearIndices, help?

Is there also a quicker way of updating the @view slices of the array?

You shouldn’t need a sleep in the real version; that was just for demonstration. Your workers don’t need to wait for the tasks to complete, they can just put them in the queue. And the queue processing should be really fast if the only thing the tasks are doing is updating the array.

If you’re worried about overloading the queue, you can do something like task_queue = Channel{Task}(32), which will only allow 32 elements in the queue, and if there are more elements then put! will wait until one has been processed.
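
As a small aside, here is a toy illustration of that blocking behaviour (not from the code above, just to show put! waiting on a full channel):

ch = Channel{Int}(2)              # capacity 2
put!(ch, 1); put!(ch, 2)          # channel is now full
t = Threads.@spawn put!(ch, 3)    # this put! blocks until a slot frees up
istaskdone(t)                     # false while the channel stays full
take!(ch)                         # frees a slot, so the pending put! can complete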

Can you share a working example that’s closer to your real problem? It’s hard to tell what the issue is at this point.

I have written some example code that simulates the actual serial and (current version of the) multithreaded code in both performance and structure. It should be a good test bed moving forward:

function Test_Serial()

    # allocate arrays, actual code has a pair of arrays rather than just one
    Array1Size = (22,20,20,20,20,20)
    ArrayOfValues1 = zeros(Float32,Array1Size)
    ArrayOfTallys1 = zeros(UInt32,Array1Size)
    Array2Size = (20,20,20,20)
    ArrayOfValues2 = zeros(Float32,Array2Size)
    ArrayOfTallys2 = zeros(UInt32,Array2Size)

    numSamples = 8000

    #perform MC sampling
    Test_Serial_MC!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples)

    # some saving of arrays goes on here

end

function Test_Serial_MC!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples)

    ValueSample = zeros(Float32,3) # MC samples three points

    for i in 1:numSamples
        ValueSample[3] = Value()
        (loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
        ArrayOfValues2[loc1,loc2,loc3,loc4] += ValueSample[3]
        ArrayOfTallys2[loc1,loc2,loc3,loc4] += UInt32(1)
        if ValueSample[3] != 0
            for j in 1:100
                ValueSample[2:3] .= (Value(),Value())
                (loc5,loc6) = (Loc2(),Loc())
                (loc5p,loc6p) = (Loc2(),Loc())
                if (1<=loc5<=20)
                    ArrayOfValues1[loc5+2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                elseif (loc5>20) # over goes in second position
                    ArrayOfValues1[2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                else # under goes in second position
                    ArrayOfValues1[1,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                end
                @view(ArrayOfTallys1[:,loc6,loc1,loc2,loc3,loc4]) .+= UInt32(1)

                if (1<=loc5p<=20)
                    ArrayOfValues1[loc5p+2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                elseif (loc5p>20) # over goes in second position
                    ArrayOfValues1[2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                else # under goes in second position
                    ArrayOfValues1[1,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                end
                if loc6 != loc6p
                    @view(ArrayOfTallys1[:,loc6p,loc1,loc2,loc3,loc4]) .+= UInt32(1)
                end
            end
        else 
            @view(ArrayOfTallys1[:,:,loc1,loc2,loc3,loc4]) .+= UInt32(1)
        end
    end
end

function Test_MultiThread()

    # allocate arrays, actual code has a pair of arrays rather than just one
    Array1Size = (22,20,20,20,20,20)
    ArrayOfValues1 = zeros(Float32,Array1Size)
    ArrayOfTallys1 = zeros(UInt32,Array1Size)
    Array2Size = (20,20,20,20)
    ArrayOfValues2 = zeros(Float32,Array2Size)
    ArrayOfTallys2 = zeros(UInt32,Array2Size)

    numSamples = 1000 # 8 workers each sampling numSamples
    
    # set up channel 
    task_queue = Channel{Task}(Inf)

    # set up consumer 
    task_consumer = Threads.@spawn begin
        for task in task_queue
            schedule(task)
            wait(task)
        end
    end

    #perform MC sampling
    workers = [Test_MultiThread_MC!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples,task_queue) for i in 1:8]

    # check if all tasks complete
    sleep(10)
    println(sum(ArrayOfTallys2)) # should equal 8*numSamples

    close(task_queue) # close queue to free up memory?
    
    # some saving of arrays goes on here

end

function Test_MultiThread_MC!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples,task_queue)

    Threads.@spawn begin

    ValueSample = zeros(Float32,3) # MC samples three points

    for i in 1:numSamples
        ValueSample[3] = Value()
        (loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
        task = @task ArrayOfValues2[loc1,loc2,loc3,loc4] += ValueSample[3]
        put!(task_queue,task)
        task = @task ArrayOfTallys2[loc1,loc2,loc3,loc4] += UInt32(1)
        put!(task_queue,task)
        if ValueSample[3] != 0
            for j in 1:100
                ValueSample[2:3] .= (Value(),Value())
                (loc5,loc6) = (Loc2(),Loc())
                (loc5p,loc6p) = (Loc2(),Loc())
                if (1<=loc5<=20)
                    task = @task ArrayOfValues1[loc5+2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                elseif (loc5>20) # over goes in second position
                    task = @task ArrayOfValues1[2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                else # under goes in second position
                    task = @task ArrayOfValues1[1,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                end
                put!(task_queue,task)
                task = @task @view(ArrayOfTallys1[:,loc6,loc1,loc2,loc3,loc4]) .+= UInt32(1)
                put!(task_queue,task)

                if (1<=loc5p<=20)
                    task = @task ArrayOfValues1[loc5p+2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                elseif (loc5p>20) # over goes in second position
                    task = @task ArrayOfValues1[2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                else # under goes in second position
                    task = @task ArrayOfValues1[1,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                end
                put!(task_queue,task)
                if loc6 != loc6p
                    task = @task @view(ArrayOfTallys1[:,loc6p,loc1,loc2,loc3,loc4]) .+= UInt32(1)
                    put!(task_queue,task)
                end
            end
        else 
            task = @task @view(ArrayOfTallys1[:,:,loc1,loc2,loc3,loc4]) .+= UInt32(1)
            put!(task_queue,task)
        end
    end
    end # threads
end

function Loc()
    return rand(1:20)
end

function Loc1()
    return rand(0:20)
end

function Loc2()
    return rand(0:21)
end

function Value()
    # the real Value() takes about 50ns; sleep(0) is far too long, so the sort below is a hacky stand-in for that work
    list = rand(2)
    sort(list)

    return rand(0:50)
end

When benchmarked for a total of 8000 samples:

@btime Test_Serial()
  248.969 ms (3147209 allocations: 778.44 MiB)
@time Test_MultiThread()
 10.332610 seconds (24.95 M allocations: 2.859 GiB, 9.64% gc time)

There is a sleep(10) in the multithreaded code to ensure that all tasks have been completed. My guess is that there must be something in how the tasks are being performed that is slow compared to the serial array editing.

Drop the sleep(10). Replace it with

    wait.(workers)
    close(task_queue) # close queue to stop the consumer
    wait(task_consumer)
    println(sum(ArrayOfTallys2)) # should equal 8*numSamples

Since the last 4 indices are the same in the loop, you can clean up the code by creating a view before the inner loop:
valview2 = @view ArrayOfValues1[:,:,loc1,loc2,loc3,loc4]
Once loc6 has been found inside the loop, you might also create another view,
valview1 = @view valview2[:, loc6]. You can do the same with ArrayOfTallys1. The code will be much more readable.

I think scheduling a task for every assignment is too much overhead. If you use the Channel approach it’s probably better to send over an array (or the above-mentioned view), an index, and the value as a tuple, like
put!(channel, (valview1, loc5+1, ValueSample[1])), and let the consumer do something like

for (arr, ind, val) in channel
   arr[ind] += val
end

And one must take care to make it type stable. A better alternative would be to have a channel and consumer for each of the arrays, so just the index-value pair is sent. I.e. a Channel{Tuple{CartesianIndex{6}, Float32}}.
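
A rough sketch of that per-array variant, reusing the array shape from the example code (the names value1_queue and value1_consumer are made up here):

ArrayOfValues1 = zeros(Float32, 22, 20, 20, 20, 20, 20)

# one channel per destination array, carrying only (index, value) pairs
value1_queue = Channel{Tuple{CartesianIndex{6}, Float32}}(Inf)

# single consumer: the only task that ever writes to ArrayOfValues1
value1_consumer = Threads.@spawn begin
    for (ind, val) in value1_queue
        ArrayOfValues1[ind] += val
    end
end

# a worker would then send, e.g.
# put!(value1_queue, (CartesianIndex(loc5+2, loc6, loc1, loc2, loc3, loc4), ValueSample[1]))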

However, I suspect that even the atomic operations on the channel behind the scenes will add too much overhead. When timing with @time in Julia 1.11, it reports around 1 million lock conflicts, so you do not get rid of locks in this way.

To put it bluntly, I suspect that there is too little work being done between each necessary synchronization (be it locking, Atomic something) to benefit from parallelization.

Now, since the inner loop only accesses a 22x20 matrix (indexed by loc5, loc6), it might be beneficial to create such a local matrix in each @spawn’ed task, zero it before the inner loop, and merge it into the large array after the inner loop (protected by a lock). This would give you at least a few hundred instructions between each lock.

I.e. something like this:

# There's a lk = ReentrantLock() common to all threads
@spawn begin
    localvalarray = zeros(Float32, size(ArrayOfValues1)[1:2])
    for i in 1:numSamples
       ...
       fill!(localvalarray, 0)
       for j in 1:100
           <update localvalarray instead of ArrayOfValues1>
       end
       @lock lk begin
           @view(ArrayOfValues1[:,:, loc1, etc]) .+= localvalarray
       end
    end
end

You can even let lk be an array of locks, indexed by e.g. loc4 to lock just a section of the array.

Thanks! I didn’t realize how fast the workers were, which makes the Channel overhead more significant. If a thread can generate ~30,000 values per second, then the overhead of most forms of parallelism is going to be too high.

If each update is a sum, could you have the workers each generate their own array, then add the arrays together? E.g.

function Test_Serial()

    # allocate arrays, actual code has a pair of arrays rather than just one
    Array1Size = (22,20,20,20,20,20)
    ArrayOfValues1 = zeros(Float32,Array1Size)
    ArrayOfTallys1 = zeros(UInt32,Array1Size)
    Array2Size = (20,20,20,20)
    ArrayOfValues2 = zeros(Float32,Array2Size)
    ArrayOfTallys2 = zeros(UInt32,Array2Size)

    numSamples = 8000

    #perform MC sampling
    Test_Serial_MC!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples)

    # some saving of arrays goes on here
    return ArrayOfValues1
end

function Test_Serial_MC!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples)

    ValueSample = zeros(Float32,3) # MC samples three points

     for i in 1:numSamples
        ValueSample[3] = Value()
        (loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
        ArrayOfValues2[loc1,loc2,loc3,loc4] += ValueSample[3]
        ArrayOfTallys2[loc1,loc2,loc3,loc4] += UInt32(1)
        if ValueSample[3] != 0
            for j in 1:100
                ValueSample[2:3] .= (Value(),Value())
                (loc5,loc6) = (Loc2(),Loc())
                (loc5p,loc6p) = (Loc2(),Loc())
                if (1<=loc5<=20)
                    ArrayOfValues1[loc5+2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                elseif (loc5>20) # over goes in second position
                    ArrayOfValues1[2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                else # under goes in second position
                    ArrayOfValues1[1,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                end
                @view(ArrayOfTallys1[:,loc6,loc1,loc2,loc3,loc4]) .+= UInt32(1)

                if (1<=loc5p<=20)
                    ArrayOfValues1[loc5p+2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                elseif (loc5p>20) # over goes in second position
                    ArrayOfValues1[2,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                else # under goes in second position
                    ArrayOfValues1[1,loc6,loc1,loc2,loc3,loc4] += ValueSample[1]
                end
                if loc6 != loc6p
                    @view(ArrayOfTallys1[:,loc6p,loc1,loc2,loc3,loc4]) .+= UInt32(1)
                end
            end
        else 
            @view(ArrayOfTallys1[:,:,loc1,loc2,loc3,loc4]) .+= UInt32(1)
        end
    end
    return ArrayOfValues1
end

arrays = fetch.([Threads.@spawn Test_Serial() for i in 1:8]);
combined_array = sum(arrays);

I am hesitant to take this approach: if each thread spawns its own set of arrays, I will quickly run out of system memory, particularly as I increase the number of threads.

I have implemented your suggestions, in particular the local 2D arrays, the changed channel definitions, and separate channels for each array. The new code is:

function Test_MultiThread_Updated()

    # allocate arrays, actual code has a pair of arrays rather than just one
    Array1Size = (22,20,20,20,20,20)
    ArrayOfValues1 = zeros(Float32,Array1Size)
    ArrayOfTallys1 = zeros(UInt32,Array1Size)
    Array2Size = (20,20,20,20)
    ArrayOfValues2 = zeros(Float32,Array2Size)
    ArrayOfTallys2 = zeros(UInt32,Array2Size)

    numSamples = 100000 # 8 workers each sampling numSamples
    
    # set up channels
    task_queue_Value2 = Channel{Tuple{Array{Float32, 4},CartesianIndex{4},Float32}}(Inf)
    task_queue_Tally2 = Channel{Tuple{Array{UInt32, 4},CartesianIndex{4},UInt32}}(Inf)

    # set up consumers 
    task_consumer_Value2 = Threads.@spawn begin
        for (arr, ind, val) in task_queue_Value2
            arr[ind] += val
        end
    end
    task_consumer_Tally2 = Threads.@spawn begin
        for (arr, ind, val) in task_queue_Tally2
            arr[ind] += val
        end
    end

    #perform MC sampling
    workers = [Test_MultiThread_MC_Updated!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples,task_queue_Value2,task_queue_Tally2) for i in 1:8]

    # check if all tasks complete
    wait.(workers)
    close(task_queue_Value2)
    close(task_queue_Tally2)
    wait(task_consumer_Value2)
    wait(task_consumer_Tally2)

    println(sum(ArrayOfTallys2)) # should equal 8*numSamples
    println(sum(ArrayOfTallys1))
    println(sum(ArrayOfValues2))
    println(sum(ArrayOfValues1))
    
    # some saving of arrays goes on here

end

function Test_MultiThread_MC_Updated!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples,task_queue_Value2,task_queue_Tally2)

    lk = ReentrantLock()

    Threads.@spawn begin

    ValueSample = zeros(Float32,3) # MC samples three points

    localValue1 = zeros(Float32,size(ArrayOfValues1)[1:2])
    localTally1 = zeros(Float32,size(ArrayOfTallys1)[1:2])

    for i in 1:numSamples
        ValueSample[3] = Value()
        (loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
        loc1234 = CartesianIndex(loc1,loc2,loc3,loc4)
        put!(task_queue_Value2,(ArrayOfValues2,loc1234,ValueSample[3]))
        put!(task_queue_Tally2,(ArrayOfTallys2,loc1234,UInt32(1)))

        ArrayOfValues1View = @view(ArrayOfValues1[:,:,loc1234])
        ArrayOfTallys1View = @view(ArrayOfTallys1[:,:,loc1234])

        fill!(localValue1,0f0)
        fill!(localTally1,UInt32(0))

        if ValueSample[3] != 0
            for j in 1:100
                ValueSample[2:3] .= (Value(),Value())
                (loc5,loc6) = (Loc2(),Loc())
                (loc5p,loc6p) = (Loc2(),Loc())

                if (1<=loc5<=20)
                    localValue1[loc5+2,loc6] += ValueSample[1]
                elseif (loc5>20) # over goes in second position
                    localValue1[2,loc6] += ValueSample[1]
                else # under goes in second position
                    localValue1[1,loc6] += ValueSample[1]
                end
                @view(localTally1[:,loc6]) .+= UInt32(1)

                if (1<=loc5p<=20)
                    localValue1[loc5p+2,loc6p] += ValueSample[2]
                elseif (loc5p>20) # over goes in second position
                    localValue1[2,loc6p] += ValueSample[2]
                else # under goes in second position
                    localValue1[1,loc6p] += ValueSample[2]
                end
                if loc6 != loc6p
                    @view(localTally1[:,loc6p]) .+= UInt32(1)
                end
            end
            @lock lk begin
                ArrayOfValues1View .+= localValue1
            end
        else 
            @view(localTally1[:,:]) .+= UInt32(1)
        end
        @lock lk begin
            ArrayOfTallys1View .+= localTally1
        end
    end
    end # threads
end

with rough timing benchmarks for 80000 samples

Test_Serial()
 10.617903 seconds (315.36 M allocations: 24.022 GiB, 11.80% gc time)
Test_MultiThread_Updated()
  9.111024 seconds (317.03 M allocations: 24.123 GiB, 45.14% gc time)

This suggestion (the array of locks) I feel would be ideal, as it is unlikely that different threads will need to access the same sections of the array at the same time, and it could be used instead of the channel approach on all the arrays. In practice though, I am not sure how to implement it (I’ve never seen anyone use more than one lock). How would you go about doing it?

You create the locks before you spawn anything, e.g. with

locks = [ReentrantLock() for _ in 1:20]

Then, inside the spawned task you find loc1 ... loc6 and do the update of the local 22x20 array, and just use the appropriate lock for updating the large array:

@lock locks[loc4] begin
   @view(largearray[:, :, loc1, ..., loc4]) .+= localarray
end

The macro @lock does the same as your original try ... finally construction.
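
For reference, @lock lk begin ... end expands to roughly this pattern (shown here with a throwaway lock):

lk = ReentrantLock()
lock(lk)
try
    # ... the protected update goes here ...
finally
    unlock(lk)
end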

If there are still collisions you can even make a matrix of locks:

locks = [ReentrantLock() for _ in 1:20, _ in 1:20]

and use it with two indices

@lock locks[loc3, loc4] begin
    ...
end

Can you post a fully self-contained simple sample code?

Fully self-contained means: If I copy-paste the code-block into the REPL, then it runs. You must check that before posting!

All samples I have seen so far are incomplete. For example,

  nested task error: UndefVarError: `Value` not defined in `Main`

This means that it is not possible to reasonably contribute to the discussion.

You need to provide a reasonable definition, e.g. Value() = rand() < 0.9 ? rand() : 0.0, if your Value() is supposed to output Float64 with a reasonable chance of exact zeros.

Ideally your sample code would be simplified: No locks, no tasks, no multi-threading. Just a concise problem description.

Ideally you would also profile against your real application to check that the timings/bottlenecks are similar enough with the mockup Loc() / Value() implementations that you post here.

For this simple update it is also possible to use a Threads.SpinLock instead of a ReentrantLock. The difference is that if the lock is already held, a task waiting on a ReentrantLock will go to sleep and yield to other tasks, and is woken up as soon as the lock is released. A SpinLock will spin-wait in a tight loop without sleeping or yielding to anything, so there is less overhead when the lock is released.
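
A drop-in sketch of that swap (keeping in mind that a SpinLock is not reentrant and should only guard short, non-yielding sections):

lk = Threads.SpinLock()

@lock lk begin
    # quick in-memory update only: no I/O, no put!/take!, nothing that can yield
end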

The full code was posted in this post. I have written Loc() and Value() to simulate the actual code such that they have similar performance.

I’ve managed to get the lock approach to be faster than the serial version:

function Test_MultiThread_Updated_Locks()

    # allocate arrays, actual code has a pair of arrays rather than just one
    Array1Size = (22,20,20,20,20,20)
    ArrayOfValues1 = zeros(Float32,Array1Size)
    ArrayOfTallys1 = zeros(UInt32,Array1Size)
    Array2Size = (20,20,20,20)
    ArrayOfValues2 = zeros(Float32,Array2Size)
    ArrayOfTallys2 = zeros(UInt32,Array2Size)

    numSamples = 100000 # 8 workers each sampling numSamples
    
    # set up locks 
    locks = Threads.SpinLock()
    # locks = [Threads.SpinLock() for _ in 1:20]

    #perform MC sampling
    workers = [Test_MultiThread_MC_Updated_Locks!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples,locks) for i in 1:8]

    # check if all tasks complete
    wait.(workers)

    println(sum(ArrayOfTallys2)) # should equal 8*numSamples
    #println(sum(ArrayOfTallys1))
    #println(sum(ArrayOfValues2))
    #println(sum(ArrayOfValues1))
    
    # some saving of arrays goes on here

    return nothing

end

function Test_MultiThread_MC_Updated_Locks!(ArrayOfValues1::Array{Float32,6},ArrayOfTallys1::Array{UInt32,6},ArrayOfValues2::Array{Float32,4},ArrayOfTallys2::Array{UInt32,4},numSamples::Int64,locks)

    Threads.@spawn begin

    ValueSample = zeros(Float32,3) # MC samples three points

    localValue1 = zeros(Float32,size(ArrayOfValues1)[1:2])
    localTally1 = zeros(Float32,size(ArrayOfTallys1)[1:2])

    for i in 1:numSamples
        ValueSample[3] = Value()
        (loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
        loc1234 = CartesianIndex(loc1,loc2,loc3,loc4)

        fill!(localValue1,0f0)
        fill!(localTally1,UInt32(0))

        if ValueSample[3] != 0
            for j in 1:100
                ValueSample[2:3] .= (Value(),Value())
                (loc5,loc6) = (Loc2(),Loc())
                (loc5p,loc6p) = (Loc2(),Loc())

                if (1<=loc5<=20)
                    localValue1[loc5+2,loc6] += ValueSample[1]
                elseif (loc5>20) # over goes in second position
                    localValue1[2,loc6] += ValueSample[1]
                else # under goes in second position
                    localValue1[1,loc6] += ValueSample[1]
                end
                @view(localTally1[:,loc6]) .+= UInt32(1)

                if (1<=loc5p<=20)
                    localValue1[loc5p+2,loc6p] += ValueSample[2]
                elseif (loc5p>20) # over goes in second position
                    localValue1[2,loc6p] += ValueSample[2]
                else # under goes in second position
                    localValue1[1,loc6p] += ValueSample[2]
                end
                if loc6 != loc6p
                    @view(localTally1[:,loc6p]) .+= UInt32(1)
                end
            end
        else 
            @view(localTally1[:,:]) .+= UInt32(1)
        end
        
        @lock locks #=locks[loc4]=# begin
            ArrayOfValues2[loc1234] += ValueSample[3] 
            ArrayOfTallys2[loc1234] += UInt32(1)
            @view(ArrayOfValues1[:,:,loc1234]) .+= localValue1
            @view(ArrayOfTallys1[:,:,loc1234]) .+= localTally1
        end
    end
    end # threads

end

With a single Lock the code timing is now (for 80000 samples):

@time Test_MultiThread_Updated_Locks()
  5.910963 seconds (315.28 M allocations: 24.016 GiB, 44.01% gc time)

compared to

@time Test_Serial()
 12.071050 seconds (316.89 M allocations: 27.045 GiB, 12.22% gc time

so about a 2x speed up.

Interestingly, using an array of locks (commented code) locks = [Threads.SpinLock() for _ in 1:20] with ...@lock locks[loc4] begin... doesn’t generate any speedup:

@time Test_MultiThread_Updated_Locks()
  7.186602 seconds (315.34 M allocations: 24.020 GiB, 53.04% gc time)

This suggests to me that the code is still treating each lock in the array as the same lock. Can the locks themselves be given a unique ID, perhaps via a function argument like SpinLock(ID)?

No, they are separate locks, definitely. A SpinLock is just a single Int which is accessed atomically when trying to lock it, there are no references to anything else. Different SpinLocks are wholly unrelated. However, there will still be some memory synchronization going on which may slow things down.

Also, you may hit some memory bandwidth or latency limit. That is hard to speculate about without looking closer with cpu profiling packages like LIKWID or LinuxPerf, which requires hardware knowledge to benefit from.

But where do all the allocations come from?

Can you fix the code, so that we don’t use your bugs to improve performance, only for you to find out later that the improvements don’t apply in real life?

  1. ValueSample[1] is never modified. So it is constant zero, so most of your array updates can be optimized out.
  2. The construction of Loc2() is bad API. I would wrap it into e.g.
function Loc2X()
    r = Loc2()
    1 <= r <= 20 ? r+2 : r>20 ? 2 : 1
end

and then simplify the updates into loc5x = Loc2X() and then ArrayOfValues[loc5x, ...] += .... Can this be done or is there hidden code that prevents this?
3. ArrayOfTallys1 is only ever updated with ::Colon in the first position. So you could instead write

    ArrayOfTallys1 = zeros(UInt32,Base.tail(Array1Size))

and then skip the colon / @view / broadcast in most updates. I.e. @view(ArrayOfTallys1[:,loc6,loc1,loc2,loc3,loc4]) .+= UInt32(1) becomes ArrayOfTallys1[loc6,loc1,loc2,loc3,loc4] += UInt32(1) and
@view(ArrayOfTallys1[:,:,loc1,loc2,loc3,loc4]) .+= UInt32(1) becomes @view(ArrayOfTallys1[:,loc1,loc2,loc3,loc4]) .+= UInt32(1).
4. Can you please refactor such that ValueSample is not a vector anymore? i.e. use 3 scalars, like v3 = Value(). This is by the way how I spotted problem (1) during refactoring of your code.

edit PS:

  1. Your Value() mockup allocates. I get that you need to do something that takes an adequate amount of time – but can you do something non-allocating instead? This simplifies benchmarking/profiling for all of us. For example you could import StaticArrays and create and hash a random SVector (you adjust its size until it adequately matches the real runtime). In order to ensure that the compiler doesn’t optimize this out, you add a check (hash(rand(SVector{4, Int32})) == 0 && throw("bad luck")).

Thank you for all the excellent comments/suggestions. I have done as you asked and written the test code, which is at the bottom of this post and replicates the actual code well (hopefully bug free). It includes a serial function Test_Serial() and two multi-threaded functions based on the comments of all the wonderful people in this thread: Test_MultiThread_Channel(), using a channel approach, and Test_MultiThread_Locks(), using an array-of-locks approach. They benchmark like:

@time Test_Serial()
  9.404520 seconds (37 allocations: 281.986 MiB)

@time Test_MultiThread_Channel()
  2.381187 seconds (5.60 M allocations: 867.951 MiB, 6.25% gc time)

@time Test_MultiThread_Locks()
  1.502663 seconds (115 allocations: 282.006 MiB)

The performance seems to be in line with the 8x speed-up expected from using 8 threads over 1. The channel approach still has quite a large number of allocations in comparison to the other two; I’m not sure what is going on there?

I am not sure if there is anywhere left to squeeze out any last bits of performance?

Full code:

using StaticArrays

function Test_Serial()

    # allocate arrays, actual code has a pair of arrays rather than just one
    Array1Size = (22,20,20,20,20,20)
    ArrayOfValues1 = zeros(UInt32,Array1Size)
    ArrayOfTallys1 = zeros(UInt32,Base.tail(Array1Size))
    Array2Size = (20,20,20,20)
    ArrayOfValues2 = zeros(UInt32,Array2Size)
    ArrayOfTallys2 = zeros(UInt32,Array2Size)

    numSamples = 800000

    #perform MC sampling
    Test_Serial_MC!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples)

    println(sum(ArrayOfTallys2)) # should equal 8*numSamples
    println(sum(ArrayOfTallys1))
    println(sum(ArrayOfValues2))
    println(sum(ArrayOfValues1))
    # some saving of arrays goes on here

    return nothing

end

function Test_Serial_MC!(ArrayOfValues1::Array{UInt32,6},ArrayOfTallys1::Array{UInt32,5},ArrayOfValues2::Array{UInt32,4},ArrayOfTallys2::Array{UInt32,4},numSamples)

    localValue1 = zeros(UInt32,size(ArrayOfValues1)[1:2])
    localTally1 = zeros(UInt32,size(ArrayOfTallys1)[1])

    for i in 1:numSamples
        Value3 = Value()
        (loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
        loc1234 = CartesianIndex(loc1,loc2,loc3,loc4)

        fill!(localTally1,UInt32(0))

        if Value3 != UInt32(0)
            fill!(localValue1,UInt32(0))
            for j in 1:100
                (Value1,Value2) = (Value(),Value())
                (loc5,loc6) = (Loc2(),Loc())
                loc5x = Loc2X(loc5)
                (loc5p,loc6p) = (Loc2(),Loc())
                loc5xp = Loc2X(loc5p)
                localValue1[loc5x,loc6] += Value1
                localTally1[loc6] += UInt32(1)
                localValue1[loc5xp,loc6] += Value2
                if loc6 != loc6p
                    localTally1[loc6p] += UInt32(1)
                end
            end
        else 
            localTally1 .+= UInt32(1)
        end

        ArrayOfValues2[loc1234] += Value3
        ArrayOfTallys2[loc1234] += UInt32(1)
        @view(ArrayOfValues1[:,:,loc1234]) .+= localValue1
        @view(ArrayOfTallys1[:,loc1234]) .+= localTally1

    end
end

function Test_MultiThread_Channel()

    # allocate arrays, actual code has a pair of arrays rather than just one
    Array1Size = (22,20,20,20,20,20)
    ArrayOfValues1 = zeros(UInt32,Array1Size)
    ArrayOfTallys1 = zeros(UInt32,Base.tail(Array1Size))
    Array2Size = (20,20,20,20)
    ArrayOfValues2 = zeros(UInt32,Array2Size)
    ArrayOfTallys2 = zeros(UInt32,Array2Size)

    numSamples = 100000 # 8 workers each sampling numSamples
    
    # set up channel 
    task_queue = Channel{Task}(Inf)

    # set up consumer 
    task_consumer = Threads.@spawn begin
        for task in task_queue
            schedule(task)
            wait(task)
        end
    end

    #perform MC sampling
    workers = [Test_MultiThread_MC_Channel!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples,task_queue) for i in 1:8]

    # check if all tasks complete
    wait.(workers)
    close(task_queue)
    wait(task_consumer)

    println(sum(ArrayOfTallys2)) # should equal 8*numSamples
    println(sum(ArrayOfTallys1))
    println(sum(ArrayOfValues2))
    println(sum(ArrayOfValues1))
    # some saving of arrays goes on here

end

function Test_MultiThread_MC_Channel!(ArrayOfValues1::Array{UInt32,6},ArrayOfTallys1::Array{UInt32,5},ArrayOfValues2::Array{UInt32,4},ArrayOfTallys2::Array{UInt32,4},numSamples,task_queue::Channel{Task})

    Threads.@spawn begin

    localValue1 = zeros(UInt32,size(ArrayOfValues1)[1:2])
    localTally1 = zeros(UInt32,size(ArrayOfTallys1)[1])

    for i in 1:numSamples
        Value3 = Value()
        (loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
        loc1234 = CartesianIndex(loc1,loc2,loc3,loc4)

        fill!(localTally1,UInt32(0))

        if Value3 != UInt32(0)
            fill!(localValue1,UInt32(0))
            for j in 1:100
                (Value1,Value2) = (Value(),Value())
                (loc5,loc6) = (Loc2(),Loc())
                loc5x = Loc2X(loc5)
                (loc5p,loc6p) = (Loc2(),Loc())
                loc5xp = Loc2X(loc5p)
                localValue1[loc5x,loc6] += Value1
                localTally1[loc6] += UInt32(1)
                localValue1[loc5xp,loc6] += Value2
                if loc6 != loc6p
                    localTally1[loc6p] += UInt32(1)
                end
            end
        else 
            localTally1 .+= UInt32(1)
        end
        
        task = @task begin 
            ArrayOfValues2[loc1234] += Value3
            ArrayOfTallys2[loc1234] += UInt32(1)
            @view(ArrayOfValues1[:,:,loc1234]) .+= localValue1
            @view(ArrayOfTallys1[:,loc1234]) .+= localTally1
        end
        put!(task_queue,task)
    end
    end # threads

end

function Test_MultiThread_Locks()

    # allocate arrays, actual code has a pair of arrays rather than just one
    Array1Size = (22,20,20,20,20,20)
    ArrayOfValues1 = zeros(UInt32,Array1Size)
    ArrayOfTallys1 = zeros(UInt32,Base.tail(Array1Size))
    Array2Size = (20,20,20,20)
    ArrayOfValues2 = zeros(UInt32,Array2Size)
    ArrayOfTallys2 = zeros(UInt32,Array2Size)

    numSamples = 100000 # 8 workers each sampling numSamples

    # set up locks 
    #locks = Threads.SpinLock()
    locks = [Threads.SpinLock() for _ in 1:20]

    #perform MC sampling
    workers = [Test_MultiThread_MC_Locks!(ArrayOfValues1,ArrayOfTallys1,ArrayOfValues2,ArrayOfTallys2,numSamples,locks) for i in 1:8]

    # check if all tasks complete
    wait.(workers)

    println(sum(ArrayOfTallys2)) # should equal 8*numSamples
    println(sum(ArrayOfTallys1))
    println(sum(ArrayOfValues2))
    println(sum(ArrayOfValues1))
    
    # some saving of arrays goes on here

    return nothing

end

function Test_MultiThread_MC_Locks!(ArrayOfValues1::Array{UInt32,6},ArrayOfTallys1::Array{UInt32,5},ArrayOfValues2::Array{UInt32,4},ArrayOfTallys2::Array{UInt32,4},numSamples,locks)

    Threads.@spawn begin

    localValue1 = zeros(UInt32,size(ArrayOfValues1)[1:2])
    localTally1 = zeros(UInt32,size(ArrayOfTallys1)[1])

    for i in 1:numSamples
        Value3 = Value()
        (loc1,loc2,loc3,loc4) = (Loc(),Loc(),Loc(),Loc())
        loc1234 = CartesianIndex(loc1,loc2,loc3,loc4)

        fill!(localTally1,UInt32(0))

        if Value3 != UInt32(0)
            fill!(localValue1,UInt32(0))
            for j in 1:100
                (Value1,Value2) = (Value(),Value())
                (loc5,loc6) = (Loc2(),Loc())
                loc5x = Loc2X(loc5)
                (loc5p,loc6p) = (Loc2(),Loc())
                loc5xp = Loc2X(loc5p)
                localValue1[loc5x,loc6] += Value1
                localTally1[loc6] += UInt32(1)
                localValue1[loc5xp,loc6] += Value2
                if loc6 != loc6p
                    localTally1[loc6p] += UInt32(1)
                end
            end
        else 
            localTally1 .+= UInt32(1)
        end
        
        @lock locks[loc4] begin
            ArrayOfValues2[loc1234] += Value3 
            ArrayOfTallys2[loc1234] += UInt32(1)
            @view(ArrayOfValues1[:,:,loc1234]) .+= localValue1
            @view(ArrayOfTallys1[:,loc1234]) .+= localTally1
        end
    end
    
    end # threads

end

function Loc()
    return rand(1:20)
end

function Loc2()
    return rand(0:21)
end

function Loc2X(loc)
    1 <= loc <= 20 ? loc+2 : loc>20 ? 2 : 1
end

function Value()
    # the real Value() takes about 50ns; hashing a random SVector is a non-allocating stand-in for that work
    (hash(rand(SVector{30, Int32})) == 0 && throw("bad luck"))

    a = rand(0:50)

    return UInt32(a)
end

You’re very bottlenecked on Value() invocations. Just extracting that:

julia> function testSimple(numSamples)
           for i = 1:numSamples
               for k = 1:4 Loc() end
               Value()
               for j = 1:100
                   Value(); Value()
                   Loc2(); Loc(); Loc2(); Loc()
               end
           end
           nothing
       end
testSimple (generic function with 1 method)

julia> @btime testSimple(800000)
  11.349 s (0 allocations: 0 bytes)
julia> @btime Test_Serial()
800000
153239930
20000237
4000419868
800000
153243671
19988291
3999821035
800000
153240450
20016438
3999947475
  11.906 s (44 allocations: 281.99 MiB)

The @task macro creates a closure under the hood, i.e. a callable struct. This is allocated.

A worse issue is that the channel approach looks wrong:

        task = @task begin 
            ArrayOfValues2[loc1234] += Value3
            ArrayOfTallys2[loc1234] += UInt32(1)
            @view(ArrayOfValues1[:,:,loc1234]) .+= localValue1
            @view(ArrayOfTallys1[:,loc1234]) .+= localTally1
        end
        put!(task_queue,task)

This puts a task that updates the values/tallys into the queue. The closure has to capture Value3, loc1234, ArrayOfValues2, ArrayOfTallys2, localValue1, and localTally1.

The problem is that you then continue to update/reuse localValue1 and localTally1 in the worker thread. So subsequent writes to these values race with the reads in the consumer thread.
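
One way around that race is a copy-before-send pattern: the worker hands the consumer its own snapshot of the buffer and is then free to reuse the original. A toy, self-contained sketch (names and sizes made up, not a drop-in fix for the posted code):

big = zeros(Float32, 22, 20)
queue = Channel{Matrix{Float32}}(Inf)

consumer = Threads.@spawn begin
    for buf in queue
        big .+= buf                    # only the consumer ever touches big
    end
end

worker = Threads.@spawn begin
    local_buf = zeros(Float32, 22, 20)
    for i in 1:1000
        fill!(local_buf, 0f0)
        local_buf[rand(1:22), rand(1:20)] += rand(Float32)
        put!(queue, copy(local_buf))   # send a snapshot; local_buf can be reused safely
    end
end

wait(worker); close(queue); wait(consumer)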

I am not sure if there is anywhere left to squeeze out any last bits of performance?

I think there are plenty of things left to squeeze out, but you’re close to perfect scaling on 8 threads on your machine.

Try:

  1. Use more threads! One thread per logical core, with SMT / hyperthreading enabled, instead of one thread per physical core (the snippet after this list shows how the thread count is set). Try it out! It is often hard to predict whether SMT helps or hurts. It can hurt for pure number crunching, where all your floating point ports are busy all the time; but often you’re latency constrained, and then SMT is close to a 2x performance win. By default Julia tries to scale by physical cores, not by logical cores (imo a bad default for many workloads, but Julia’s “numerical linear algebra” roots show).
  2. Use even more threads! e.g. test on a machine with 64 physical cores / 128 logical cores.
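
As an aside, the thread count is fixed when Julia starts, so trying more threads just means relaunching with a higher value:

# launch with an explicit thread count, or let Julia pick one thread per logical core:
#   julia --threads=16
#   julia -t auto
# then check inside the session:
Threads.nthreads()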

You will probably see that both the locking and the channel solutions scale terribly. But if you only care about 8 threads, then your benchmarks have demonstrated that the locking solution scales well enough – no need to spend more time optimizing that.
