I am looking for how to “load balance” parallel reduction on multiple CPUs that have different speeds. The following snippet illustrates the problem:

```
using Distributed
addprocs(3)

res = @time @distributed (+) for (t, n) in collect(zip(1:9, vcat([10], ones(8))))
    println("Task $(t) sleeping: $(n) s")
    sleep(n)
    n
end
```

which yields

```
From worker 2: Task 1 sleeping: 10.0 s
From worker 3: Task 4 sleeping: 1.0 s
From worker 4: Task 7 sleeping: 1.0 s
From worker 3: Task 5 sleeping: 1.0 s
From worker 4: Task 8 sleeping: 1.0 s
From worker 3: Task 6 sleeping: 1.0 s
From worker 4: Task 9 sleeping: 1.0 s
From worker 2: Task 2 sleeping: 1.0 s
From worker 2: Task 3 sleeping: 1.0 s
12.243273 seconds (510.34 k allocations: 25.608 MiB)
18.0
```

That is, worker 2 still performs tasks 2 and 3 while workers 3 and 4 sit idle. What can be done in this situation so that workers 3 and 4 perform tasks 2 and 3?

To quote from a comment in the source associated with `@distributed`:

> “Statically split range [1,N] into equal sized chunks for np processors”

So here worker 2 gets tasks 1–3, worker 3 gets tasks 4–6, and worker 4 gets tasks 7–9. You could deal with this yourself using `@spawn`, but it would be more work for you than using `@distributed`.
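You can see that static chunking directly: `@distributed` partitions the index range with the internal helper `Distributed.splitrange`. A minimal sketch (note this is an undocumented internal, so the three-argument `(firstindex, lastindex, np)` form assumed here may differ between Julia versions):

```julia
using Distributed

# Internal helper used by @distributed to split work statically.
# One contiguous chunk per worker, regardless of per-task cost.
chunks = Distributed.splitrange(1, 9, 3)
println(chunks)
```

With 9 tasks and 3 workers this yields one contiguous range of three tasks per worker, which is exactly the assignment seen in the output above.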

`pmap` does this load balancing for you, but it works a bit differently: jobs are submitted one by one to free workers instead of in batches as in the distributed for loop.
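For illustration, a minimal sketch of the same workload using `pmap` (the per-task results come back to the master, and the reduction then runs there over the returned vector; `g` is just an illustrative name):

```julia
using Distributed
addprocs(3)

@everywhere function g(n)
    sleep(n)
    n
end

input = vcat([10], ones(8))
# pmap hands out jobs one at a time to whichever worker is free,
# so the eight short tasks are not stuck behind the 10 s one.
res = @time sum(pmap(g, input))
```

This finishes in roughly 10 s instead of 12 s, since the worker holding the 10 s task is never assigned additional work.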

But can one do the reduction on each worker with `pmap`? With `pmap`, isn't the result of each task sent back to the master?

This is true. One solution is to not return the value from the applied function, but to push it to a `RemoteChannel` on each worker, and asynchronously run a reduction operation. I'm not sure if there is a standard function for this.

Maybe this is helpful for you (disclaimer: I have not used it so far):

Edit: just realized that your question is about multiprocessing, not multithreading. Therefore this is probably not helpful for you.

OK, so this is an implementation of `mapreduce` using a `RemoteChannel`.

```
@everywhere function do_work(f, op, jobs, res)
    # Note: `catch InvalidStateException` binds *whatever* exception is thrown
    # to that name; it does not filter by type. The only exception expected
    # here is the InvalidStateException thrown by take! on a closed, empty
    # channel, which signals that no work is left.
    args = try
        take!(jobs)
    catch InvalidStateException
        # No work to do
        return nothing
    end
    v = f(args...)
    while true
        args = try
            take!(jobs)
        catch InvalidStateException
            # We are done
            break
        end
        # Reduce locally on this worker as results are produced
        v = op(v, f(args...))
    end
    # Send only the per-worker partial result back
    put!(res, v)
end

function pmapreduce(f, op, itr)
    jobs = RemoteChannel(() -> Channel(length(itr)))
    all_w = workers()[1:min(nworkers(), length(itr))]
    res = RemoteChannel(() -> Channel(length(all_w)))
    # Start one reducing task per worker
    for pid in all_w
        remotecall(do_work, pid, f, op, jobs, res)
    end
    # Make jobs
    for job_arg in itr
        put!(jobs, job_arg)
    end
    close(jobs)
    # Collect the partial results and finish the reduction on the master
    v = take!(res)
    for _ in 2:length(all_w)
        v = op(v, take!(res))
    end
    close(res)
    return v
end

@everywhere function f(n)
    println("Task sleeping: $(n) s")
    sleep(n)
    n
end

input = vcat([10], ones(8))
res = @time pmapreduce(f, +, input)
println(res)

res = @time @distributed (+) for (t, n) in collect(zip(1:9, input))
    println("Task $(t) sleeping: $(n) s")
    sleep(n)
    n
end
```

yielding

```
julia> include("pmapreduce.jl")
From worker 3: Task sleeping: 1.0 s
From worker 4: Task sleeping: 10.0 s
From worker 2: Task sleeping: 1.0 s
From worker 3: Task sleeping: 1.0 s
From worker 2: Task sleeping: 1.0 s
From worker 3: Task sleeping: 1.0 s
From worker 2: Task sleeping: 1.0 s
From worker 3: Task sleeping: 1.0 s
From worker 2: Task sleeping: 1.0 s
10.297645 seconds (232.16 k allocations: 11.238 MiB, 0.15% gc time)
18.0
From worker 3: Task 4 sleeping: 1.0 s
From worker 4: Task 7 sleeping: 1.0 s
From worker 2: Task 1 sleeping: 10.0 s
From worker 3: Task 5 sleeping: 1.0 s
From worker 4: Task 8 sleeping: 1.0 s
From worker 3: Task 6 sleeping: 1.0 s
From worker 4: Task 9 sleeping: 1.0 s
From worker 2: Task 2 sleeping: 1.0 s
From worker 2: Task 3 sleeping: 1.0 s
12.090770 seconds (65.97 k allocations: 3.238 MiB)
18.0
```
