Avoiding Race Conditions Using Channels

The multi-threading docs only mention locks and atomic operations (for primitive types) as ways to avoid race conditions.
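For reference, here is a minimal sketch of those two documented approaches applied to a simple counter. The names acc_atomic, acc_locked and acc_lock are made up for this illustration; Threads.Atomic, Threads.atomic_add! and ReentrantLock are the standard Base tools.

```julia
using Base.Threads

# Approach 1: an atomic counter (works for primitive types such as Int).
acc_atomic = Threads.Atomic{Int}(0)

# Approach 2: a plain Ref guarded by a lock (works for any data).
acc_locked = Ref{Int}(0)
acc_lock = ReentrantLock()

@threads for _ in 1:10000
    Threads.atomic_add!(acc_atomic, 1)   # indivisible read-modify-write
    lock(acc_lock) do                    # only one task at a time runs this block
        acc_locked[] += 1
    end
end

println("atomic sum: $(acc_atomic[])")   # always 10000
println("locked sum: $(acc_locked[])")   # always 10000
```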

How about the Channel approach?

Is the following a safe approach, and is it good practice? (It seems to work in practice.)

using Base.Threads

# Julia started with: julia --threads 4
Threads.nthreads()

acc_safe = Ref{Int}(0)
acc_unsafe = Ref{Int}(0)

chnl = Channel{Int}(32) 

function upacc()
  while true
    acc_safe[] += take!(chnl)
  end
end  

@async upacc()

@threads for _ in 1:10000
  put!(chnl, 1)
  acc_unsafe[] += 1
end

println("unsafe sum: $(acc_unsafe[])") # unsafe sum: 9926 (or some value less than 10000)
println("safe sum: $(acc_safe[])") # safe sum: 10000

This is part of a larger system, so the specifics of why there is a while loop and why the channel stays open after the computation are not relevant at this point.

Thank you.

Yes and no.

In your case it works because upacc processes each value quickly. You have moved the race condition to the window between the end of the loop and the print.

If upacc is slower (I reduced the number of iterations so it would finish before tomorrow 🙂):

function upacc()
  while true
    acc_safe[] += take!(chnl)
    sleep(1)
  end
end  

@async upacc()

@threads for _ in 1:40
  put!(chnl, 1)
  acc_unsafe[] += 1
end

then the print happens before upacc has finished:

$ julia -t 4 t.jl
unsafe sum: 30
safe sum: 8

So you need a way to wait for upacc to finish.

You could do that with @sync and an in-band signal (a sentinel value, here 0) telling upacc to stop:

function upacc()
  v = take!(chnl)
  while v > 0
    acc_safe[] += v
    v = take!(chnl)
    sleep(1)
  end
end  

@sync begin 

   @async upacc()

   @threads for _ in 1:40
      put!(chnl, 1)
      acc_unsafe[] += 1
   end

   put!(chnl, 0)
end

$ julia -t 4 t.jl
unsafe sum: 40
safe sum: 40

Regarding the unsafe sum being “some value less than 10000”: it might also be > 10000, or < 0, etc.

Thank you for the answer and for pointing out the long-computation consequence in this scenario.

In my view, the issue you point out (early printing) does not imply a real race condition; it only means that the computation is not yet finished when the result is printed. Am I missing something?

My understanding at this point is that, regardless of how fast upacc is, there will never be multiple threads reading from or writing to acc_safe[] at the same time. Is this accurate?

Printing a single Int is unlikely to show an illegal value (the read would have to split the Int's 8 bytes mid-update), but I’m interpreting your print more broadly, as “do something with the produced value”.

Suppose you were updating multiple fields of a mutable struct in a tree, something like

mutable struct Node
    value   # payload
    prev    # previous node
    next    # next node
end

Then you might update prev, then “print” the node, then update next.

I would call that a race condition.
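One common way to avoid that kind of half-updated state is to guard every multi-field update, and every read that follows links, with the same lock. A minimal sketch for a doubly linked list, assuming a single global lock; node_lock, insert_after! and check_list are hypothetical names for this illustration, and a channel-owning task (as discussed in this thread) would be an alternative design.

```julia
using Base.Threads

mutable struct Node
    value::Int
    prev::Union{Node,Nothing}
    next::Union{Node,Nothing}
end
Node(v) = Node(v, nothing, nothing)

const node_lock = ReentrantLock()   # hypothetical: one lock guarding all links

# Insert `n` right after `a`. All link updates happen while holding the lock,
# so no other task can observe `n` with prev set but next not yet set.
function insert_after!(a::Node, n::Node)
    lock(node_lock) do
        b = a.next
        n.prev = a
        n.next = b
        a.next = n
        b === nothing || (b.prev = n)
    end
    return n
end

# Walk the list and check that every forward link has a matching back link.
# Returns the number of nodes, or -1 if an inconsistency is found.
function check_list(head::Node)
    len = 1
    n = head
    while n.next !== nothing
        n.next.prev === n || return -1
        n = n.next
        len += 1
    end
    return len
end

head = Node(0)
@threads for v in 1:1000
    insert_after!(head, Node(v))
end

println("nodes in list: ", check_list(head))   # 1001
```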

“It will be OK” is the prelude to disaster 🙂

The race condition arises because the main thread might call println(acc_safe[]) “at the same time” as the asynchronous task is updating it.

Apart from the printing at the end, yes, this is correct. Channels are introduced in the manual as a way to synchronize asynchronous tasks, but they are thread-safe and can also be used in a multi-threaded context.

Note that, as said above, you need some way to know when the computation is finished. Channels can help with that as well, especially if you can close them once the computation is finished. For instance, your initial example could look like this:

using Base.Threads
Threads.nthreads()

chnl = Channel{Int}(Threads.nthreads())

task = @async let acc = 0  # acc is now a local variable; nobody else will access it
    for i in chnl          # iteration will stop when the channel is closed
        acc += i
    end
    acc                    # the task returns the final value of the accumulator
end
# or even, in this specific case:
# task = @async sum(chnl)


@threads for _ in 1:10000
  put!(chnl, 1)
end

close(chnl)  # close the channel once the computation is finished
fetch(task)  # wait for the task to finish and get its return value
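Because channels are thread-safe, the consumer does not have to be an @async task on the main thread. A minimal sketch of the same pattern wrapped in a function, assuming Julia 1.3 or later (for Threads.@spawn); channel_sum is a hypothetical name used only here.

```julia
using Base.Threads

# Hypothetical helper: many producer iterations push values into a channel,
# while a single consumer task owns the accumulator.
function channel_sum(n::Integer)
    chnl = Channel{Int}(nthreads())
    consumer = Threads.@spawn begin
        acc = 0                 # local to the consumer task; no other task touches it
        for v in chnl           # stops once chnl is closed and drained
            acc += v
        end
        acc                     # the task's return value
    end
    @threads for _ in 1:n
        put!(chnl, 1)
    end
    close(chnl)                 # signal "no more values"
    return fetch(consumer)      # waits for the consumer and returns its result
end

println("safe sum: ", channel_sum(10_000))   # prints "safe sum: 10000"
```

The only shared object is the channel itself; the accumulator never escapes the consumer task, and fetch provides the “wait until finished” step that the original example was missing.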

Thanks, this answers my original question.