Fast Multi-Threaded Array Editing without data races?

Hi, I am fairly new to multi-threading in Julia and am having trouble finding the best way to avoid data races in a section of code I am writing.

The code uses a Monte Carlo method to generate values that are then assigned to specific locations in a large 6D array based on some conditions, and a second array is used to keep a tally of sampled points. Something like this (I cannot share the actual code):

lk = ReentrantLock()
Threads.@threads :static for iThreads in 1:nThreads
    generatevalues!(val, somedata)
    (loc1, loc2, loc3, loc4, loc5, loc6, somedata) = generatelocations(somedata)
    # update arrays under the lock
    lock(lk)
    try
        if 1 <= loc1 <= num1
            ArrayofValues[loc1+2, loc2, loc3, loc4, loc5, loc6] += val
        elseif loc1 > num1
            ArrayofValues[2, loc2, loc3, loc4, loc5, loc6] += val
        else  # loc1 < 1
            ArrayofValues[1, loc2, loc3, loc4, loc5, loc6] += val
        end
        @view(ArrayofTallys[:, loc2, loc3, loc4, loc5, loc6]) .+= 1
    finally
        unlock(lk)
    end
end

These arrays are in general large, so the likelihood of two threads editing the same elements at the same time is low but non-zero, hence the use of the lock. However, this tends to make the code slower than the serial version. Is there an efficient way to edit small sections/views of arrays without needing to lock the whole thing? Or is there a better approach that I should attempt to take?

Edit: square brackets for array indexing.

1 Like

Welcome! None of this looks like it would work. Julia indexes with square brackets (not parens), and it looks like you’re using the locs and val as both containers and scalars in a confusing manner.

Get the serial version working first, and then work towards multithreading if you need it (profile!).

1 Like

OhMyThreads.jl has quite a few very useful tools for multithreaded applications

2 Likes

I have edited the original post to include the square brackets that are in the actual code.

To clarify more: the code works and has already been optimised for serial running (as well as I can optimise it). The code samples over a 6D phase space that has been discretised into a set of num bins. The locs are UInt32 values while val is a Float32. The function generatevalues! mutates val for each Monte Carlo sample, and generatelocations! then mutates the values of the locs to generate the location of the correct bin in the 6D phase space at which to add val to the ArrayofValues array.

How much of the time is spent in the indexing? If the array indexing isn’t your bottleneck, you could just have one thread whose job is to do the array updates and only multithread generatevalues! and generatelocations!

2 Likes

Right, you can’t mutate UInt32 or Float32s — they’re immutable!

Generally, one strategy that can be useful is to generate a list of indices and values to update and then perform the updates sequentially at the end.
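A minimal serial sketch of that idea (generatesample and nsamples are hypothetical stand-ins for your generation step):

updates = Tuple{CartesianIndex{6}, Float32}[]
for _ in 1:nsamples
    idx, val = generatesample()   # hypothetical: returns the bin index and the value
    push!(updates, (idx, val))
end
# applying the updates on a single thread needs no lock
for (idx, val) in updates
    ArrayofValues[idx] += val
end

With multiple threads, each thread would fill its own updates vector (or you would have to guard a shared one).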

The indexing and assigning takes about the same time as the other functions. Either way, if I did assign just one thread to write the values, I’m not sure that would scale well.

The generatelocations! was just shorthand for some set of functions that re-evaluate the values of loc and val for each Monte Carlo sample. I have checked, and this part of the code has no issues. I edited the original post to hopefully put it in a more accurate form.

The list of vectors of indices with an associated value might work. Would there be any issues, or anything I should be careful of, when having multiple threads append to this list? Would I need to also lock the list?

Yes, doing this without a lock doesn’t work.
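For a single shared list, each append has to be guarded, e.g. (sketch, assuming a shared updates vector):

lk = ReentrantLock()
# inside each thread:
lock(lk) do
    push!(updates, (idx, val))
end

Per-thread vectors avoid the lock entirely, since each thread appends only to its own.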

Some questions:

  • What do you want to happen if two threads would overwrite the same index? Are you ok with getting either value?

  • What is your intention with this line of code: @view(ArrayofTallys[:,loc2,loc3,loc4,loc5,loc6]) .+= 1? Do you want to modify that whole row? (In which case you wouldn’t need @view; see the snippet below.)
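For reference, the broadcast assignment also updates the slice in place without the macro:

# increments every element of that slice in place
ArrayofTallys[:, loc2, loc3, loc4, loc5, loc6] .+= 1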

If I were approaching the problem, I would probably have your individual threads calculate their values and then submit the intended updates to a Channel{Task}. The channel would be drained by a single task, which would be the only thing that directly accesses the array. If you’re interested in this approach, I can write out a more detailed example.

3 Likes

If appending to this list of vectors also needs a lock, what would be the speed-up compared to what I am currently doing?

Some very simple thoughts that might still work:

Are your functions that generate data and indices the main bottleneck? If so, could you preallocate a buffer of values and one for indices for each thread and simply write all the changes locally?

Afterwards, you can iterate over the buffers in a single thread and sum up all the changes.

You can store the six indices as a single CartesianIndex (one object instead of six numbers), or even as a single integer via LinearIndices.
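For example (a small sketch with a hypothetical toy array standing in for ArrayofValues):

A = zeros(Float32, 4, 4, 4, 4, 4, 4)    # stand-in for ArrayofValues
ci = CartesianIndex(1, 2, 3, 4, 1, 2)   # six indices bundled into one object
li = LinearIndices(A)[ci]               # the same position as a single Int
A[ci] == A[li]                          # true: both address the same element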

If you have enough memory, you can of course also give each thread its own copy of the array and sum all the copies together at the end.

To get accurate Monte Carlo results, it would need to sum both values. I.e. I cannot miss a value.

Yes, the whole row needs to be incremented by 1. What would your alternative to @view be?

If I were to do this, I am unclear how a bottleneck could be avoided if I have lots of threads calculating values and only a single one allocating to an array? Perhaps if you could provide more detail, that would be helpful.

The generating functions and the allocation process (in serial) take about equal time, and they are about as fast as I can get them (again in serial).

Storing the indices as a CartesianIndex is a good idea.

This would be great, but the arrays are in general quite large (around 17 GB for one case), so I can’t just make copies for each thread.

To get accurate Monte Carlo results, it would need to sum both values. I.e. I cannot miss a value.

Good to know, the channel approach can do that.

If I were to do this, I am unclear how a bottleneck could be avoided if I have lots of threads calculating values and only a single one allocating to an array? Perhaps if you could provide more detail, that would be helpful.

Assigning to an array is extremely fast, so it’s usually the case that something else is slowing you down, such as the if statement. Can you give me a sense of the number of writes per second you expect? For example, writing to 100,000 randomly selected indices takes ~55ms on my machine.

using BenchmarkTools
base_array = zeros(1_000_000)
rand_indices = rand(1:1_000_000, 100_000)
rand_values = rand(100_000)

@btime for (i, v) in zip(rand_indices, rand_values)
    base_array[i] = v
end
> 55.241 ms (1099356 allocations: 25.93 MiB)
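Note that most of those allocations likely come from the untyped globals in this quick global-scope benchmark; interpolating them with $ should give a fairer timing:

@btime for (i, v) in zip($rand_indices, $rand_values)
    $base_array[i] = v
end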
1 Like

In fact, sorting the indices is likely to be a huge win for arrays this large — random access is slow.

1 Like

This depends a lot on how dense the sorted indices are.

Here’s an example of the channel approach:

task_queue = Channel{Task}(Inf)
task_consumer = Threads.@spawn begin
    for task in task_queue
        schedule(task)
        wait(task)
    end
end

function make_worker(task_queue)
    Threads.@spawn begin
        generatevalues!(val, somedata)
        (loc1, loc2, loc3, loc4, loc5, loc6, somedata) = generatelocations(somedata)
        if 1 <= loc1 <= num1
            task = @task ArrayofValues[loc1+2, loc2, loc3, loc4, loc5, loc6] += val
        elseif loc1 > num1
            task = @task ArrayofValues[2, loc2, loc3, loc4, loc5, loc6] += val
        else  # loc1 < 1
            task = @task ArrayofValues[1, loc2, loc3, loc4, loc5, loc6] += val
        end
        put!(task_queue, task)
    end
end

workers = [make_worker(task_queue) for i in 1:8]

This approach will make 8 worker tasks and one consumer task. The workers do as much work as possible, then pass a simple update task to the queue, which task_consumer processes. Since task_consumer is the only task directly touching ArrayofValues, the updates are thread-safe, and it will almost certainly process the tasks much faster than the workers can produce them.
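One detail the sketch leaves out is shutdown: once the workers finish, closing the channel lets the consumer loop terminate, e.g.

foreach(wait, workers)   # wait for all producers to finish
close(task_queue)        # lets `for task in task_queue` exit once drained
wait(task_consumer)      # ensure all queued updates have been applied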

2 Likes

Unless the task is completely trivial, it’s a good idea to use Threads.@spawn for threading, or some of the tools in the OhMyThreads package. Locking is OK if the locked portion of the loop is small compared to the actual work being done; otherwise the lock overhead can easily result in a slower threaded version than the serial one.

I’m a bit uncertain about your program: should there be a loop around the generation? I assume so. I think I would have done something like this:

tasks = Task[]
for _ in 1:Threads.nthreads()
    push!(tasks,
      Threads.@spawn begin
        # allocate working memory here, e.g. for val or other per-task state
        idxval = Vector{Tuple{CartesianIndex{6}, Float32}}(undef, N)
        for i in 1:N  # whatever N is
            val = generatevalue(somedata)
            idx = generateindex(somedata)   # assumed to return a CartesianIndex
            idxval[i] = (idx, val)
        end
        sort!(idxval)   # sort to improve data locality if it's reasonably dense, otherwise skip this
        return idxval
      end)
end

for t in tasks
    # here we could merge all the index/value vectors, or take them one by one
    for (id, v) in fetch(t)::Vector{Tuple{CartesianIndex{6}, Float32}}
        ArrayofValues[id] += v
        # and the other logic
    end
end

All this should be put inside a function, or two functions, one with the @spawn loop, one with the fetch loop.

Conceptually this is not very different from the Channel approach, more like a verbose variant of it.

It’s not obvious that idxval should be a Vector of Tuples; it could also be a Vector{Any}, whatever is faster. But when it’s fetched, it’s probably wise to annotate it with its element type, so the compiler knows what’s in it.

The idea here is to avoid locks altogether and to separate the generation of values from the storing of them. If a huge number of values is generated, you may want to do it in smaller chunks so the idxval vectors don’t use all your memory.

1 Like

My system seems to be faster than yours, taking only ~8.5 ms to run that code. It must be the if statements slowing it down. In serial, the whole process for a single sample point takes ~100 ns.

I will implement this and get back with some benchmarks.