I have the following code, which works well:
# Dummy function, the real code makes heavy use of ModelingToolkit and DifferentialEquations
function rel_sigma(; λ_nom)
    sleep(1)
    rand()
end
λ_noms = sort(unique(vcat(7:0.1:9.5, 7.8:0.02:8.2, 7.98:0.005:8.02)))
rel_sigmas = zeros(length(λ_noms))
Threads.@threads for i in eachindex(λ_noms, rel_sigmas)
    λ_nom = λ_noms[i]
    println("lambda_nom: $λ_nom")
    rel_sigma_ = 100 * rel_sigma(λ_nom = λ_nom)
    rel_sigmas[i] = rel_sigma_
end
The weak point of multi-threading in my use case is that I only get a speed-up of a factor of two, even though I have 16 cores, mainly because of garbage collection.
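(Side note: if you stay with threads, it can help to give the garbage collector more threads at startup. A minimal sketch, assuming Julia 1.10 or newer; my_script.jl is just a placeholder name:)
julia --threads=16 --gcthreads=8 my_script.jl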
Now I want to try to use multiple processes.
This works:
using Distributed
@everywhere rel_sigma(λ_nom=8)
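For this to work on freshly started workers, rel_sigma and the packages it needs have to be defined on every process first. A minimal sketch, assuming the function lives in a file rel_sigma.jl (the file name and the worker count are just examples):
using Distributed
addprocs(15)                          # start worker processes; the main process keeps id 1
@everywhere using ModelingToolkit, DifferentialEquations
@everywhere include("rel_sigma.jl")   # defines rel_sigma on every process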
But how can I achieve that:
- each worker processes a different input value
- the results are returned to the main process
Any hints welcome!
I have a first code that works for two workers:
function calc()
    λ_noms = sort(unique(vcat(7:0.1:9.5)))
    rel_sigmas = zeros(length(λ_noms))
    a = @spawnat 1 rel_sigma(λ_nom = λ_noms[1])
    b = @spawnat 2 rel_sigma(λ_nom = λ_noms[2])
    rel_sigmas[1] = fetch(a)
    rel_sigmas[2] = fetch(b)
    rel_sigmas[1:2]
end
calc()
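Note that process id 1 is the main process, so @spawnat 1 does not run on a worker at all. A sketch of the same pattern generalized over all workers; it only handles the first length(workers()) values, and the function name is just illustrative:
function calc_first_batch(λ_noms)
    futures = [@spawnat w rel_sigma(λ_nom = λ_noms[i]) for (i, w) in pairs(workers())]
    [fetch(f) for f in futures]   # fetch the results back on the main process
end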
I will check pmap, thanks for the hint!
This is what I have now:
function calc()
    λ_noms = sort(unique(vcat(7:0.1:9.5, 7.3:0.02:7.7, 7.4:0.005:7.6, 7.8:0.02:8.2, 7.98:0.005:8.02)))
    rel_sigmas = zeros(length(λ_noms))
    m = length(λ_noms)
    n = length(workers())
    for j in 0:div(m, n)
        procs = Future[]
        for (i, worker) in pairs(workers())
            i + n*j > m && break   # no values left for this chunk
            λ_nom = λ_noms[i + n*j]
            proc = @spawnat worker rel_sigma(λ_nom = λ_nom)
            push!(procs, proc)
        end
        for (i, proc) in pairs(procs)
            rel_sigmas[i + n*j] = fetch(proc)
        end
    end
    λ_noms, rel_sigmas
end
You could replace @spawnat worker ... here with Dagger.@spawn ... if you wanted to use Dagger.jl for multithreading and distributed computing simultaneously. You’d also need to change to procs = Dagger.EagerThunk[] (or just type it as Any[]) to match Dagger.@spawn’s return type.
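A minimal, untested sketch of that substitution, assuming Dagger.jl is installed and rel_sigma is defined @everywhere:
using Distributed, Dagger

procs = Dagger.EagerThunk[]
for λ_nom in λ_noms
    # Dagger schedules each task on a thread or worker itself
    push!(procs, Dagger.@spawn rel_sigma(λ_nom = λ_nom))
end
rel_sigmas = fetch.(procs)   # collect the results on the main process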
Indeed, pmap does the job fine:
@everywhere function rel_sigma1(λ_nom)
    rel_sigma(λ_nom = λ_nom)
end
function calc_rel_sigmas(λ_noms)
    pmap(rel_sigma1, λ_noms)
end
λ_noms = sort(unique(vcat(7:0.1:9.5)))
rel_sigmas = calc_rel_sigmas(λ_noms)
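For many small tasks it can also be worth experimenting with pmap's keyword arguments; a sketch (the batch size is just an example value):
# batch several values per remote call to reduce communication overhead
rel_sigmas = pmap(rel_sigma1, λ_noms; batch_size = 5)

# or run on an explicit worker pool
rel_sigmas = pmap(rel_sigma1, WorkerPool(workers()), λ_noms)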
Findings so far:
- using threads I achieve a speed-up by a factor of two
- using pmap I achieve a speed-up by a factor of 5.5
This is on a machine with 16 cores, where in theory a speed-up of 13.1 should be possible: 13.1 rather than 16 because the CPU frequency drops from 5.5 GHz to 4.5 GHz with all cores active, and 16 × 4.5 / 5.5 ≈ 13.1.
This is already quite nice. I think the reason I cannot achieve a higher speed-up is mainly the limited CPU cache size.
Other finding: I had to add the following lines to the function rel_sigma
# do a garbage collection if less than 6 GB of RAM are free
if Sys.free_memory() / 2^30 < 6.0
    GC.gc()
end
to avoid an out-of-memory error. There seems to be a bug in the garbage collector: it does not work correctly when you run many processes in parallel.
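A related workaround that may help (a sketch, assuming Julia 1.9 or newer): give each worker a heap-size hint so its GC triggers before the machine runs out of RAM. The 3G value is just an example; something like total RAM divided by the number of workers seems reasonable.
using Distributed
addprocs(15; exeflags = `--heap-size-hint=3G`)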
I noticed large memory use in parallel workloads and have been trying to work out what was causing it.
I suspected the garbage collector but good to know someone else has come to the same/similar conclusion!