I have the following code, which works well:
# Dummy function, the real code makes heavy use of ModelingToolkit and DifferentialEquations
function rel_sigma(;λ_nom)
    sleep(1)
    rand()
end
λ_noms = sort(unique(vcat(7:0.1:9.5, 7.8:0.02:8.2, 7.98:0.005:8.02)))
rel_sigmas = zeros(length(λ_noms))
Threads.@threads for i in eachindex(λ_noms, rel_sigmas)
    λ_nom = λ_noms[i]
    println("lambda_nom: $λ_nom")
    rel_sigma_ = 100 * rel_sigma(λ_nom=λ_nom)
    rel_sigmas[i] = rel_sigma_
end
The weak point of multi-threading in my use case is that I only get a speed-up of a factor of two, even though I have 16 cores, mainly because of garbage collection.
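(One possibly useful knob before switching to processes, assuming Julia 1.10 or newer: the garbage collector itself can run on several threads. A sketch of the invocation; the thread counts are only examples.)

julia --threads=16 --gcthreads=8 script.jl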
Now I want to try to use multiple processes.
This works:
using Distributed
@everywhere rel_sigma(λ_nom=8)
But how can I achieve the following:
- each worker processes a different input value
- the results are returned to the main process
Any hints welcome!
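For reference, a minimal setup sketch (the worker count of 4 is only an example; adjust it to your machine): workers are started with addprocs, and the function must be defined on every process with @everywhere before it can be called there.

using Distributed
addprocs(4)  # start 4 worker processes (example count)

# define rel_sigma on the main process and on all workers
@everywhere function rel_sigma(;λ_nom)
    sleep(1)
    rand()
end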
I have a first version that works for two workers:
function calc()
    λ_noms = sort(unique(vcat(7:0.1:9.5)))
    rel_sigmas = zeros(length(λ_noms))
    # note: process 1 is the main process; with addprocs(2) the workers are 2 and 3
    a = @spawnat 1 rel_sigma(λ_nom = λ_noms[1])
    b = @spawnat 2 rel_sigma(λ_nom = λ_noms[2])
    rel_sigmas[1] = fetch(a)
    rel_sigmas[2] = fetch(b)
    rel_sigmas[1:2]
end
calc()
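One way to generalize this beyond two workers (a sketch; it assumes rel_sigma has been defined with @everywhere, and calc_all is a hypothetical name) is to spawn one task per input and fetch them all. @spawnat :any lets Distributed pick a free process:

function calc_all(λ_noms)
    # one Future per input value; :any lets the scheduler choose the process
    futures = [@spawnat :any rel_sigma(λ_nom = λ) for λ in λ_noms]
    # fetch each result back to the main process, in input order
    fetch.(futures)
end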
I will check pmap, thanks for the hint!
This is what I have now:
function calc()
    λ_noms = sort(unique(vcat(7:0.1:9.5, 7.3:0.02:7.7, 7.4:0.005:7.6, 7.8:0.02:8.2, 7.98:0.005:8.02)))
    rel_sigmas = zeros(length(λ_noms))
    m = length(λ_noms)
    n = length(workers())
    # process the inputs in blocks of n, one element per worker;
    # div(m - 1, n) rather than div(m, n) avoids an out-of-bounds index when n divides m
    for j in 0:div(m - 1, n)
        procs = Future[]
        for (i, worker) in pairs(workers())
            λ_nom = λ_noms[i + n*j]
            proc = @spawnat worker rel_sigma(λ_nom = λ_nom)
            push!(procs, proc)
            if i + n*j == m   # last input reached: the final block may be shorter than n
                break
            end
        end
        # procs only contains the futures actually spawned, so no bounds check is needed here
        for (i, proc) in pairs(procs)
            rel_sigmas[i + n*j] = fetch(proc)
        end
    end
    λ_noms, rel_sigmas
end
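The block bookkeeping can also be written more compactly with Iterators.partition (a sketch with the same blocking behavior; calc2 is a hypothetical name):

function calc2(λ_noms)
    rel_sigmas = zeros(length(λ_noms))
    n = length(workers())
    # split the index range into blocks of at most n elements
    for idxs in Iterators.partition(eachindex(λ_noms), n)
        # pair each index with a worker; zip stops at the shorter of the two
        futs = [(i, @spawnat w rel_sigma(λ_nom = λ_noms[i])) for (i, w) in zip(idxs, workers())]
        for (i, f) in futs
            rel_sigmas[i] = fetch(f)
        end
    end
    rel_sigmas
end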
You could replace @spawnat worker ... here with Dagger.@spawn ... if you wanted to use Dagger.jl for multithreading and distributed computing simultaneously. You'd also need to change to procs = Dagger.EagerThunk[] (or just type it as Any) to match Dagger.@spawn's return type.
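A sketch of what that could look like (rel_sigma_pos and calc_dagger are hypothetical names; the wrapper is there because rel_sigma takes a keyword argument):

using Distributed
@everywhere using Dagger   # Dagger needs to be loaded on every process

# hypothetical positional wrapper around the keyword-argument function
@everywhere rel_sigma_pos(λ) = rel_sigma(λ_nom = λ)

function calc_dagger(λ_noms)
    tasks = Any[]   # Any-typed, as suggested above, to match Dagger.@spawn's return type
    for λ in λ_noms
        push!(tasks, Dagger.@spawn rel_sigma_pos(λ))
    end
    fetch.(tasks)   # collect each task's result on the main process
end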
Indeed, pmap does the job fine:
# positional wrapper: pmap passes each element as a positional argument
@everywhere function rel_sigma1(λ_nom)
    rel_sigma(λ_nom = λ_nom)
end
function calc_rel_sigmas(λ_noms)
    # pmap distributes the calls over all workers and collects the results in order
    pmap(rel_sigma1, λ_noms)
end
λ_noms = sort(unique(vcat(7:0.1:9.5)))
rel_sigmas = calc_rel_sigmas(λ_noms)
Findings so far:
- using threads I achieve a speed-up by a factor of two
- using pmap I achieve a speed-up by a factor of 5.5

This is on a machine with 16 cores, where in theory a speed-up of 13.1 should be possible: 13.1 rather than 16, because the CPU frequency drops from 5.5 GHz to 4.5 GHz when all cores are active (16 × 4.5/5.5 ≈ 13.1).
This is already quite nice. I think the main reason I cannot achieve a higher speed-up is the limited CPU cache size.
Other finding: I had to add the following lines to the function rel_sigma
    # do a garbage collection if less than 6GB free RAM
    if Sys.free_memory()/2^30 < 6.0
        GC.gc()
    end
to avoid an out-of-memory error. There seems to be a bug in the garbage collector: it does not work correctly when you run many processes in parallel.
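One more knob that may be related (my assumption, not something tested in this thread; it needs Julia 1.9 or newer, and the 2G value is only an example): each worker can be given a heap-size hint via the exeflags keyword of addprocs, so its GC collects earlier:

using Distributed
# assumption: Julia 1.9+; the hint makes each worker's GC collect before its heap grows past it
addprocs(4; exeflags = "--heap-size-hint=2G")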
I noticed large memory use in parallel workloads and have been trying to work out what was causing it.
I suspected the garbage collector, but it is good to know someone else has come to the same or a similar conclusion!