Running for loop in parallel and appending result to vector

parallel

#1

I have the following snippet of code where I am looping over and would like to append/push to curetime or progressiontime based on conditions met. As I want to increase the size of the loop it would be helpful to run this in parallel:

nPatients = 10^6
global curetime = #SharedArray{Float64}(nPatients)
global progressiontime = #SharedArray{Float64}(nPatients)

for n = 1 : nPatients

(t, discretePop, continuousPop) = main()

if ~isempty(discretePop) && discretePop[4,end] == 0.0
    println("cure at t = $(t[end])")
    # curetime[n] = t[end]
    push!(curetime,t[end])
elseif continuousPop[4,end] > continuousPop[4,1]*1.2
    println("progression at t = $(t[end])")
    # progressiontime[n] = t[end]
    push!(progressiontime,t[end])
else
    println("Stable at t = 200.0")
end

end

How can I fix this code to work in parallel? I have tried @distributed but I get complaints when using push!. I am open to preallocating the array size and potentially returning a list (e.g. (timeOfEvent, eventType), where eventType = cure or progression) if this easier to implement.

As an aside, I have also tried @time @distributed and got a time returned of < 1 second when it actually took >10 seconds to return this value to the screen.


#2

First, take a look at this topic on how to format and phrase your question to get better help.

Mostly, it’s very helpful if you can post a minimal, working, self-contained example. It’s not clear from this code what main does, how n is supposed to be used in the loop, how often you’ll push to the vectors, if you actually intend to print for each iteration, etc.

Is @distributed what you want (parallel computing), and not just @threads (multiple threads in a single computer)? More details here.

Multi-threading is usually the last optimization step I do, since it tends to complicate the code, and for unoptimized code you can typically achieve much better speedups using other techniques. Is the rest of your code already optimized? Proper choice of data structures and algorithms, type stability, caching, vectorization where applicable?


#3

I will do my best to give a minimal working example. The question on how often I want to append to a vector is based on whether one of those two conditions is met. It is certainly possible that I can optimize the code in other areas, but as I want to run this on a cluster, this is an easy way to utilize that computing power without trying to change too much to working code.

@threads may be a much better choice, but to be honest I am struggling through some of the documentation on how to apply this to my work and what exactly @threads is doing where I have a relative intuition of what @distributed is doing.

The algorithm main essentially is running a hybrid ODE-stochastic model where I check the size of variables and update them in the small pop stochastic algorithm (Gillespie) vs the large deterministic (DIfferentialEquation.jl) solver.

I will try to make a minimal example and get back to you in a few hours. Thanks for the tips!


#4

pmap is one of the easiest ways to do this, but you need to have a 1-to-1 correspondence between input elements and output elements. You could perhaps pmap(main, ...) over your patients and then have a second for loop that does the post-processing of the results.


#5

Here, I hope is a minimal working example, let me know if this makes sense.

global pop1 = Array{Float64}
global pop2 = Array{Float64}
@time @distributed for 1:nT

roll = rand()

if roll < 0.5
push!(pop1,roll)
else
push!(pop2,roll)
end

end

Alternatively, it may be more efficient to instead save a list:

nT = 100
global pop = ? # type that will have a list of form (roll, "event 1 or 2 occurred")
@time @distributed for 1:nT

roll = rand()

if roll < 0.5
# append to pop -> (roll, "event 1")
else
# append to pop -> (roll, "event 2")
end

end

#6

Perhaps just execute pmap on a function that returns, say a tuple (eventType, timeOfEvent) where eventType indicates cure/progression, and then partition the results.

allResults = pmap(f, 1:nPatients)
cureResults = [ t[2] for t in allResults if t[1] == cure ]
...

#7

I tried pmap on a test function

function f()
roll = rand()
if roll < 0.5
event = "cure"
else
event = "progression"
end
return (roll, event)
end

n = 10^3
allResults = pmap(f, 1:n)

and get this error:
ERROR: MethodError: objects of type Tuple{Float64,String} are not callable


#8

Perhaps something like this?
(works for me on Julia v1.0.3)

using Distributed
addprocs()

@everywhere function f(patient)
    roll = rand()
    if roll < 0.5
        event = :cure
    else
        event = :progression
    end
    return (eventType=event, timeOfEvent=roll)
end

n = 10^3
allResults = pmap(f, 1:n)
cureResults = [ t.timeOfEvent for t in allResults if t.eventType == :cure ]
progressionResults = [ t.timeOfEvent for t in allResults if t.eventType == :progression ]

