Parallel reductions on CPUs with different speeds

I am looking for a way to “load balance” a parallel reduction across multiple CPUs that run at different speeds. The following snippet illustrates the problem:

using Distributed
addprocs(3)

res = @time @distributed (+) for (t, n) in collect(zip(1:9, vcat([10], ones(8))))
    println("Task $(t) sleeping: $(n) s")
    sleep(n)
    n
end

which yields

      From worker 2:	Task 1 sleeping: 10.0 s
      From worker 3:	Task 4 sleeping: 1.0 s
      From worker 4:	Task 7 sleeping: 1.0 s
      From worker 3:	Task 5 sleeping: 1.0 s
      From worker 4:	Task 8 sleeping: 1.0 s
      From worker 3:	Task 6 sleeping: 1.0 s
      From worker 4:	Task 9 sleeping: 1.0 s
      From worker 2:	Task 2 sleeping: 1.0 s
      From worker 2:	Task 3 sleeping: 1.0 s
 12.243273 seconds (510.34 k allocations: 25.608 MiB)
18.0

That is, worker 2 still performs tasks 2 and 3, while workers 3 and 4 are doing nothing. What can be done in this situation so that workers 3 and 4 perform tasks 2 and 3?

To quote from a comment in the source associated with @distributed: “Statically split range [1,N] into equal sized chunks for np processors”. So here worker 2 gets tasks 1-3, worker 3 gets 4-6, and worker 4 gets 7-9. You could deal with this yourself using @spawn, but that would be more work for you than using @distributed.
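To make the static split concrete, here is a rough sketch (not the code @distributed uses internally, just an illustration of the resulting assignment, assuming the three workers added above):

for (w, chunk) in zip(workers(), Iterators.partition(1:9, cld(9, nworkers())))
    println("worker $w gets tasks $(collect(chunk))")
end

With three workers this should print the 1-3 / 4-6 / 7-9 assignment described above.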

pmap does this load balancing for you, but it works a bit differently – jobs are submitted one by one to free workers instead of in batches as in the distributed for loop.
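For example, a minimal sketch with the same input as above, doing the reduction on the master over what pmap returns (the anonymous function just mimics the sleeping task):

input = vcat([10], ones(8))
res = @time reduce(+, pmap(n -> (sleep(n); n), input))

Here the eight 1 s jobs get spread over the free workers while one worker handles the 10 s job, so the wall time should be close to 10 s rather than 12 s.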

But can one do the reduction on each worker with pmap? With pmap, isn’t the result of each task sent back to the master?

That is true. One solution is not to return the value from the applied function, but to push it to a RemoteChannel on each worker and run the reduction asynchronously. I’m not sure whether there is a standard function for this.

Maybe this is helpful for you (disclaimer: I have not used it so far):

Edit: I just realized that your question is about multiprocessing, not multithreading, so this is probably not helpful for you.

OK, so here is an implementation of a mapreduce using RemoteChannel.

@everywhere function do_work(f, op, jobs, res)
    # Take the first job. take! throws once the jobs channel has been
    # closed and emptied, in which case this worker has no work to do.
    args = try
        take!(jobs)
    catch
        # No work to do
        return nothing
    end

    v = f(args...)

    # Keep pulling jobs and fold each result into this worker's partial
    # reduction until the jobs channel is closed and drained.
    while true
        args = try
            take!(jobs)
        catch
            # We are done
            break
        end
        v = op(v, f(args...))
    end

    # Send this worker's partial result back to the master.
    put!(res, v)
end
function pmapreduce(f, op, itr; nbuf::Int = 100)
    # Job queue (buffered with up to nbuf pending jobs) plus one result
    # slot per participating worker.
    jobs = RemoteChannel(() -> Channel(nbuf))
    all_w = workers()[1:min(nworkers(), length(itr))]
    res = RemoteChannel(() -> Channel(length(all_w)))

    # Start one reducing task per worker; each keeps pulling jobs until
    # the jobs channel is closed.
    for pid in all_w
        remotecall(do_work, pid, f, op, jobs, res)
    end

    # Make jobs
    for job_arg in itr
        put!(jobs, job_arg)
    end
    close(jobs)

    # Collect one partial result per worker and finish the reduction on
    # the master. (This assumes every worker receives at least one job;
    # a worker that gets none returns without putting anything into res.)
    v = take!(res)
    for _ in 2:length(all_w)
        v = op(v, take!(res))
    end
    close(res)
    return v
end
@everywhere function f(n)
    println("Task sleeping: $(n) s")
    sleep(n)
    n
end

input = vcat([10], ones(8))
res = @time pmapreduce(f, +,  input)
println(res)

res = @time @distributed (+) for (t, n) in collect(zip(1:9, input))
    println("Task $(t) sleeping: $(n) s")
    sleep(n)
    n
end

yielding

julia> include("pmapreduce.jl")
      From worker 3:	Task sleeping: 1.0 s
      From worker 4:	Task sleeping: 10.0 s
      From worker 2:	Task sleeping: 1.0 s
      From worker 3:	Task sleeping: 1.0 s
      From worker 2:	Task sleeping: 1.0 s
      From worker 3:	Task sleeping: 1.0 s
      From worker 2:	Task sleeping: 1.0 s
      From worker 3:	Task sleeping: 1.0 s
      From worker 2:	Task sleeping: 1.0 s
 10.297645 seconds (232.16 k allocations: 11.238 MiB, 0.15% gc time)
18.0
      From worker 3:	Task 4 sleeping: 1.0 s
      From worker 4:	Task 7 sleeping: 1.0 s
      From worker 2:	Task 1 sleeping: 10.0 s
      From worker 3:	Task 5 sleeping: 1.0 s
      From worker 4:	Task 8 sleeping: 1.0 s
      From worker 3:	Task 6 sleeping: 1.0 s
      From worker 4:	Task 9 sleeping: 1.0 s
      From worker 2:	Task 2 sleeping: 1.0 s
      From worker 2:	Task 3 sleeping: 1.0 s
 12.090770 seconds (65.97 k allocations: 3.238 MiB)
18.0