# While loop and parallel computing

Dear all,

Suppose I have a container, say `res = []`, that I want to be filled by the output of a function, say `f() = rand() > .5 ? 1 : nothing`. Suppose I’m done once `res` has a certain length. Given that the actual function I’m interested in takes a long time to finish, I would like to run this in a `while` loop that lets the function run in parallel. I am having difficulty figuring out how to do this. I suppose the solution would have the following form:

```julia
using Distributed

res = [] # or @everywhere res = [] ?

while length(res) < 100
    # run f() in parallel till the condition is met
end
```

However, I have no idea how to fill in the details. Any help would be greatly appreciated.

Probably it’s easiest to use `Distributed.pmap`, e.g.

```julia
res = pmap(x -> f(), 1:100)
```

Thanks Steven, but this gives me as output a 100-element `Vector{Union{Nothing, Vector{Any}}}`, with some elements being `nothing` and others vectors of 1s of variable length. What I’d like to get is a single vector with 100 1s. Is that possible?

I think that you would need to parallelize in batches: use Steven’s approach for a certain number of iterations, then count how many valid points you have. If you have enough, stop; if not, ask for more in parallel.
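A minimal sketch of this batching idea, assuming a placeholder `f` and a hypothetical helper name `collect_batched` (neither is from the thread):

```julia
using Distributed

# Placeholder for the expensive function: returns 1 about half the time.
@everywhere f() = rand() > 0.5 ? 1 : nothing

# Repeatedly request batches in parallel until we have enough valid points.
function collect_batched(n; batchsize = 2n)
    res = Int[]
    while length(res) < n
        batch = pmap(_ -> f(), 1:batchsize)
        # Keep only the non-nothing results from this batch.
        append!(res, Iterators.filter(!isnothing, batch))
    end
    return res[1:n]  # drop any surplus from the final batch
end
```

The `batchsize` keyword controls how much possibly-wasted work each round may do; a larger batch means fewer `pmap` round trips but more discarded surplus.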

An alternative might be to create some kind of dynamic scheduler that keeps issuing “jobs” until the desired number of items is obtained. I can imagine using channels for that, but I don’t know of anything that would work straight away.
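A rough sketch of such a channel-based scheduler, here using threads within a single process (the names `collect_via_channel` and `f` are illustrative, not from the thread):

```julia
# Placeholder for the expensive function: returns 1 about half the time.
f() = rand() > 0.5 ? 1 : nothing

function collect_via_channel(n; nproducers = Threads.nthreads())
    chan = Channel{Int}(n)
    # Producers keep issuing "jobs" until the channel is closed.
    for _ in 1:nproducers
        Threads.@spawn begin
            try
                while true
                    r = f()
                    r === nothing || put!(chan, r)
                end
            catch
                # put! on a closed channel throws; the producer just stops.
            end
        end
    end
    res = Int[]
    while length(res) < n
        push!(res, take!(chan))
    end
    close(chan)  # signals the producers to stop
    return res
end
```

The catch-all `catch` is the simplest way to stop producers once the channel is closed, at the price of possibly hiding other errors thrown by `f`.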

How did you define `f`? This works for me:

```julia
julia> pmap(_ -> rand() < 0.5 ? 1 : nothing, 1:4)
4-element Vector{Union{Nothing, Int64}}:
 1
 nothing
 nothing
 1
```

Sorry, yes, when I tried Steven’s proposal I forgot to put `@everywhere` in front of the definition of `f`. But even then it doesn’t solve the problem. I suppose I have to try something like what @Tomas_Pevny suggests, although I wouldn’t quite know where to start with that. In any case, thanks for your replies.

If the function being computed is slow, using a lock will be fine. You can do something like this (which combines the idea of using batches with the while loop):

```julia
julia> f() = rand() > 0.5 ? 1 : nothing
f (generic function with 1 method)

julia> function res()
           lk = ReentrantLock()
           res = Int[]
           while length(res) < 100
               # Try one batch of candidates in parallel.
               Threads.@threads for i in 1:(100 - length(res))
                   r = f()
                   if !isnothing(r)
                       lock(lk) do
                           length(res) < 100 && push!(res, r)
                       end
                   end
               end
           end
           return res
       end
res (generic function with 1 method)

julia> count(isequal(1), res()) == length(res())
true
```

I think you may compute the function unnecessarily in some threads, which is why the `length(res) < 100 && ...` check is needed there, but that should be a minor cost if you have to accumulate many results.


If you use multi-threading, you can simply use an atomic counter combined with the entire reduction infrastructure in JuliaFolds. In particular, FLoops’ support of `break` in parallel reduction is useful here:

```julia
using FLoops
using BangBang

function sample_somethings(f, nitems)
    len = Threads.Atomic{Int}(0)  # atomic counter of accepted items
    @floop for x in 1:typemax(Int)
        if len[] > nitems
            break
        end
        y = f()
        if y !== nothing
            Threads.atomic_add!(len, 1)
            items = (something(y),)
            @reduce() do (results = Union{}[]; items)
                results = append!!(results, items)
            end
        end
    end
    return results
end
```
```julia
julia> sample_somethings(() -> rand() > 0.5 ? 1 : nothing, 100)
101-element Vector{Int64}:
 1
 1
 1
 1
 ⋮
 1
 1
 1
```

Unfortunately, this strategy is rather tricky to implement for a distributed setting. Perhaps the easiest way to implement this is to use a `RemoteChannel`:

```julia
using Distributed
using BangBang

function distributed_sample_somethings(f, nitems, batchsize = 20)
    chan = RemoteChannel()
    @sync try
        for w in workers()
            Distributed.@spawnat w try
                while true
                    ys = map(1:batchsize) do _
                        f()
                    end
                    try
                        put!(chan, ys)
                    catch
                        break
                    end
                end
            finally
                close(chan)
            end
        end
        results = Union{}[]
        while true
            ys = take!(chan)::Vector
            results = append!!(results, Iterators.filter(!isnothing, ys))
            if length(results) > nitems
                close(chan)
                break
            end
        end
        return results
    finally
        close(chan)
    end
end
```
```julia
julia> distributed_sample_somethings(() -> rand() > 0.5 ? 1 : nothing, 100)
108-element Vector{Int64}:
 1
 1
 1
 1
 ⋮
 1
 1
 1
```

Here, `batchsize` is an extra parameter for trading communication cost against possibly-wasted computation.

As a side note, I’d point out that we are forced to use an ugly catch-all pattern that can hide potential bugs:

```julia
try
    put!(chan, ys)
catch
    break
end
```

This is probably impossible to avoid when only using the public API. It’d be nice if we could have something like `maybetake!`, as proposed in add maybetake! and tryput! by tkf · Pull Request #41966 · JuliaLang/julia · GitHub


Thank you very much @lmiq. That gives me the speed I need. Tomorrow, out of curiosity, I’ll also try to implement @tkf’s suggestion, for which I’m also grateful.