I have the following code, which works well:

``````
# Dummy function; the real code makes heavy use of ModelingToolkit and DifferentialEquations
function rel_sigma(; λ_nom)
    sleep(1)
    rand()
end

λ_noms = sort(unique(vcat(7:0.1:9.5, 7.8:0.02:8.2, 7.98:0.005:8.02)))
rel_sigmas = zeros(length(λ_noms))
Threads.@threads for i in eachindex(λ_noms)
    λ_nom = λ_noms[i]
    println("lambda_nom: $λ_nom")
    rel_sigma_ = 100 * rel_sigma(λ_nom = λ_nom)
    rel_sigmas[i] = rel_sigma_
end
``````

The weak point of multi-threading in my use case is that I only get a speed-up of a factor of two, even though I have 16 cores, mainly because of garbage collection.
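One way to check how much of the wall time really goes to GC is the `gctime` field of the named tuple returned by `@timed`. A quick sketch with a dummy allocation-heavy workload (a stand-in for the real loop):

``````
# Estimate the fraction of wall time spent in garbage collection
# for an allocation-heavy workload (dummy stand-in for the real loop).
t = @timed begin
    s = 0.0
    for _ in 1:100
        s += sum(rand(10^6))  # allocates a fresh ~8 MB vector each iteration
    end
    s
end
println("total: ", t.time, " s, GC: ", t.gctime, " s (",
        round(100 * t.gctime / t.time; digits = 1), " %)")
``````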

Now I want to try to use multiple processes.

This works:

``````using Distributed

@everywhere rel_sigma(λ_nom=8)
``````
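For this to work, the workers have to exist and `rel_sigma` has to be defined on all of them; a minimal setup sketch (with a dummy `rel_sigma`):

``````
using Distributed
addprocs(2)  # start 2 worker processes (up to the number of physical cores)

# the function must be defined on every worker, not just on the master process
@everywhere function rel_sigma(; λ_nom)
    sleep(0.1)  # dummy stand-in for the real ModelingToolkit computation
    rand()
end
``````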

But how can I achieve the following:

• each worker processes a different input value
• the results are returned to the main process

Any hints welcome!

I have a first code that works for two workers:

``````
function calc()
    λ_noms = sort(unique(vcat(7:0.1:9.5)))
    rel_sigmas = zeros(length(λ_noms))

    a = @spawnat 1 rel_sigma(λ_nom = λ_noms[1])
    b = @spawnat 2 rel_sigma(λ_nom = λ_noms[2])
    rel_sigmas[1] = fetch(a)
    rel_sigmas[2] = fetch(b)
    rel_sigmas[1:2]
end

calc()
``````
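The spawn-then-fetch pattern can be generalized with a comprehension, so it is not hard-wired to two workers. A sketch, again with a dummy `rel_sigma` (note that `@spawnat 1` runs on the master process, not on a worker):

``````
using Distributed
addprocs(2)
@everywhere rel_sigma(; λ_nom) = (sleep(0.1); rand())  # dummy stand-in

λ_noms = sort(unique(vcat(7:0.1:9.5)))

# one Future per worker, each for a different input value
futures = [@spawnat w rel_sigma(λ_nom = λ_noms[i])
           for (i, w) in enumerate(workers())]
rel_sigmas = fetch.(futures)  # collect the results on the master process
``````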

Can’t you just use `pmap`?

I will check pmap, thanks for the hint!
This is what I have now:

``````
function calc()
    λ_noms = sort(unique(vcat(7:0.1:9.5, 7.3:0.02:7.7, 7.4:0.005:7.6, 7.8:0.02:8.2, 7.98:0.005:8.02)))
    rel_sigmas = zeros(length(λ_noms))
    m = length(λ_noms)
    n = length(workers())
    for j in 0:div(m - 1, n)
        procs = Future[]
        for (i, worker) in pairs(workers())
            i + n*j > m && break  # don't index past the end of λ_noms
            λ_nom = λ_noms[i + n*j]
            proc = @spawnat worker rel_sigma(λ_nom = λ_nom)
            push!(procs, proc)
        end
        for (i, proc) in pairs(procs)
            rel_sigmas[i + n*j] = fetch(proc)
        end
    end
    λ_noms, rel_sigmas
end
``````

You could replace `@spawnat worker ...` here with `Dagger.@spawn ...` if you wanted to use Dagger.jl for multithreading and distributed computing simultaneously. You’d also need to change to `procs = Dagger.EagerThunk[]` (or just `Any` type it) to match `Dagger.@spawn`’s return type.
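A minimal sketch of that substitution (assuming Dagger.jl is installed; the dummy `rel_sigma` stands in for the real one):

``````
using Distributed, Dagger
@everywhere rel_sigma(; λ_nom) = (sleep(0.1); rand())  # dummy stand-in

λ_noms = 7:0.5:9.5

# Dagger's scheduler places these tasks across threads and workers as available
tasks = Any[Dagger.@spawn rel_sigma(λ_nom = λ) for λ in λ_noms]
rel_sigmas = fetch.(tasks)
``````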

Indeed, pmap does the job fine:

``````
@everywhere function rel_sigma1(λ_nom)
    rel_sigma(λ_nom = λ_nom)
end

function calc_rel_sigmas(λ_noms)
    pmap(rel_sigma1, λ_noms)
end

λ_noms = sort(unique(vcat(7:0.1:9.5)))
rel_sigmas = calc_rel_sigmas(λ_noms)
``````
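For long runs it can also help that `pmap` has built-in error handling, via its `on_error` and `retry_delays` keywords. A sketch (dummy `rel_sigma1` again):

``````
using Distributed
@everywhere rel_sigma1(λ_nom) = (sleep(0.1); rand())  # dummy stand-in

λ_noms = 7:0.1:9.5

# retry a failed λ up to 3 times, then record NaN instead of aborting the whole run
rel_sigmas = pmap(rel_sigma1, λ_noms;
                  retry_delays = zeros(3),
                  on_error = e -> NaN)
``````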

Findings so far:

• using threads I achieve a speed-up by a factor of two
• using pmap I achieve a speed-up by a factor of 5.5

This is on a machine with 16 cores, where in theory a speed-up of 13.1 should be possible: 13 and not 16 because the CPU frequency drops from 5.5 GHz to 4.5 GHz when all cores are active.

This is already quite nice. I think the reason I cannot achieve a higher speed-up is mainly the limited CPU cache size.

Other finding: I had to add the following lines to the function `rel_sigma`

``````
# do a garbage collection if less than 6 GB of free RAM remain
if Sys.free_memory() / 2^30 < 6.0
    GC.gc()
end
``````

to avoid an out-of-memory error. There seems to be a bug in the garbage collector: it does not work correctly when you run many processes in parallel.
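A related knob, assuming Julia 1.9 or later: each worker can be given a heap-size hint at startup via `exeflags`, so its GC collects more aggressively before the machine runs out of RAM:

``````
using Distributed

# ask each worker's GC to keep the heap below roughly 4 GB (Julia ≥ 1.9)
addprocs(4; exeflags = "--heap-size-hint=4G")
``````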


I noticed large memory use in parallel workloads and have been trying to work out what was causing it.

I suspected the garbage collector, but it is good to know someone else has come to the same or a similar conclusion!