I am looking for a way to “load balance” a parallel reduction across multiple CPUs of different speeds. The following snippet illustrates the problem:
using Distributed
addprocs(3)
res = @time @distributed (+) for (t, n) in collect(zip(1:9, vcat([10], ones(8))))
    println("Task $(t) sleeping: $(n) s")
    sleep(n)
    n
end
which yields
From worker 2: Task 1 sleeping: 10.0 s
From worker 3: Task 4 sleeping: 1.0 s
From worker 4: Task 7 sleeping: 1.0 s
From worker 3: Task 5 sleeping: 1.0 s
From worker 4: Task 8 sleeping: 1.0 s
From worker 3: Task 6 sleeping: 1.0 s
From worker 4: Task 9 sleeping: 1.0 s
From worker 2: Task 2 sleeping: 1.0 s
From worker 2: Task 3 sleeping: 1.0 s
12.243273 seconds (510.34 k allocations: 25.608 MiB)
18.0
That is, worker 2 still performs tasks 2 and 3, while workers 3 and 4 are doing nothing. What can be done in this situation so that workers 3 and 4 perform tasks 2 and 3?
To quote a comment from the source of @distributed: “Statically split range [1,N] into equal sized chunks for np processors”. So here worker 2 gets tasks 1–3, worker 3 gets tasks 4–6, and worker 4 gets tasks 7–9. You could deal with the scheduling yourself using @spawn (a sketch follows below), but it would be more work for you than using @distributed.
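For reference, here is a minimal hypothetical sketch of that manual bookkeeping, using the newer @spawnat with the :any placement. Note that :any picks workers round-robin, so this alone does not rebalance the load either; you would still have to write your own scheduler on top of it:

using Distributed

# One future per task; :any assigns workers round-robin, so the
# assignment is still static. The reduction runs on the master.
futures = map(vcat([10], ones(8))) do n
    @spawnat :any begin
        sleep(n)
        n
    end
end
res = sum(fetch, futures)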
pmap does this load balancing for you, but it works a bit differently: jobs are submitted one at a time to free workers, instead of in pre-assigned batches as in the distributed for loop.
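For the example in the question, a pmap version could look like the following sketch; each task's result is sent back, and the reduction happens on the master after the parallel map:

using Distributed

# pmap hands out one job at a time to whichever worker is idle, so the
# short tasks migrate away from the worker stuck on the 10 s task.
res = sum(pmap(vcat([10], ones(8))) do n
    sleep(n)
    n
end)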
But can one do the reduction on each worker with pmap? With pmap, isn't the result of each task sent back to the master?
This is true. One solution is not to return the value from the function being mapped, but to push it to a RemoteChannel on each worker and run the reduction asynchronously. I'm not sure whether there is a standard function for this.
Maybe this is helpful for you (disclaimer: I have not used it myself):
Edit: I just realized that your question is about multiprocessing, not multithreading, so this is probably not helpful for you.
OK, so here is an implementation of mapreduce using a RemoteChannel. For this workload the best possible time is about 10 s (one worker takes the 10 s task while the eight 1 s tasks are split between the other two), versus 12 s for the static split.
@everywhere function do_work(f, op, jobs, res)
    # take! throws InvalidStateException once `jobs` is closed and drained.
    args = try
        take!(jobs)
    catch e
        e isa InvalidStateException || rethrow()
        put!(res, nothing)  # never got a job; signal it so the master does not block
        return
    end
    v = f(args...)
    while true
        args = try
            take!(jobs)
        catch e
            e isa InvalidStateException || rethrow()
            break  # queue closed and drained: we are done
        end
        v = op(v, f(args...))
    end
    put!(res, Some(v))  # wrap the partial result so it is distinguishable from nothing
end

function pmapreduce(f, op, itr)
    jobs = RemoteChannel(() -> Channel(length(itr)))
    all_w = workers()[1:min(nworkers(), length(itr))]
    res = RemoteChannel(() -> Channel(length(all_w)))
    for pid in all_w
        remotecall(do_work, pid, f, op, jobs, res)
    end
    # Fill the job queue, then close it so the workers know when to stop.
    for job_arg in itr
        put!(jobs, job_arg)
    end
    close(jobs)
    # Collect one partial result per worker and reduce them on the master,
    # skipping workers that never received a job.
    partials = [take!(res) for _ in all_w]
    close(res)
    return mapreduce(something, op, filter(!isnothing, partials))
end
@everywhere function f(n)
    println("Task sleeping: $(n) s")
    sleep(n)
    n
end
input = vcat([10], ones(8))
res = @time pmapreduce(f, +, input)
println(res)
res = @time @distributed (+) for (t, n) in collect(zip(1:9, input))
    println("Task $(t) sleeping: $(n) s")
    sleep(n)
    n
end
yielding
julia> include("pmapreduce.jl")
From worker 3: Task sleeping: 1.0 s
From worker 4: Task sleeping: 10.0 s
From worker 2: Task sleeping: 1.0 s
From worker 3: Task sleeping: 1.0 s
From worker 2: Task sleeping: 1.0 s
From worker 3: Task sleeping: 1.0 s
From worker 2: Task sleeping: 1.0 s
From worker 3: Task sleeping: 1.0 s
From worker 2: Task sleeping: 1.0 s
10.297645 seconds (232.16 k allocations: 11.238 MiB, 0.15% gc time)
18.0
From worker 3: Task 4 sleeping: 1.0 s
From worker 4: Task 7 sleeping: 1.0 s
From worker 2: Task 1 sleeping: 10.0 s
From worker 3: Task 5 sleeping: 1.0 s
From worker 4: Task 8 sleeping: 1.0 s
From worker 3: Task 6 sleeping: 1.0 s
From worker 4: Task 9 sleeping: 1.0 s
From worker 2: Task 2 sleeping: 1.0 s
From worker 2: Task 3 sleeping: 1.0 s
12.090770 seconds (65.97 k allocations: 3.238 MiB)
18.0