From multithreading to distributed

I have the following code, which works well:

# Dummy function; the real code makes heavy use of ModelingToolkit and DifferentialEquations
function rel_sigma(;λ_nom)
    sleep(1)
    rand()
end

λ_noms = sort(unique(vcat(7:0.1:9.5, 7.8:0.02:8.2, 7.98:0.005:8.02)))
rel_sigmas = zeros(length(λ_noms))
Threads.@threads for i in eachindex(λ_noms, rel_sigmas)
    λ_nom = λ_noms[i]
    println("lambda_nom: $λ_nom")
    rel_sigma_ = 100 * rel_sigma(λ_nom=λ_nom)
    rel_sigmas[i] = rel_sigma_
end

The weak point of multi-threading in my use case is that I only get a speed-up of a factor of two, even though I have 16 cores, mainly because of garbage collection.
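
To confirm that GC really is the bottleneck, a quick check is the "% gc time" that @time prints for the whole loop; a minimal sketch using the loop above:

@time Threads.@threads for i in eachindex(λ_noms, rel_sigmas)
    rel_sigmas[i] = 100 * rel_sigma(λ_nom = λ_noms[i])
end
# the "N% gc time" in the printed summary shows how much of the wall time went to GC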

Now I want to try to use multiple processes.

This works:

using Distributed

@everywhere rel_sigma(λ_nom=8)
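
(This assumes that worker processes have already been added and that rel_sigma is defined on all of them; a minimal setup sketch, with the worker count just an example:)

using Distributed
addprocs(15)  # example: one worker per remaining core; adjust to your machine

@everywhere function rel_sigma(;λ_nom)
    sleep(1)
    rand()
end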

But how can I achieve the following:

  • each worker processes a different input value
  • the results are returned to the main process

Any hints welcome!

I have a first version that works with two processes:

function calc()
    λ_noms = sort(unique(vcat(7:0.1:9.5)))
    rel_sigmas = zeros(length(λ_noms))

    # note: process 1 is the main process; with addprocs the workers are 2, 3, ...
    a = @spawnat 1 rel_sigma(λ_nom = λ_noms[1])
    b = @spawnat 2 rel_sigma(λ_nom = λ_noms[2])
    rel_sigmas[1] = fetch(a)  # fetch blocks until the result is available
    rel_sigmas[2] = fetch(b)
    rel_sigmas[1:2]
end

calc()
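
As a side note, if it does not matter which process runs a task, @spawnat :any lets the scheduler pick one; a small sketch:

c = @spawnat :any rel_sigma(λ_nom = λ_noms[1])  # run on any available process
fetch(c)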

Can’t you just use pmap?

I will check pmap, thanks for the hint!
This is what I have now:

function calc()
    λ_noms = sort(unique(vcat(7:0.1:9.5, 7.3:0.02:7.7, 7.4:0.005:7.6, 7.8:0.02:8.2, 7.98:0.005:8.02)))
    rel_sigmas = zeros(length(λ_noms))
    m = length(λ_noms)
    n = length(workers())
    # process the inputs in chunks of n, one element per worker;
    # cld avoids an empty, out-of-bounds chunk when n divides m exactly
    for j in 0:cld(m, n)-1
        procs = Future[]
        for (i, worker) in pairs(workers())
            λ_nom = λ_noms[i+n*j]
            proc = @spawnat worker rel_sigma(λ_nom = λ_nom)
            push!(procs, proc)
            if i+n*j == m
                break
            end
        end
        for (i, proc) in pairs(procs)
            rel_sigmas[i+n*j] = fetch(proc)
            if i+n*j == m
                break
            end
        end
    end
    λ_noms, rel_sigmas
end

You could replace @spawnat worker ... here with Dagger.@spawn ... if you wanted to use Dagger.jl for multithreading and distributed computing simultaneously. You'd also need to change procs = Future[] to procs = Dagger.EagerThunk[] (or simply Any[]) to match Dagger.@spawn's return type.
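
A minimal, untested sketch of that change, assuming rel_sigma is defined @everywhere (Dagger then does the scheduling itself, so all tasks can be spawned at once):

using Dagger

procs = Dagger.EagerThunk[]  # or Any[]
for λ_nom in λ_noms
    push!(procs, Dagger.@spawn rel_sigma(λ_nom = λ_nom))
end
rel_sigmas = fetch.(procs)   # Dagger schedules across threads and workers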

Indeed, pmap does the job fine:

# pmap passes positional arguments, so wrap the keyword-only interface:
@everywhere function rel_sigma1(λ_nom)
    rel_sigma(λ_nom = λ_nom)
end

function calc_rel_sigmas(λ_noms)
    pmap(rel_sigma1, λ_noms)
end

λ_noms = sort(unique(vcat(7:0.1:9.5)))
rel_sigmas = calc_rel_sigmas(λ_noms)
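
An equivalent one-liner without the wrapper (rel_sigma itself must still be defined @everywhere, since the closure is sent to the workers):

rel_sigmas = pmap(λ -> rel_sigma(λ_nom = λ), λ_noms)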

Findings so far:

  • using threads I achieve a speed-up by a factor of two
  • using pmap I achieve a speed-up by a factor of 5.5

This is on a machine with 16 cores, where in theory a speed-up of about 13.1 should be possible: 13 rather than 16 because the CPU frequency drops from 5.5 GHz to 4.5 GHz when all cores are active (16 × 4.5/5.5 ≈ 13.1).

This is already quite nice. I think the main reason I cannot achieve a higher speed-up is the limited CPU cache size.

Another finding: I had to add the following lines to the function rel_sigma

    # do a garbage collection if less than 6GB free RAM
    if Sys.free_memory()/2^30 < 6.0
        GC.gc()
    end

to avoid an out-of-memory error. There seems to be a bug in the garbage collector: it does not work correctly when you run many processes in parallel.
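
On Julia 1.9 or newer, an alternative to the manual GC.gc() call might be to start each worker with a heap size hint, so that its GC triggers before the machine runs out of memory; the values here are just an example:

using Distributed
# hypothetical values; tune the hint to your RAM and worker count
addprocs(15; exeflags = "--heap-size-hint=3G")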

I noticed large memory use in parallel workloads and have been trying to work out what was causing it.

I suspected the garbage collector, so it's good to know someone else has come to the same (or a similar) conclusion!