The right mental model of `@threads`?

Hi,

After reading this blog about multi-threading I’ve tried to parallelize a recursive algorithm. The problem is that it used modifying functions and the docs warn us that “we might get a wrong answer”. And indeed, in the toy example below we do get a wrong answer.

I struggle to see how/when that happens and what a good mental model of `@threads` would be (as you may have guessed, I don’t have a CS background). Is there another way to parallelize an algorithm of the following structure?

``````import Base.Threads.@spawn

# --- Original algorithm
function foo!(res, n)
for i in 1:n
if i == 1
else
foo!(res, i-1)
end
end
end

function foo(n)
res = Int[]
foo!(res, n)
res
end

# Doesn't work correctly!
function foo2!(res, n)
if i == 1
else
foo2!(res, i-1)
end
end
end

function foo2(n)
res = Int[]
foo2!(res, n)
res
end

# --- run functions
n = 5

s1 = foo(n)
length(s1)                # is always 2^(n-1)

s2 = foo2(n)
length(s2)                # is changing from run to run...
``````

(I run Julia 1.4.0)

Hi

Think of threads as a bunch of workers standing around waiting to be given tasks. When you call

``````@threads for i in 1:n
``````

The compiler basically says, we have lets say 4 workers available, and we need to run this loop over 1 times, thus lets split the work 1:n/4 n/4:n/2 n/2:3n/4 3n/4:n and give it to the workers. They will now start working, unfortunately they don’t all work at the same pace.
If they are writing their answers into their own unique output spots, this isn’t a problem, however if they need to place all the results into a single pile, the sequence that you might end up with is random.

In your toy example, push! places an answer at the end of the array (or think of it as a queue), however now the answers are likely to arrive into the queue in a random order depending on which worker happened to work the fastest.

This explains a random ordering, but I would still think you should always get the same length.

There is a subtler problem called race conditions which could explain the length, it comes in in terms of the queue management. Essentially to add something to the queue, you need to lookup the queue length, go add the new data at the memory location just beyond the old data and then update the queue length. If the first worker is busy adding the new data and another worker happen to look up the queue length before the first worker has had a chance to update the length variable, the second worker will overwrite the same place in memory. That can be avoided by using what is called atomic actions (like read value and increment the queue length as a single event). I am a bit surprised that push! would not be using that natively (or is there a slightly slower thread-safe version of push! available somewhere?), however I have not yet ventured into threaded julia (excluding using BLAS and FFT’s multiple threading). You can add those elements by manually using lock, Mutex and Semaphores etc.

My personal opinion is that threads can be useful in certain cases, but they can more easily get you into weird troubles. The message passing (@spawn, Tasks and Channels is generally a safer approach, since the interactions between workers are clearly defined) or using higher level constructs specifically created for this parallel working, like Shared Arrays.

Unfortunately your toy example is so “toy” that I cannot comment on whether you can achieve that in a threaded manner. You are recursively calling a function and then trying to launch threaded actions (the for loop) at each level. I believe only the top level will be threaded, the inner levels will not be allowed to use threads.

• Do you only care about recursing and the sequence of outputs doesn’t matter? Then thread is OK, you just need to manage the locking on push!
• Do you care about the sequence? Then see if you can determine the output size beforehand, allocate it and then ask each task to write into the correct location, which will require passing some extra information so each call to foo! will know where it fits into the tree.
• Do you want to reuse previous calls of the recursive function, then using some caching mechanism is needed. I remember some julia package that helps with that, but cannot off-hand remember the name.

If this doesn’t answer your question sufficiently, maybe if you can give a slightly less toy example will help others understand what the concept is you need.

4 Likes

Thanks a lot for this excellent reply @lwabeke!

I’ve expected a random ordering (I don’t care about this in my application) but I didn’t think about race conditions and `push!`. You pointer in this direction was very helpful.

Looking at the newer documentation it seems `lock` is a good way to avoid the race conditions. I’ll update the example if I figured it out.

Most operations in Julia are not thread-safe. I.e. if data is shared between threads you must ensure that updates are not done simultaneously. The most general mechanism is to create a lock which is shared between the threads (either a global, or as an argument to the function).

``````mylock = ReentrantLock()
...
lock(mylock)
<update shared data>
unlock(mylock)
``````

Only one thread can hold a lock at any time. The others entering `lock(mylock)` will wait until it is unlocked. There is some overhead in a lock, so it can be inefficient if the updates are frequent.

For counters and other simple data, you can also create an “atomic” which can be increased atomically (even a standard `N += 1` isn’t thread-safe, `N` must be read, increased and written, if two threads read simultaneously it goes wrong.) The atomic operations have low overhead, they typically utilize atomic instructions in the CPU and do not bother the Julia task scheduler like lock() does.

The Threads.@threads macro uses another functionality under the hood, exposed via the macro Threads.@spawn. It can be used directly for more flexible parallellization. Like in the following (admittedly silly and inefficient) recursive example:

``````function f(n)
(n <= 2) && return 1
f1 = f(n-1)
return f1 + fetch(f2)
end
``````

Threads.@spawn simply means “run the expression in a separate thread, but just continue the program here too”. The net effect is that `f(n-2)` starts up in a separate thread, and a handle to the thread is stored in `f2`. The program continues in parallel with computing `f(n-1)`. The `fetch(f2)` fetches the result from the thread (and waits, if necessary). But you will still have to synchronize updates to data shared between threads.

Threads.@spawn will not overbook your cpu. Even if you spawn a thousand times, only up to Threads.nthreads() things will run simultaneously. That is, the function you spawn may do its own spawns.

1 Like

Thanks @sgaure for the pointer to `ReentrantLock`!
The (inprogress) documentation about lock gives the wrong impression that one could simple pass any variable to `lock`.

With your help I came up with this version that always returns the correct length:

``````# pass a 'lock' to avoid race conditions while `push!`
function foo3!(res, n, mylock)
if i == 1
lock(mylock) do
end
else
foo3!(res, i-1, mylock)
end
end
end

function foo3(n)
res = Int[]
mylock = ReentrantLock()
foo3!(res, n, mylock)
res
end
``````

Thanks a lot!

1 Like

An alternative, which avoids the lock overhead, is if you know the size of the result array before start. You can then allocate the full array in foo3(), and pass an atomic integer instead of a lock to foo3!():

``````idx = Threads.Atomic{Int}(1)
...