Simplest way to parallelize a while loop

#1

I’m wondering what’s the easiest way to multi-thread a code of the following form:

L=[someStuff]
while !isempty(L)
    current=pop!(L)
    #do some computations involving current
    if someConditionDependingOnThoseComputations
        push!(L, anArrayOfStuffs...)
    end
end

So roughly I’d like something like the Threads.@threads macro but for while loops (using locks before pop!-ing and push!-ing of course). In fact there’s an obvious way of doing what I want just by using this macro and doing a for loop on the number of threads and so on, I’m just wondering if there’s a better/cleaner way to do it.

I’ve had a quick look at the code of this macro, and the key seems to be

ccall(:jl_threading_run, Ref{Cvoid}, (Any,), someFunction)

But I haven’t found any documentation on how to properly use that, and of course there might be a better way ! Any suggestion welcome.

1 Like

#2

Currently, I think that is indeed the most straightforward way.
If that is something you do very often you can create your own macro. Something like the pseudocode below

macro threadit(b)
    quote
       Threads.@threads for _ in Base.OneTo(Threads.nthreads())
            $b
        end
    end
end
0 Likes

#3

RIght, unfortunately it’s not that easy, the threads need to keep looping while there’s work to be done, and sometimes they have to wait while L is empty but other threads are running. I’ve tried writing down a dummy version of what I want, but the result is not super elegant, and on Julia 1.0 it keeps provoking a segfault although there are locks everywhere… On 1.1 it works sometimes, throw somewhat random errors some other times, and segfault every once in a while.

function test(n::Real) #try test(100.0)
    a=Threads.Mutex()
    c=Threads.Condition()
    atwork=Threads.Atomic{Int}(0)
    L=[n]
    Threads.@threads for i in 1:Threads.nthreads()
        while true
            lock(a)
            while isempty(L) && atwork[] >0 #we wait while L is empty but other threads are working
                unlock(a)
                wait(c)
                lock(a)
            end
            if isempty(L) && atwork[]==0 #if nobody's working and L is empty the work is done !
                unlock(a)
                break
              end
            Threads.atomic_add!(atwork,1) #we are starting to do stuff
            x=pop!(L) #take the next task
            unlock(a)
            for j=1:100
                x^10 #useless computation
            end
            if abs(x)>1
                lock(a)
                append!(L,[x/2,x/2]) #push something into L
                unlock(a)
            end
            Threads.atomic_sub!(atwork,1) #not doing stuffs anymore
            notify(c) #tell people waiting they need to go on and check L again
        end
    end
end
0 Likes