#9

This does work, but it is odd to me that pmap is so much slower than just running a simple for loop. I tested with n = 10^5 and found that

using Distributed

n = 10^5
test = Array{Tuple{Float64,String},1}(undef,n)
@time for i = 1 : n
       test[i] = f()
       end
  0.011374 seconds (399.00 k allocations: 9.142 MiB)

addprocs(3);

@time allResults = pmap(f, 1:n);
  7.553350 seconds (9.44 M allocations: 351.596 MiB, 2.60% gc time)

#10

Well yes, pmap has some overhead for inter-process communication. It’s a good choice when the mapped function is expensive, since the overhead becomes marginal/negligible. It can be a poor choice for inexpensive workloads, for example your toy function. The advantage of pmap is that it scales well and can be used across a cluster. The iterations are distributed dynamically which is useful for uneven workloads.

Using @distributed with SharedArray is another option. This can be faster then pmap. Iterations are statically distributed across workers, and inter-process overhead is minimized. Obviously SharedArrays are limited to processes with access to the shared memory (typically the same machine, cannot be used across a cluster).

You could also consider using Multi-Threading. This can be the fastest option, but is still somewhat experimental in Julia and obviously requires a thread-safe application.

Here’s a rough outline of how these might work for your toy example:

using BenchmarkTools
using Distributed
using SharedArrays

addprocs(Threads.nthreads())
println("Num threads = ", Threads.nthreads())
println("Num workers = ", nworkers())

@everywhere function toyf(patient)
    roll = rand()
    if isodd(patient)
        event = 1
    else
        event = 2
    end
    return (event, patient*5.0)
end

function shared(f, n)
    results = SharedArray{Tuple{Int,Float64}}(n)
    @sync @distributed for i = 1:n
        results[i] = f(i)
    end 
    return results
end 

function threaded(f, n)
    results = Vector{Tuple{Int,Float64}}(undef, n)
    Threads.@threads for i = 1:n
        results[i] = f(i)
    end
    return results
end

n = 10^3
pmapResults = pmap(toyf, 1:n)
sharedResults = shared(toyf, n)
threadedResults = threaded(toyf, n)
@assert pmapResults == sharedResults == threadedResults

cureResults = [ t[2] for t in pmapResults if t[1] == 1 ]
progressionResults = [ t[2] for t in pmapResults if t[1] == 2 ]

@btime pmap($toyf, $(1:n));
@btime shared($toyf, $n);
@btime threaded($toyf, $n);

As a matter of interest, timings on my machine (24 cores, 24 workers, 24 threads) are below.
But note that these timing differences might become meaningless for a non-toy workload.

Julia-1.0.3> @btime pmap($toyf, $(1:n));
  38.842 ms (63844 allocations: 2.26 MiB)

Julia-1.0.3> @btime shared($toyf, $n);
  8.681 ms (7054 allocations: 323.63 KiB)

Julia-1.0.3> @btime threaded($toyf, $n);
  27.712 μs (2 allocations: 15.78 KiB)

#11

