Hi,
I use the `reduce()` function to sum all the parts of a distributed array.
"""
    reduce(A::DArray{Float64,2}) -> Int

Sum all the chunks (local parts) of the distributed array `A` in place.

Chunks are folded pairwise with `Base.reduce` over the ids of the workers that
hold them. Each step runs on worker `p1`: it fetches the local part of `p2`, adds
it to `p1`'s local part, and the caller blocks until that step has finished, so
the steps execute one after another.

Returns the id of the worker whose local part holds the total.
"""
function reduce(A::DArray{Float64,2})::Int
    # Only the workers in `procs(A)` hold chunks of `A`; `workers()` may contain
    # workers that hold none, or list them in a different order.
    ps = vec(procs(A))
    # Qualified on purpose: an unqualified `reduce` here can resolve to this
    # one-method function, which has no `(Function, Vector{Int})` method.
    aggregator_worker = Base.reduce((p1, p2) -> begin
            fetch(@spawnat p1 begin
                A_on_p2::Array{Float64,2} = @fetchfrom p2 A[:l]
                A[:l] = A[:l] + A_on_p2
                return nothing
            end)
            return p1
        end, ps)
    return aggregator_worker
end
My understanding is that the cost of this operation should grow logarithmically with the number of processes. However, with this code it grows linearly. I see two pain points here:
- The `fetch` operation blocks, so the calls cannot be asynchronous (I need it to make sure the previous reduction step has finished). One way I can think of to overcome this is to store the `Future` object associated with process `p1` in a dictionary.
- As far as I understand, the `reduce` operation uses a flat (linear) tree (see `mapreduce_impl`). Is it possible to use a binary tree instead?
In the end my questions are:
- Is there already a way to sum arrays that live on different nodes, whose cost grows logarithmically with the number of nodes?
- If not, is there a way to do a local reduction using a binary tree?
Thank you for your insight.
This is the code I came up with:
using Distributed
@everywhere using DistributedArrays
include("power_of_two.jl") # Introduces the is_pot() function
@everywhere include("src/darray.jl") # introduces the check_uniform_chunk_size() function
"""
    binarytree_reduce(op, objs::AbstractVector)

Reduce `objs` to a single value with the binary operator `op`, combining
elements pairwise along a binary tree.

At each level, the element at position `i` is combined with the element
`stride` positions after it, and then `stride` doubles. Any non-empty length is
accepted: when an element has no partner at some level, it is carried up to the
next level unchanged. `op` is always called as `op(left, right)` with `left`
preceding `right` in `objs`, so `op` must be associative but need not be
commutative.

Throws an `ArgumentError` if `objs` is empty.
"""
function binarytree_reduce(op, objs::AbstractVector)
    isempty(objs) && throw(ArgumentError("cannot reduce an empty collection"))
    # Work on a mutable copy so the caller's collection (or a range) is untouched.
    res = collect(objs)
    lo, hi = firstindex(res), lastindex(res)
    # Distance between the two partners at the current tree level (2^level).
    stride = 1
    while stride < length(res)
        for i in lo:(2 * stride):hi
            j = i + stride
            # Odd one out at this level: it is combined at a later level.
            j <= hi || continue
            res[i] = op(res[i], res[j])
        end
        stride *= 2
    end
    return res[lo]
end
"""
    reduce_sum(A::DArray{Float64,2}) -> Int

Sum all the chunks (local parts) of the distributed array `A` using a binary
reduction tree. All chunks must have the same size.

The reduction is performed in place: local parts of `A` are overwritten with
partial sums. The complete sum ends up in the local part held by the worker
whose id is returned.

Each pairwise addition is spawned asynchronously on the receiving worker.
Before a worker's chunk takes part in a new addition, only the additions that
previously wrote to the two chunks involved are awaited. As a result, the
additions at one tree level are in flight concurrently.
"""
function reduce_sum(A::DArray{Float64,2})::Int
    # Ids of the workers holding the chunks, flattened from the chunk grid.
    ps = vec(procs(A))
    # All chunks must have the same size so that they can be added together.
    check_uniform_chunk_size(A)
    # Outstanding in-place additions, keyed by the worker whose chunk receives the sum.
    pending = Dict{Int,Future}()

    # Add the chunk held by `p2` into the chunk held by `p1` (on `p1`); return `p1`.
    function sum_pair!(p1, p2)
        # Both chunks must hold their latest partial sums before being combined.
        for p in (p1, p2)
            if haskey(pending, p)
                fetch(pop!(pending, p))
            end
        end
        pending[p1] = @spawnat p1 begin
            A[:l] += @fetchfrom p2 A[:l]
            return nothing
        end
        return p1
    end

    agg = binarytree_reduce(sum_pair!, ps)
    # Wait for the final addition. With a single chunk no addition is ever
    # spawned, so there may be nothing to wait for.
    if haskey(pending, agg)
        fetch(pending[agg])
    end
    # `agg` is the id of the worker whose local part holds the result.
    return agg
end
Any feedback is highly appreciated.