# How to sum the chunks of a distributed array using a binary reduction tree?

Hi,

I use the `reduce()` function to sum all the parts of a distributed array.

```julia
function reduce(A::DArray{Float64,2})::Int
    # Sum reduction: sums all partitions of A in place.
    # The result ends up on aggregator_worker.
    # `Base.reduce` is qualified because this function is itself named `reduce`.
    aggregator_worker = Base.reduce((p1, p2) -> begin
        # Block until p1 has pulled the chunk of p2 and added it to its own
        fetch(@spawnat p1 begin
            A_on_p2::Array{Float64,2} = @fetchfrom p2 A[:l]
            A[:l] = A[:l] + A_on_p2
            return nothing
        end)
        return p1
    end, workers())
    return aggregator_worker
end
```

My understanding is that the running time of this operation should grow logarithmically with the number of processes. However, in this code the dependency is linear. I see two pain points here:

• The `fetch` call blocks, so the pairwise sums cannot run asynchronously (I need it to make sure the previous reduction on that process is done). I can think of a way around this: store the `Future` object associated with the process `p1` in a dictionary and only wait on it when that process is needed again.

• As far as I understand, `reduce` combines the elements sequentially, i.e. with a flat (linear) tree (see `mapreduce_impl`). Is it possible to use a binary tree instead?
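To illustrate the first point, here is a minimal local sketch of a single tree level in which all pairwise sums are launched concurrently and awaited together with `@sync`/`@async`, rather than calling `fetch` after each one. It runs on one process with plain arrays and tasks; `combine!` and the `sleep` that stands in for a remote transfer are hypothetical, and in the distributed case the task body would be a blocking remote call instead.

```julia
# Hypothetical stand-in for "pull the chunk of `src` and add it to `dst`".
function combine!(dst::Matrix{Float64}, src::Matrix{Float64})
    sleep(0.1)      # mimics the latency of a remote transfer
    dst .+= src     # in-place element-wise sum
    return nothing
end

# Eight local chunks; chunk k is filled with the value k.
chunks = [fill(Float64(k), 2, 2) for k in 1:8]

# First level of a binary tree: pairs (1,2), (3,4), (5,6), (7,8).
# The pairs touch disjoint chunks, so they can all be in flight at once.
@sync for i in 1:2:7
    @async combine!(chunks[i], chunks[i + 1])
end

# Once the level has completed, the odd chunks hold the partial sums.
partial_sums = [chunks[i][1, 1] for i in 1:2:7]   # [3.0, 7.0, 11.0, 15.0]
```

Waiting once per level is simpler than tracking one `Future` per process, at the cost of a barrier between levels.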

In the end my questions are:

• Is there already a way to sum arrays that live on different nodes with a cost that grows logarithmically with the number of nodes?
• If not, is there a way to do a local reduction that uses a binary tree?
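To make the difference between the two tree shapes concrete, here is a small local sketch that tracks the length of the longest dependency chain. The helper `pairwise_reduce` and the operator `tracked_add` are hypothetical names introduced for this illustration, and `foldl` is used as the explicit flat (left-to-right) order.

```julia
# Each value carries the depth of the dependency chain that produced it.
tracked_add(a, b) = (sum = a.sum + b.sum, depth = max(a.depth, b.depth) + 1)

# Hypothetical helper: recursive pairwise (binary tree) reduction.
# Splits the index range in half until a single element is left.
function pairwise_reduce(op, xs, lo = firstindex(xs), hi = lastindex(xs))
    lo == hi && return xs[lo]
    mid = (lo + hi) >>> 1
    left = pairwise_reduce(op, xs, lo, mid)
    right = pairwise_reduce(op, xs, mid + 1, hi)
    return op(left, right)
end

leaves = [(sum = i, depth = 0) for i in 1:8]

flat = foldl(tracked_add, leaves)             # ((((1+2)+3)+4)+...)+8
tree = pairwise_reduce(tracked_add, leaves)   # ((1+2)+(3+4))+((5+6)+(7+8))

println(flat)   # (sum = 36, depth = 7)
println(tree)   # (sum = 36, depth = 3)
```

Both orders perform seven additions, but in the tree only three of them depend on one another, which is where the logarithmic behaviour would come from if independent additions run concurrently.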

This is the code I came up with:

```julia
using Distributed
@everywhere using DistributedArrays

include("power_of_two.jl")            # introduces the is_pot() function
@everywhere include("src/darray.jl")  # introduces the check_uniform_chunk_size() function

"""
Reduces the array `objs` to a single value using the binary operator `op`.
This uses a binary reduction tree.
"""
function binarytree_reduce(op, objs::Array{Int,1})
    nobjs = length(objs)

    # For now, objs must contain a power-of-two number of objects
    @assert is_pot(nobjs)

    res = copy(objs)

    # The level in the tree
    level = 0
    while 2^level < nobjs
        for i in 1:2^(level+1):nobjs
            res[i] = op(res[i], res[i+2^level])
        end
        level += 1
    end
    # The root of the tree ends up in the first slot
    return res[1]
end

"""
Sums all the parts of a DistributedArray. They must all be of the same size.

The reduction is performed in place, so the parameter `A` will be altered.
The result is stored in the chunk of the worker whose id is returned.
"""
function reduce_sum(A::DArray{Float64,2})::Int
    nps = prod(size(procs(A)))
    ps = reshape(procs(A), nps)

    # All chunks must have the same size
    check_uniform_chunk_size(A)

    # This dictionary stores the pending `Future` of each process
    res = Dict{Int,Future}()

    # Define the reduction operation
    agg = binarytree_reduce((p1, p2) -> begin
        # Wait for the results of the previous reductions on these processes
        if p1 in keys(res)
            fetch(pop!(res, p1))
        end
        if p2 in keys(res)
            fetch(pop!(res, p2))
        end

        # Sum two chunks; the Future is kept so that later levels can wait on it
        res[p1] = @spawnat p1 begin
            A[:l] += @fetchfrom p2 A[:l]
            return nothing
        end
        return p1
    end, ps)

    # Wait for the last reduction (nothing is pending if there is a single chunk)
    haskey(res, agg) && fetch(res[agg])

    # `agg` contains the proc id where the result of the reduction stands
    return agg
end
```
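The power-of-two restriction in `binarytree_reduce` could probably be lifted by bounding the inner loop so that the right-hand index never runs past the end. The following is a sketch under that assumption; `binarytree_reduce_any` is a hypothetical name, and it expects a regular 1-based `Vector`.

```julia
# Hypothetical variant of `binarytree_reduce` that accepts any non-zero length.
function binarytree_reduce_any(op, objs::Vector)
    isempty(objs) && throw(ArgumentError("cannot reduce an empty collection"))
    res = copy(objs)
    n = length(res)

    # `stride` is the distance between the two operands at the current level.
    stride = 1
    while stride < n
        # Stop at n - stride so that res[i + stride] always exists;
        # an unpaired trailing element is simply carried to a later level.
        for i in 1:2*stride:n - stride
            res[i] = op(res[i], res[i + stride])
        end
        stride *= 2
    end
    return res[1]
end

total = binarytree_reduce_any(+, collect(1:5))                  # 15
joined = binarytree_reduce_any(*, ["a", "b", "c", "d", "e"])    # "abcde"
```

The string example shows that the left-to-right order of the operands is preserved, so the operator only needs to be associative, not commutative.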

Any feedback is highly appreciated.