How to lock variables in @threads

I am running multithreaded parameter tuning for some ML algorithms.
I can avoid allocation on the input side using CartesianIndices (thanks, stevengj!), but how can I avoid allocation in the output? In the example below I would likely need to lock the variables bestError, bestPar1 and bestPar2, but how?

par1 = [1,2]
par2 = [0.1,0.2,0.3]

doMyStuff(par1,par2) = abs(0-(-par1^3+par1^2+par1+par2^3-par2^2+par2+10))

# Version 1: Preallocation, computation and comparison
# Step A : preallocation
errorMatrix   = fill(Inf64,length(par1),length(par2))
# Step B: computation
function tuneParameters!(errorMatrix,par1,par2)
    Threads.@threads for ij in CartesianIndices((length(par1),length(par2)))
           (p1i, p2j)    = par1[Tuple(ij)[1]], par2[Tuple(ij)[2]]
           errorMatrix[Tuple(ij)...] = doMyStuff(p1i,p2j)
    end
    return errorMatrix
end
tuneParameters!(errorMatrix,par1,par2)
# Step C: comparison
bestError          = minimum(errorMatrix)
bestPar1, bestPar2 = par1[Tuple(argmin(errorMatrix))[1]], par2[Tuple(argmin(errorMatrix))[2]]

# Version 2: computation and comparison inside the loop
function tuneParametersB(par1,par2)
    # Step A : initialisation
    bestError = Inf64
    bestPar1  = nothing
    bestPar2  = nothing
    # Step B: computation and comparison
    Threads.@threads for ij in CartesianIndices((length(par1),length(par2)))
           (p1i, p2j)    = par1[Tuple(ij)[1]], par2[Tuple(ij)[2]]
           attempt       = doMyStuff(p1i,p2j)
           if(attempt < bestError)   # This would need a lock I presume...
               bestError = attempt
               bestPar1  = p1i
               bestPar2  = p2j
           end
    end
    return (bestError,bestPar1, bestPar2)
end
bestError , bestPar1, bestPar2 = tuneParametersB(par1,par2)

Edit: Crosspost on SO: multithreading - How to "lock" variables in @threads? - Stack Overflow


I am trying to follow the documentation, but it doesn’t seem so simple… This results in a MethodError: no method matching lock(::Float64):

function tuneParametersC(par1,par2)
    # Step A : initialisation
    bestError = Inf64
    bestPar1  = nothing
    bestPar2  = nothing
    # Step B: computation and comparison
    Threads.@threads for ij in CartesianIndices((length(par1),length(par2)))
           (p1i, p2j)    = par1[Tuple(ij)[1]], par2[Tuple(ij)[2]]
           attempt       = doMyStuff(p1i,p2j)
           begin
               # lock(bestError,bestPar1,bestPar2) # doesn't work
               lock(bestError) # neither does this
               lock(bestPar1)
               lock(bestPar2)
               try
                   if(attempt < bestError) 
                       bestError = attempt
                       bestPar1  = p1i
                       bestPar2  = p2j
                   end
               finally
                   #unlock(bestError,bestPar1,bestPar2)
                   unlock(bestError)
                   unlock(bestPar1)
                   unlock(bestPar2)
               end
           end
    end
    return (bestError,bestPar1, bestPar2)
end

Maybe like this?

function tuneParametersD(par1,par2)
    # Step A : initialisation
    bestError = Inf64
    bestPar1  = nothing
    bestPar2  = nothing
    compLock  = ReentrantLock()
    
    # Step B: computation and comparison
    Threads.@threads for ij in CartesianIndices((length(par1),length(par2)))
           (p1i, p2j)    = par1[Tuple(ij)[1]], par2[Tuple(ij)[2]]
           attempt       = doMyStuff(p1i,p2j)
           begin
               lock(compLock)
               try
                   if(attempt < bestError) 
                       bestError = attempt
                       bestPar1  = p1i
                       bestPar2  = p2j
                   end
               finally
                   unlock(compLock)
               end
           end
    end
    return (bestError,bestPar1, bestPar2)
end
bestError , bestPar1, bestPar2 = tuneParametersD(par1,par2)

It seems to work, but I haven’t tested it in real code yet…

Do not use locks if you want to write fast data parallel programs. See A quick introduction to data parallelism in Julia for how to write a findmin-type calculation.
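For reference, here is a sketch of the same search written as a single reduction; the helper name and the plain sequential mapreduce are illustrative (a parallel mapreduce, e.g. ThreadsX.mapreduce, has the same shape):

# Map each (p1, p2) pair to an (error, p1, p2) tuple, then reduce by keeping
# the tuple with the smaller error. There is no shared state, so nothing to lock.
function tuneParametersMR(par1, par2)
    best(a, b) = a[1] <= b[1] ? a : b
    return mapreduce(((p1, p2),) -> (doMyStuff(p1, p2), p1, p2), best,
                     Iterators.product(par1, par2))
end

bestError, bestPar1, bestPar2 = tuneParametersMR(par1, par2)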

Thank you.
Is that true even if, as in my case, the locked code is many orders of magnitude faster than the unlocked one?
In the function in the example, doMyStuff() is the heavy computational part, while the locked code is just a conditional check and a couple of scalar assignments.
Would using locks in these cases also hurt performance? In my early tests on real models, using 4 threads I get a 2x speed gain over a single thread, but I don’t know how much of the gap to the theoretical 4x gain is due to the lock…

Generally, it is best to avoid intercore communication as much as possible. Apart from performance/scaling, this is to protect your sanity: Concurrent execution is hard to reason about; much easier to just have non-interacting parts.

The Threads.@threads macro is unfortunately very terrible, and I really recommend against using it for things that are not one-off scripts. It makes applications like yours a chore, it does not compose (if there is opportunity for parallelism inside of doMyStuff or outside of tuneParameters!), and it gives completely incorrect intuitions about what is going on. Furthermore you always risk inadvertently hammering your intercore communication by having separate threads accumulate into separate slots in an array that share a cacheline (cf eg Random number and parallel execution - #19 by foobar_lv2).
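For illustration, this is a sketch of the per-threadid accumulation anti-pattern being warned about here (reusing par1, par2 and doMyStuff from above; do not copy it):

# Anti-pattern: adjacent Float64 slots of `bests` share a cache line, so every
# thread's write invalidates the others' caches (false sharing). Relying on
# threadid() for per-thread state is also discouraged in recent Julia versions.
bests = fill(Inf64, Threads.nthreads())
Threads.@threads for ij in CartesianIndices((length(par1), length(par2)))
    p1i, p2j = par1[Tuple(ij)[1]], par2[Tuple(ij)[2]]
    bests[Threads.threadid()] = min(bests[Threads.threadid()], doMyStuff(p1i, p2j))
end
bestError = minimum(bests)   # the best parameters would need yet more shared arrays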

Base offers the much better @spawn:

import Distributed
using BenchmarkTools
par1 = rand(Int64, 1000)
par2 = rand(1000)

doMyStuff(par1,par2) = abs(0-(-par1^3+par1^2+par1+par2^3-par2^2+par2+10))
function tuneParametersB(par1, par2)
    todo = Task[]
    for ran in Distributed.splitrange(1, length(par1), Threads.nthreads())
        t = Threads.@spawn begin
            let be = Inf64, bp1 = -1, bp2 = -1.0
                @inbounds for i in ran
                    pi = par1[i]
                    for pj in par2
                        res = doMyStuff(pi, pj)
                        if res < be
                            be, bp1, bp2 = res, pi, pj
                        end
                    end
                end
                (be, bp1, bp2)
            end
        end #spawn
        push!(todo, t)
    end
    be, bp1, bp2 = (Inf64, -1, -1.0)
    for t in todo
        be_, bp1_, bp2_ = fetch(t)
        if be_ < be
            be, bp1, bp2 = be_, bp1_, bp2_
        end
    end
    be, bp1, bp2
end

@btime tuneParametersB(par1,par2)

Alternatively, you can use a library for parallel mapreduce, as previous posters recommended; or you can use Threads.@threads to multithread over the pre-split ranges (I copy-pasted the code above from the REPL; in real life it would be structured more carefully).
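For concreteness, a sketch of that last option (the chunking via Iterators.partition and the number of chunks are illustrative choices, not a fixed recipe):

# @threads over pre-split ranges: each chunk keeps a purely local best and
# writes a single result slot, so there is no lock and no shared hot memory.
function tuneParametersChunks(par1, par2; nchunks = Threads.nthreads())
    chunks  = collect(Iterators.partition(eachindex(par1), cld(length(par1), nchunks)))
    results = Vector{Tuple{Float64,eltype(par1),eltype(par2)}}(undef, length(chunks))
    Threads.@threads for c in eachindex(chunks)
        be, bp1, bp2 = Inf64, first(par1), first(par2)
        for i in chunks[c], p2 in par2
            res = doMyStuff(par1[i], p2)
            if res < be
                be, bp1, bp2 = res, par1[i], p2
            end
        end
        results[c] = (be, bp1, bp2)   # one write per chunk
    end
    return reduce((a, b) -> a[1] <= b[1] ? a : b, results)
end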

(the Threads.@threads macro predates the fancy new multithreading support, that’s why it’s so terrible compared to the new task-based parallelism)


I’d rather not recommend @spawn to programmers new to threading. It’s a great foundation but difficult to use, and it loses clarity in the program. For example, while concise and self-contained, @foobar_lv2’s example contains two subtly different but equivalent definitions of the “monoid” if res < be be, bp1, bp2 = res, pi, pj end. There is no syntactic constraint ensuring that the required invariant between these two definitions is preserved after subsequent refactorings. Furthermore, when using @spawn, you need to understand that the let in let be = Inf64, bp1=-1, bp2=-1.0 is crucial (see PSA: Reasoning about scope rules and multithreading - #2 by tkf for why let is needed; FYI, FLoops.jl tries to detect this bug).

If you want simple data parallelism like this, I strongly recommend learning how to use mapreduce (or, equivalently, something like FLoops.jl), at least as a first step.
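To make that concrete, a rough sketch of the same search with FLoops.jl (the literal inits 0 and 0.0 are placeholders that happen to match the integer/float parameter grids from the first post; the executor is left at its default):

using FLoops   # provides @floop and @reduce

function tuneParametersFloop(par1, par2)
    @floop for (p1, p2) in Iterators.product(par1, par2)
        attempt = doMyStuff(p1, p2)
        # custom reduction: keep the smallest error together with its parameters
        @reduce() do (bestError = Inf64; attempt), (bestPar1 = 0; p1), (bestPar2 = 0.0; p2)
            if attempt < bestError
                bestError = attempt
                bestPar1  = p1
                bestPar2  = p2
            end
        end
    end
    return bestError, bestPar1, bestPar2
end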


I’d rather not recommend @spawn to programmers new to threading.

Indeed, that would be the right recommendation for people who are experienced with threading. I agree that your multithreaded libraries are awesome, and can save some code.

However, all the complexities you note are somewhat the point of this exercise:

Understanding the interaction of let and closures is absolutely essential to writing good Julia code. Understanding how task-based multithreading is supposed to work (i.e. the fundamental idea of “spawn off some units of work, then fetch to synchronize on completion; profit if you spawned enough stuff that no core goes idle; choose large enough work packages to overcome scheduler overhead”) is absolutely essential, and “doing” is a good strategy for learning.
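In its smallest form that rhythm looks roughly like this (heavy_work and work_items are stand-in names, not anything from this thread):

tasks   = [Threads.@spawn heavy_work(x) for x in work_items]   # spawn units of work
results = fetch.(tasks)                                        # synchronize on completion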

First, someone wants a multithreaded map!, i.e. updating some array in place with non-overlapping destination indices (Threads.@threads can do the job with minimal fuss, at the price of composability).

Then someone wants a multithreaded mapreduce. (your libraries do the job with minimal fuss; Threads.@threads is not up to the job, unless one goes into big contortions – now all time spent understanding Threads.@threads is somewhat wasted and one needs to unlearn bad patterns like relying on Threads.threadid() instead of using the Task system)

Then someone has a recursive / divide+conquer algorithm like e.g. sorting an array. Threads.@spawn is the way to go.

Most problems don’t need complex concurrent code, with atomics, locks and fancy lock-free datastructures. But divide-and-conquer style algorithms are everywhere, so I don’t see a point in hiding how to do that.
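As one concrete illustration of the divide-and-conquer point, a minimal recursive parallel sum in that style (the cutoff value is arbitrary):

# Split the range, spawn one half, recurse into the other, fetch to combine.
function psum(xs, lo = firstindex(xs), hi = lastindex(xs); cutoff = 10_000)
    hi - lo + 1 <= cutoff && return sum(view(xs, lo:hi))   # sequential base case
    mid   = (lo + hi) >>> 1
    left  = Threads.@spawn psum(xs, lo, mid; cutoff = cutoff)
    right = psum(xs, mid + 1, hi; cutoff = cutoff)          # do the other half in the current task
    return fetch(left) + right
end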

I agree writing divide-and-conquer-based algorithms is a very important (though somewhat advanced) skill. You do need to use it directly for writing something like sort and accumulate. But my impression from looking at discourse questions since Julia 1.3 is that people don’t need such advanced tools most of the time. Furthermore, constructing a clever accumulator type and a monoid over it requires a different kind of skill and good insight into your problem (see, e.g., Parallel word count · Transducers.jl). I think it’s worth learning this approach well, especially because it is applicable as-is to different execution contexts like GPUs and distributed environments where the task-based approach becomes impossible or at least much trickier.

My intuition is that it’s always better to express what to compute than how to compute (e.g., don’t use while + iterate when you mean for). It makes it easier to communicate your intent to people and the system (compiler + runtime + library). The latter enables optimizations and portability.

Yes, I totally agree with this. But please note that what I was pointing out was about writing merely well-defined code. Not even correct/intended code. That’s because, if you have a data race (e.g., remove the let), your code goes straight into undefined behavior. That’s even before writing code whose behavior is intended. After writing well-defined intended code, you can then finally start talking about optimizing code.

Yes, communication-by-sharing type of concurrency constructs are truly tricky beasts (e.g., look how long it took for C/C++ standard to nail down (a subset of) atomics). So, yeah, thanks for explaining it extensively in your posts!

The fact is that, I have to admit, I understand very little of this whole conversation.
I have some intuition that “threads are dangerous because their order of execution is not guaranteed”, and that one can put some “locks” to avoid it by making all the other threads wait. I had even thought, as it came more naturally to me, that the lock applied to individual variables rather than to some code.

That’s as far as I can get… mapreduce, reductions, parallel composition, task-based parallelism… are concepts that already require more specific study (and time)… I went to the home page of FLoops.jl… I understood almost nothing, starting from the title (“fold”). I just had the feeling that I needed to go to another package, FoldsThreads.jl. But then I ran into another “strange” word, “executor”: what is it???

I usually use Threads.@threads like this :

Threads.@threads for d in data
    process_data(d)#long running function, save results to disk
end

If I understand correctly I can replace it with :

for d in data
   Threads.@spawn process_data(d)
end

The advantage being that I could use @spawn inside process_data as well?

Your assumption is correct. But notice the word wait. Waiting wastes CPU time. So, you need to use a technique that avoids waiting as much as possible. Most of the time, you can do it by using a high-level API like @threads (though very limited) or the ones from, e.g., JuliaFolds/*.jl.

That’s exactly why I wrote A quick introduction to data parallelism in Julia. I tried to write it in a way you can understand without already knowing these words.

I tried to explain it here:

No, you need @sync to make it equivalent. Also, if data is very long, it’s very likely @threads is much better. If you have this type of computation and if you don’t already observe some performance problems, I highly recommend just sticking with @threads.
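For reference, a minimal sketch of the @sync version mentioned above (one task per element, so it only makes sense when data is short and each process_data call is long):

@sync for d in data
    Threads.@spawn process_data(d)   # all spawned tasks are waited on when the @sync block ends
end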
