Fast Multi-Threaded Array Editing without data races?

This is probably unavoidable given what `Value()` approximates in the actual code, but I will look there to see whether I can find any improvements.

Good spot! I rewrote that part of the code (bottom) to remove the local arrays and make it race-free. However, this comes at the cost of creating many more tasks: rather than one task per sample, there are now ~100. The `task_consumer` can no longer keep up, which leads to a massive slowdown and huge memory usage (maxing out the 64 GB on my system) as `task_queue` fills up. I think that for this test case the channel approach simply has too much overhead to be effective.

I did some scaling benchmarks on my system (i7-12700, 12 Core(s), 20 Logical Processor(s), 64GB RAM):

@time Test_Serial()
num samples = 1000000
 11.818466 seconds (21 allocations: 281.985 MiB, 0.22% gc time)

@time Test_MultiThread_Locks()
num samples = 1000000, num Threads = 1
 12.109094 seconds (52 allocations: 281.987 MiB)

@time Test_MultiThread_Locks()
num samples = 4000000, num Threads = 4
 12.463613 seconds (72 allocations: 281.994 MiB, 0.10% gc time)

@time Test_MultiThread_Locks()
num samples = 8000000, num Threads = 8
 13.790676 seconds (100 allocations: 282.005 MiB)

@time Test_MultiThread_Locks()
num samples = 16000000, num Threads = 16
 19.694665 seconds (156 allocations: 282.027 MiB)

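The scaling looks very good up to about 8 threads. (The number of samples grows with the number of threads, so perfect scaling would mean a constant run time.) Since my array of locks is only 20 elements long, going above 8 threads increases the chance of lock conflicts, which is probably contributing to the drop from perfect scaling. I guess this could be fixed by adding more locks. Another likely factor is the CPU itself: the i7-12700 has only 8 performance cores (plus 4 slower efficiency cores). Threads beyond 8 therefore run either on the efficiency cores or as a second hyper-thread on an already busy performance core.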

This is the plan for running the actual code!

Again thank you to everyone who has been so helpful on this thread!

Updated channel code that is race-free:

"""
    Test_MultiThread_Channel(; numSamples = 100000, numWorkers = 8, queueSize = 1024)

Benchmark the channel-based, race-free Monte Carlo update scheme.

`numWorkers` producer tasks are started with `Test_MultiThread_MC_Channel!`.
Each producer draws `numSamples` samples and `put!`s array-update `Task`s onto a
shared channel. A single consumer task takes those tasks in order. It runs each
one to completion (`schedule` followed by `wait`) before taking the next. As a
result, the shared arrays are only ever written from one task at a time.

When all producers have finished, the sums of the four accumulator arrays are
printed. The first printed value (the sum of `ArrayOfTallys2`) should equal
`numWorkers * numSamples`.

# Keywords
- `numSamples`: number of samples drawn by each producer.
- `numWorkers`: number of producer tasks.
- `queueSize`: capacity of the task channel. With a finite capacity, producers
  block in `put!` while the consumer is behind, which bounds memory use. Pass
  `Inf` to get the previous unbounded queue.
"""
function Test_MultiThread_Channel(; numSamples = 100000, numWorkers = 8, queueSize = 1024)

    # allocate arrays, actual code has a pair of arrays rather than just one
    Array1Size = (22, 20, 20, 20, 20, 20)
    ArrayOfValues1 = zeros(UInt32, Array1Size)
    ArrayOfTallys1 = zeros(UInt32, Base.tail(Array1Size))
    Array2Size = (20, 20, 20, 20)
    ArrayOfValues2 = zeros(UInt32, Array2Size)
    ArrayOfTallys2 = zeros(UInt32, Array2Size)

    # Bounded by default: an unbounded queue grows without limit whenever the
    # producers outpace the single consumer.
    task_queue = Channel{Task}(queueSize)

    # Single consumer: runs each submitted update to completion before taking
    # the next one. This serialisation is what makes the array writes race-free.
    task_consumer = Threads.@spawn begin
        for task in task_queue
            schedule(task)
            wait(task)
        end
    end
    # If an update throws, the consumer terminates. Binding closes the channel
    # when that happens, so producers blocked in (or later calling) `put!`
    # fail instead of hanging.
    bind(task_queue, task_consumer)

    try
        # perform MC sampling
        workers = [Test_MultiThread_MC_Channel!(ArrayOfValues1, ArrayOfTallys1,
                                                ArrayOfValues2, ArrayOfTallys2,
                                                numSamples, task_queue)
                   for _ in 1:numWorkers]
        foreach(wait, workers)
    finally
        # Always close the queue, even when a producer failed. Otherwise the
        # consumer's `for` loop would block forever waiting for more tasks.
        close(task_queue)
    end
    # All queued updates must be applied before the arrays are read.
    wait(task_consumer)

    println(sum(ArrayOfTallys2)) # should equal numWorkers*numSamples
    println(sum(ArrayOfTallys1))
    println(sum(ArrayOfValues2))
    println(sum(ArrayOfValues1))
    # some saving of arrays goes on here

end

"""
    Test_MultiThread_MC_Channel!(ArrayOfValues1, ArrayOfTallys1, ArrayOfValues2,
                                 ArrayOfTallys2, numSamples, task_queue) -> Task

Spawn a producer task that draws `numSamples` Monte Carlo samples. For each
sample it `put!`s exactly one not-yet-started `Task` onto `task_queue`, and that
task applies all of the sample's updates to the arrays. The producer never
writes the arrays itself; the writes only happen when whoever consumes
`task_queue` schedules the task.

Returns the spawned producer `Task`, so the caller can `wait` on it.

Each sample draws `Value3` and four `Loc()` indices, which form `loc1234`.
- If `Value3` is non-zero, 100 inner draws are made. Their updates to
  `ArrayOfValues1[:, :, loc1234]` and `ArrayOfTallys1[:, loc1234]` are applied by
  the sample's task.
- Otherwise every entry of `ArrayOfTallys1[:, loc1234]` is incremented.

In both cases `ArrayOfValues2[loc1234]` is increased by `Value3`, and
`ArrayOfTallys2[loc1234]` is increased by one.

`Value`, `Loc`, `Loc2` and `Loc2X` are defined elsewhere. They are presumably
random draws, but that is not visible here.

Submitting one task per sample, instead of one or two per inner iteration,
reduces the number of queued tasks by roughly 100×. The per-task overhead on
the single consumer was the bottleneck.
"""
function Test_MultiThread_MC_Channel!(ArrayOfValues1::Array{UInt32,6},
                                      ArrayOfTallys1::Array{UInt32,5},
                                      ArrayOfValues2::Array{UInt32,4},
                                      ArrayOfTallys2::Array{UInt32,4},
                                      numSamples,
                                      task_queue::Channel{Task})

    producer = Threads.@spawn begin
        for _ in 1:numSamples
            Value3 = Value()
            (loc1, loc2, loc3, loc4) = (Loc(), Loc(), Loc(), Loc())
            loc1234 = CartesianIndex(loc1, loc2, loc3, loc4)

            ArrayOfTallys1View = @view(ArrayOfTallys1[:, loc1234])

            if Value3 != UInt32(0)
                ArrayOfValues1View = @view(ArrayOfValues1[:, :, loc1234])
                # Draw all 100 inner samples up front, in the original order, into
                # a buffer allocated fresh for this sample. After the hand-off the
                # producer never touches it again, so no two tasks share it.
                updates = [_draw_inner_update() for _ in 1:100]
                task = @task begin
                    _apply_inner_updates!(ArrayOfValues1View, ArrayOfTallys1View, updates)
                    ArrayOfValues2[loc1234] += Value3
                    ArrayOfTallys2[loc1234] += UInt32(1)
                end
            else
                task = @task begin
                    ArrayOfTallys1View .+= UInt32(1)
                    ArrayOfValues2[loc1234] += Value3
                    ArrayOfTallys2[loc1234] += UInt32(1)
                end
            end
            put!(task_queue, task)
        end
    end # threads
    return producer
end

# Draw one inner-loop update: two values, each paired with its own index draws.
function _draw_inner_update()
    (Value1, Value2) = (Value(), Value())
    (loc5, loc6) = (Loc2(), Loc())
    loc5x = Loc2X(loc5)
    (loc5p, loc6p) = (Loc2(), Loc())
    loc5xp = Loc2X(loc5p)
    return (loc5x, loc6, Value1, loc5xp, loc6p, Value2)
end

# Apply one sample's buffered inner-loop updates, in order, to the sample's views.
function _apply_inner_updates!(ArrayOfValues1View, ArrayOfTallys1View, updates)
    for (loc5x, loc6, Value1, loc5xp, loc6p, Value2) in updates
        ArrayOfValues1View[loc5x, loc6] += Value1
        ArrayOfTallys1View[loc6] += UInt32(1)
        # NOTE(review): Value2 is added in column loc6, but the tally below is
        # for loc6p. Confirm this should not be ArrayOfValues1View[loc5xp, loc6p].
        ArrayOfValues1View[loc5xp, loc6] += Value2
        # Count each column only once when both draws land in the same one.
        if loc6 != loc6p
            ArrayOfTallys1View[loc6p] += UInt32(1)
        end
    end
    return nothing
end
1 Like

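Once everything works, you can annotate loops and other indexing assignments with `@inbounds`. You won't be surprised by arrays with unusual (non-1-based) indexing, and much of the actual work is indexing. This may therefore save some cycles here, and in some cases even allow the compiler to vectorize. Only do this once you are sure every index is in range: `@inbounds` disables bounds checking, so an out-of-range index is no longer caught and can silently corrupt memory.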

1 Like

There is also a synchronization-free method that can be used if loc4 is independent of the other locs: each task works exclusively in its own section of the large array. You could “draw” loc4 for all samples in advance, i.e. work out how many samples fall on each value of loc4. Then `@spawn` 20 tasks, one per value of loc4, each told how many samples to draw (typically close to 1/20 of the total number of samples). These tasks can update the large array without any synchronization, since no two tasks share a loc4 value. Each task could work on a `@view` with its loc4 fixed.

Note that this does not extend beyond 20 CPUs; beyond that you must also fix another index, e.g. loc3. If you run on fewer CPUs, you should call `yield()` now and then, since the tasks will be completely CPU-bound. For example, without any `yield()` on 8 CPUs, the first 8 tasks will run to completion, then the next 8, and finally only 4 in parallel, which is far from optimal.