This worked great, thanks! However, it seems both shared and threaded when applied to my actual problem is giving identical results (perhaps expectedly). I tried changing the rng_seed, but the array was still filled identically for both threaded and shared. I have a bunch of global variables declared in f(). Perhaps they are sharing the same set of parameters (which are randomly perturbed using randn? I tried naively removing all instances of global, but this ran into errors upon entering the loop in main(), where it said the variables were now undefined.

I can try to give a minimal example if this isn’t an obvious fix.


#12

I think I found the fix. I needed to @everywhere using <packages> after calling addprocs(). Something like this

using Distributed

if nprocs() < 5
    addprocs(Threads.nthreads())
end
println("Num threads = ", Threads.nthreads())
println("Num workers = ", nworkers())

@everywhere using DifferentialEquations
@everywhere using SharedArrays

#13

The following example returns identical copies in result. What did I do wrong?

using Distributed

if nprocs() < 5
    addprocs(Threads.nthreads())
end
println("Num threads = ", Threads.nthreads())
println("Num workers = ", nworkers())

@everywhere function main()

    roll = rand()
    pars = rand(5)

    if roll < 0.5
        return (roll,"cure",pars)
    else
        return (roll,"progression",pars)
    end


end

function threaded(f,nT)

    results = Array{Tuple{Float64,String,Array{Float64}}}(undef,nT)

    Threads.@threads for n = 1 : nT

        (roll, event, params) = f
        results[n] = (roll,event,params)
    
    end

    return results

end

nT = 10^2

@time threadedresults = threaded(main(),nT);

#14

Should pass in main the function not main() the tuple.

In threaded, should assign the result from executing f(), not f.

Also I don’t think rand() is thread-safe. You can search for related topics on this.


#15

I believe I made the changes you mentioned, but I now get an error:

Error thrown in threaded loop on thread 3: InexactError(func=:check_top_bit, T=Int64, val=-48)
Error thrown in threaded loop on thread 2: InexactError(func=:check_top_bit, T=Int64, val=-40)
Error thrown in threaded loop on thread 0: InexactError(func=:check_top_bit, T=Int64, val=-40)  0.027381 seconds (103.39 k allocations: 7.308 MiB)

Subsequent runs yield this error: ERROR: LoadError: InexactError: check_top_bit(Int64, -56)

The updated code:

using Distributed

if nprocs() < 5
    addprocs(Threads.nthreads())
end
println("Num threads = ", Threads.nthreads())
println("Num workers = ", nworkers())

@everywhere function main()

    roll = rand()
    pars = rand(5)

    if roll < 0.5
        return (roll,"cure",pars)
    else
        return (roll,"progression",pars)
    end


end

function serial(f,nT)

    results = Array{Tuple{Float64,String,Array{Float64}}}(undef,nT)

    for n = 1 : nT
        results[n] = f()    
    end

    return results

end

function threaded(f,nT)

    results = Array{Tuple{Float64,String,Array{Float64}}}(undef,nT)

    Threads.@threads for n = 1 : nT

        results[n] = f()
    
    end

    return results

end

nT = 10^5

@time serialresults = serial(main,nT);
@time threadedresults = threaded(main,nT);

It is fine at the moment if rand() returns poor results, I just want to make sure I can get some parallelization running. I see other questions have been asked related to using rand() in threads.


#16

rand() is not thread-safe. You can solve that by creating a separate RNG per thread. See sample code below.

Also note that you should use @btime in BenchmarkTools instead of @time if you want to measure time somewhat accurately.

I also got rid of the Distributed and @everywhere stuff since it’s not needed for multi-threading. And I’ll repeat my earlier question – are you looking for multi-threading (single computer, multiple cores), or parallel computing (cluster, several distributed machines)? What does your cluster look like? The code below does multi-threading only.

Finally note that since your function main is so short in this example, the way you’re splitting work per thread will incur a lot of overhead. So you can’t expect to see a speedup linear to the number of threads used. There are ways around that, if indeed the work done per iteration is so small. A simple way is to handle batches at a time instead of individual elements.

const RNG = 1:Threads.nthreads() .|>_-> MersenneTwister()

function main(rng)
    roll = rand(rng)
    pars = rand(rng, 5)
    status = roll < 0.5 ? "cure" : "progression"
    (roll, status, pars)
end

function serial(f,nT)
    results = Array{Tuple{Float64,String,Array{Float64}}}(undef,nT)
    rng = MersenneTwister()
    for n = 1 : nT
        results[n] = f(rng)
    end
    return results
end

function threaded(f,nT)
    results = Array{Tuple{Float64,String,Array{Float64}}}(undef,nT)
    Threads.@threads for n = 1 : nT
        rng = RNG[Threads.threadid()]
        results[n] = f(rng)
    end
    return results
end

nT = 10^5

@time serialresults = serial(main, nT)
@time threadedresults = threaded(main, nT)

nothing

Problems using Thread.@thread
Disable global RNG
#17

Eventually, I plan to put my code onto a cluster. However threads seems to offer considerable speedup even on my computer. The cluster has several distributed machines so I had looked into using SharedArrays, but it requires bit only types and so I can’t put in an array or string which makes saving the parameters in my more complex function difficult to save.

In theory I would like a working multi-threading code for my laptop and a parallel computing code for the cluster I work on.


#18

Just use Distributed on both your laptop and the cluster. You can addprocs on your laptop, too… just fewer of them, of course.

Note that you can’t use SharedArrays across multiple machines — it uses a local filesystem to do its sharing.


#19

Ok, hopefully last question. Thank you all for being so helpful. SharedArrays can’t be used, is there a good alternative? In addition, for my output I cannot use it anyway, because I want to return the array of parameters used, so my tuple will be (Float64,Int64,Array{Float64}). I could write out each parameter as a float, but if I keep changing the number of parameters this can become cumbersome.

Also, do I have to seed the rng for each processor as we did with multi-threading? Something along the lines of bennedich’s answer?