While loop and parallel computing

Dear all,

Suppose I have a container, say res = [], that I want to be filled by the output of a function, say f() = rand() > .5 ? 1 : nothing. Suppose I’m done once res has a certain length. Given that the actual function I’m interested in takes a long time to finish, I would like to run this in a while loop that lets the function run in parallel. I am having difficulty figuring out how to do this. I suppose the solution would have the following form:

using Distributed

addprocs()

res = [] # or @everywhere res = [] ?

while length(res) < 100
    # run f() in parallel till the condition is met
end

However, I have no idea how to fill in the details. Any help would be greatly appreciated.

Probably it’s easiest to use Distributed.pmap, e.g.

res = pmap(x -> f(), 1:100)

Thanks Steven, but this gives me as output a 100-element Vector{Union{Nothing, Vector{Any}}}, with some elements being nothing and others vectors of 1s of variable length. What I’d like to get is a single vector with 100 1s. Is that possible?

I think that you would need to parallelize in batches. Use an approach of Steven for certain number of iterations, compute, how many valid points do you have, if you have enough, stop, of not not, ask for more in parallel.

Alternative might be to create some kind of dynamic scheduler, which will continue to issue “jobs” until the desired number of items is obtain. I can imagine to use channels for that. But I do no know about anything that would work straight away.

How did you define f? This works for me:

julia> pmap(_ -> rand() < 0.5 ? 1 : nothing, 1:4)
4-element Vector{Union{Nothing, Int64}}:
 1
  nothing
  nothing
 1

Sorry, yes, when I tried Steven’s proposal I forgot to put @everywhere in front of the definition of f. But even then it doesn’t solve the problem. I suppose I have to try something like what @Tomas_Pevny suggests, although I wouldn’t quite know where to start with that. In any case, thanks for your replies.

If the function being computed is slow, using a lock will be fine. You can do something like this (which combines the idea of using batches with the while loop):

julia> f() = rand() > 0.5 ? 1 : nothing
       function res()
           lk = ReentrantLock()
           res = Int[]
           Threads.@threads for it in 1:Threads.nthreads()
               while length(res) < 100
                   r = f()
                   if !isnothing(r)
                       lock(lk) do
                           length(res) < 100 && push!(res,r)
                       end
                   end
               end
           end
           return res
       end
res (generic function with 1 method)

julia> count(isequal(1),res()) == length(res())
true

I think you may compute the function unnecessarily in some threads, and therefore the check length(res) < 100 && .... is needed there, but that may be a minor problem if you have to accumulate many results.

2 Likes

If you use multi-threading, you can simply use an atomic counter combined with the entire reduction infrastructure in JuliaFolds. In particular, FLoops’ support of break in parallel reduction is useful here:

using FLoops
using BangBang

function sample_somethings(f, nitems)
    len = Threads.Atomic{Int}(0)
    @floop for x in 1:typemax(Int)
        if len[] > nitems
            break
        end
        y = f()
        if y !== nothing
            Threads.atomic_add!(len, 1)
            items = (something(y),)
            @reduce() do (results = Union{}[]; items)
                results = append!!(results, items)
            end
        end
    end
    return results
end
julia> sample_somethings(() -> rand() > 0.5 ? 1 : nothing, 100)
101-element Vector{Int64}:
 1
 1
 1
 1
 ⋮
 1
 1
 1

Unfortunately, this strategy is rather tricky to implement for a distributed setting. Perhaps the easiest way to implement this is to use a RemoteChannel:

using Distributed
using BangBang

function distributed_sample_somethings(f, nitems, batchsize = 20)
    chan = RemoteChannel()
    @sync try
        for w in workers()
            Distributed.@spawnat w try
                while true
                    ys = map(1:batchsize) do _
                        f()
                    end
                    try
                        put!(chan, ys)
                    catch
                        break
                    end
                end
            finally
                close(chan)
            end
        end
        results = Union{}[]
        while true
            ys = take!(chan)::Vector
            results = append!!(results, Iterators.filter(!isnothing, ys))
            if length(results) > nitems
                close(chan)
                break
            end
        end
        return results
    finally
        close(chan)
    end
end
julia> distributed_sample_somethings(() -> rand() > 0.5 ? 1 : nothing, 100)
108-element Vector{Int64}:
 1
 1
 1
 1
 ⋮
 1
 1
 1

Here, batchsize is an extra parameter to exchange the communication cost with possibly-wasted computation.

As a side note, I’d point out that we are forced to use an ugly catch-all pattern that can hide potential bugs:

try
    put!(chan, ys)
catch
    break
end

This is probably impossible to avoid when only using public API. It’d be nice if we can have something like maybetake! in add maybetake! and tryput! by tkf · Pull Request #41966 · JuliaLang/julia · GitHub

2 Likes

Thank you very much @lmiq. That gives me the speed I need. Tomorrow, out of curiosity, I’ll also try to implement @tkf’s suggestion, for which I’m also grateful.