How should I implement parallel maximum likelihood?

In my maximum likelihood problem, I have a fixed collection of data and I want to evaluate a complicated likelihood function on it for different parameter values until I find the best one. The simplest solution I’ve found is to chunk the data and use pmap over the chunks, and repeat for each parameter value. It’s IID data, so there is no need to communicate between the processes.
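Concretely, my current approach looks roughly like the sketch below, where loglik and the chunking are just placeholders for my real likelihood and data:

addprocs(4)

# stand-ins for my real data and likelihood
data = randn(1_000_000)
chunks = [data[i:i+249_999] for i in 1:250_000:length(data)]
@everywhere loglik(theta, chunk) = sum(-(x - theta)^2 for x in chunk)

# one parallel pass per candidate theta; the chunks get re-sent to the workers
# on every call even though the data never changes
thetas = 0:0.1:1
lls = [sum(pmap(chunk -> loglik(theta, chunk), chunks)) for theta in thetas]
best_theta = thetas[findmax(lls)[2]]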

However, the data isn’t changing. I would like to send the data chunks to the nodes once, and repeatedly send them new parameter values and receive likelihood contributions in return.

I saw this topic How to avoid repeated data movement between processes? - #3 by Michael_Eastwood suggesting the use of a CachingPool, but I don’t really understand what it does or if it’s relevant here.

Has anyone done this? I think this is a pretty standard problem, so I wouldn’t be surprised if there is already code out there which does this efficiently.

The general idea is to capture the data that you want to move only once inside a closure. CachingPool will move the closure (and the corresponding data) to each worker only once. I’ve constructed an artificial example below where the cost of moving the data is greater than the cost of doing the computation, so using pmap without a CachingPool loses to regular old map. The CachingPool successfully gets rid of this overhead.

Hope this helps!

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

julia> big_data = randn(10000, 10000);

julia> function run_serial(big_data, N)
           map(sum, Iterators.repeated(big_data, N))
       end
run_serial (generic function with 1 method)

julia> function run_parallel(big_data, N)
           pmap(sum, Iterators.repeated(big_data, N))
       end
run_parallel (generic function with 1 method)

julia> function run_cached(big_data, N)
           closure(x) = sum(big_data)
           pmap(CachingPool(workers()), closure, 1:N)
       end
run_cached (generic function with 1 method)

julia> @time run_serial(big_data, 100);
 10.128203 seconds (108 allocations: 4.219 KiB)

julia> @time run_parallel(big_data, 100);
 30.426528 seconds (8.80 k allocations: 308.719 KiB)

julia> @time run_cached(big_data, 100);
  3.634198 seconds (9.67 k allocations: 346.922 KiB)

I see, so the data is captured in the closure, and the CachingPool caches the closure on each worker. It is still unclear to me how it decides which data to send, though. Is it all of the data available to the function?

For my use case I need each process to work on a different chunk of the data. I found a way to do this below by closing over a vector of arrays and passing an index to each process. However, I’m pretty sure this still sends the entire dataset to each node, when only a chunk is necessary. That should be fine here because it is only sent once, but it could become a problem if the data were really huge.

The example below looks more like a maximum likelihood problem because I loop over multiple thetas, keeping the same data. The CachingPool seems to work very well. The parallel speedup is close to the maximum for 4 cores.

julia> addprocs(4)
4-element Array{Int64,1}:
 2
 3
 4
 5

julia> function run_serial(big_data)
           f(theta, n) = sum(x^theta for x in big_data[n])
           x = zeros(4)
           b = ones(4)
           for theta in 1:.01:2
               x = map(f, theta*b, 1:4)
           end
           sum(x)
       end
run_serial (generic function with 1 method)

julia> function run_parallel(big_data)
           f(theta, n) = sum(x^theta for x in big_data[n])
           x = zeros(4)
           b = ones(4)
           for theta in 1:.01:2
               x = pmap(f, theta*b, 1:4)
           end
           sum(x)
       end
run_parallel (generic function with 1 method)

julia> function run_cached(big_data)
           f(theta, n) = sum(x^theta for x in big_data[n])
           wp = CachingPool(workers())
           x = zeros(4)
           b = ones(4)
           for theta in 1:.01:2
               x = pmap(wp, f, theta*b, 1:4)
           end
           sum(x)
       end
run_cached (generic function with 1 method)

julia> big_data = [rand(100000), rand(100000), rand(100000), rand(100000)]
4-element Array{Array{Float64,1},1}:
 [0.634355, 0.528308, 0.201827, 0.769327, 0.962185, 0.188918, 0.096454, 0.00439114, 0.262142, 0.338069  …  0.568727, 0.114778, 0.913829, 0.428653, 0.288401, 0.465263, 0.73819, 0.62727, 0.934231, 0.586302] 
 [0.584522, 0.870068, 0.23399, 0.789817, 0.464353, 0.728496, 0.419617, 0.770256, 0.747527, 0.935073  …  0.0398883, 0.647269, 0.969051, 0.0330922, 0.639597, 0.430352, 0.687566, 0.100313, 0.779581, 0.104456]
 [0.923503, 0.437401, 0.127655, 0.30547, 0.298625, 0.266635, 0.856103, 0.0752927, 0.552258, 0.467141  …  0.0554492, 0.635918, 0.394429, 0.858632, 0.624893, 0.372602, 0.433335, 0.586451, 0.340953, 0.836536]
 [0.311625, 0.696548, 0.88832, 0.472612, 0.567086, 0.553389, 0.682172, 0.618924, 0.0917297, 0.279764  …  0.595471, 0.42165, 0.0448332, 0.475926, 0.622473, 0.0604174, 0.442415, 0.32164, 0.407013, 0.277669] 

julia> run_serial(big_data)
133398.25067928832

julia> run_parallel(big_data)
133398.25067928832

julia> run_cached(big_data)
133398.25067928832

julia> @time run_serial(big_data)
  3.005636 seconds (2.41 k allocations: 72.623 KiB)
133398.25067928832

julia> @time run_parallel(big_data)
  1.329272 seconds (100.20 k allocations: 9.717 MiB, 0.40% gc time)
133398.25067928832

julia> @time run_cached(big_data)
  0.861480 seconds (55.51 k allocations: 6.668 MiB)
133398.25067928832


Yes, so your example works something like this:

function run_cached(big_data)
    f(theta, n) = sum(x^theta for x in big_data[n]) # capture `big_data` in a closure
    wp = CachingPool(workers())
    x = zeros(4)
    b = ones(4)
    for theta in 1:.01:2
        x = pmap(wp, f, theta*b, 1:4) # compute `y = theta*b` locally
                                      # `f` (and *all* of its captured data) will be sent to each worker
                                      # for j = 1:length(y)
                                      #     send y[j] and (1:4)[j] to a worker
                                      #     evaluate f(y[j], (1:4)[j]) on the worker
                                      #     send the result back to master process
                                      #     store the result as x[j]
                                      # end
    end
    sum(x)
end

If you only want to send each worker the data that it will be working on, you might try something like

function run_cached(big_data)
    ...
    @sync for (n, worker) in enumerate(workers())
        @async begin
            ...
            my_big_data = big_data[n]
            f(theta) = sum(x^theta for x in my_big_data)
            theta_range = 1:.01:2
            x = pmap(CachingPool([worker]), f, theta_range)
            ...
        end
    end
    ...
end

I haven’t tested this, so there might be some typos and bugs. The basic idea is that each worker is going to be sent a different closure with only the part of the data that it needs.

@async begin ... end creates and schedules a task such that when the call to pmap blocks, Julia can switch to a different task. We use @sync to make sure that all of the @async blocks have finished before moving on. If we did not use @sync / @async, we would end up waiting for one worker to completely finish before we could ask another worker to start working on its chunk.
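For reference, an untested and slightly fleshed-out version of that sketch might look like this (run_chunked and the results container are just illustrative names):

function run_chunked(big_data)
    theta_range = 1:.01:2
    results = [Float64[] for _ in workers()]          # one likelihood sweep per worker
    @sync for (n, worker) in enumerate(workers())
        @async begin
            my_big_data = big_data[n]                 # the closure captures only this chunk
            f(theta) = sum(x^theta for x in my_big_data)
            # a one-worker CachingPool caches `f` (and its chunk) on that worker
            results[n] = pmap(CachingPool([worker]), f, theta_range)
        end
    end
    results
end

Each entry of results then holds one worker’s likelihood contributions over theta_range; summing across workers element-by-element gives the total for each theta.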


Another alternative is Dagger, though it is a bit under-documented.

You could do something like this (I started the REPL with julia -p 4):

julia> using Dagger, BenchmarkTools

julia> data = rand(1000000);

# distribute data in 4 chunks of 250000 elements each
julia> ddata = distribute(data, fill(250000, 4));

# define your function
julia> @everywhere f(data) = sum(x^1.2 for x in data)

# To parallelize, compute f (standing in for the log-likelihood) on each chunk separately, then sum the partial results
julia> collect(Dagger.treereduce(delayed(+), map(delayed(f), ddata.chunks)))
454749.8482156226

julia> f(data) #serial implementation
454749.8482156202

julia> @btime collect(Dagger.treereduce(delayed(+), map(delayed($f), $ddata.chunks)))
  18.536 ms (2669 allocations: 146.45 KiB)
454749.8482156226

julia> @btime $f($data)
  57.585 ms (3 allocations: 48 bytes)
454749.8482156202

The more costly the likelihood function is to compute, the better the scaling.
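(In the run above that is 57.585 ms / 18.536 ms ≈ 3.1× on 4 workers, so some per-task overhead is still visible.)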

EDIT: it does seem like CachingPool gets better scaling, though; I’m not sure where the overhead in Dagger comes from.

I’m not sure how I’d use this approach for iteration. I need to return the likelihood to the optimization routine, then pick the next theta sequentially. I don’t know the set of thetas I want to evaluate in advance.

I’m sorry I don’t quite have time to type out a full answer, but if you don’t know all the thetas in advance you can replace the pmap with something like

pool = CachingPool(workers())
while !done
    result = remotecall_fetch(f, pool, theta)
    ...
end

If you do this, the master process will be the one deciding which theta to do next. If you’d rather the worker processes don’t have to communicate with the master, then you should modify f to loop over theta until you’ve found the right values.
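For example (untested, and assuming f(theta, n) computes the log-likelihood contribution of chunk n, like the closure in your earlier example, with Optim loaded on the master), the objective you hand to the optimizer could look something like:

wp = CachingPool(workers())
# each evaluation fans the chunks out over the pool; the optimizer on the master
# then picks the next theta sequentially
negloglik(theta) = -sum(asyncmap(n -> remotecall_fetch(f, wp, theta, n), 1:nworkers()))
out = optimize(negloglik, 0.0, 10.0)   # univariate optimization over an illustrative bracket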

You might also find the rest of my posts in the thread you linked in the OP helpful.


Thanks. I need to think more about how to use remotecall_fetch. At least for this case, the data isn’t large enough for it to matter that each node copies the entire dataset.

I solved an actual maximum likelihood problem using a CachingPool below. It estimates the standard deviation of a normal distribution. The CachingPool is really critical; without it, the parallel version ends up slower than the serial one.

addprocs(4)
@everywhere using Distributions, Optim
nor = Normal(2, 5)
N = 4000000
big_data = [rand(nor, N), rand(nor, N), rand(nor, N), rand(nor, N)]
function run_cached(big_data, parallel = false)
    function loglik3(theta)
        x = map(loglik, Iterators.repeated(theta, 4), 1:4)
        -sum(x)
    end
    function loglik2(theta)
        x = pmap(wp, loglik, Iterators.repeated(theta, 4), 1:4)
        -sum(x)
    end
    function loglik(theta, n)
        nor_theta = Normal(2, theta)
        sum(logpdf(nor_theta, xx) for xx in big_data[n])
    end
    wp = CachingPool(workers())
    if parallel
        out = optimize(loglik2, 0., 12.)
    else
        out = optimize(loglik3, 0., 12.)
    end
end

julia> @time run_cached(big_data, true).minimizer
  2.232675 seconds (14.94 k allocations: 1.675 MiB)
4.999392209786717

julia> @time run_cached(big_data, false).minimizer
  7.751756 seconds (1.63 k allocations: 34.500 KiB)
4.999392209786717