How to sum the chunks of a distributed array using a binary reduction tree?

Hi,

I use the reduce() function to sum all the parts of a distributed array.

"""
    reduce(A::DArray{Float64,2})::Int

Sum the local chunks of `A` in place by folding over `workers()` and return the id of
the worker whose chunk holds the accumulated sum.

For every pair `(p1, p2)` produced by the fold, `p1` fetches the local chunk of `p2`
and adds it to its own local chunk. Each step blocks until the remote addition has
finished, so the steps run strictly one after another.

NOTE(review): the fold iterates over `workers()`, not `procs(A)`; this presumably
assumes that every worker holds a chunk of `A` -- confirm against the callers.
"""
function reduce(A::DArray{Float64,2})::Int
  # This function is itself named `reduce`, so an unqualified call would resolve to
  # this very definition (which has no method taking a function and a vector).
  # Qualify the generic fold explicitly.
  aggregator_worker = Base.reduce(workers()) do p1, p2
    # Block until p1 has finished accumulating p2's chunk, so that the next step of
    # the fold sees the updated data.
    fetch(@spawnat p1 begin
      A_on_p2::Array{Float64,2} = @fetchfrom p2 A[:l]
      A[:l] = A[:l] + A_on_p2
      return nothing
    end)
    # p1 now carries the partial sum and is the accumulator for the next step.
    return p1
  end
  return aggregator_worker
end

My understanding is that the running time of this operation should depend logarithmically on the number of processes. However, in this code the dependency is linear. I see two pain points here:

  • The fetch operation prevents asynchronous calls (I need it to make sure that the previous reduction has completed). I can think of a way to overcome this: storing the Future object associated with the process p1 in a dictionary.

  • As far as I understand, the reduce operation uses a flat tree, i.e. it combines the elements linearly, one after another (see mapreduce_impl). Is it possible to use a binary tree instead?

In the end my questions are:

  • Is there already a way to sum arrays that are on different nodes, which has a logarithmic dependency on the number of nodes ?
  • If not, is there a way to do a local binary reduction using a binary tree ?

Thank you for your insight.

This is the code I came up with:

using Distributed
@everywhere using DistributedArrays

include("power_of_two.jl") # Introduces the is_pot() function
@everywhere include("src/darray.jl") # introduces the check_uniform_chunk_size() function

"""
    binarytree_reduce(op, objs::AbstractVector)

Reduce the elements of `objs` to a single value with the binary operator `op`,
combining them along a binary reduction tree.

At each level of the tree, elements that are `stride` positions apart are combined
pairwise (`stride` doubles from one level to the next), so the tree has
`ceil(log2(length(objs)))` levels. `objs` may have any non-zero length: an element
without a partner at some level is carried over unchanged to the next level.

`objs` itself is not modified. The value returned by `op` must be convertible to the
element type of `objs`, because partial results are stored in a copy of `objs`.

# Throws
- `ArgumentError`: if `objs` is empty.
"""
function binarytree_reduce(op, objs::AbstractVector)
  nobjs = length(objs)
  nobjs > 0 || throw(ArgumentError("cannot reduce an empty collection"))

  # Work on a fresh 1-based `Vector` so that the caller's data stays untouched.
  res = collect(objs)

  # Distance between the two operands combined at the current tree level.
  stride = 1
  while stride < nobjs
    for i in 1:2*stride:nobjs
      j = i + stride
      # When `nobjs` is not a power of two the last element of a level may have no
      # partner; it is then left as is and picked up at a higher level.
      if j <= nobjs
        res[i] = op(res[i], res[j])
      end
    end
    stride *= 2
  end
  return res[1]
end

"""
    reduce_sum(A::DArray{Float64,2})::Int

Sum all the chunks of the distributed array `A` along a binary reduction tree and
return the id of the process whose chunk holds the result.

The reduction is performed in place, so the chunks of `A` are altered. All chunks
must have the same size; this is verified by `check_uniform_chunk_size`
(defined elsewhere -- presumably it raises when the sizes differ).

The pairwise sums are started asynchronously: the `Future` of each pending sum is
remembered per process, and a process is only waited for when its chunk is needed
again as an operand of a later sum.
"""
function reduce_sum(A::DArray{Float64,2})::Int
  # Flatten the process grid of `A` into a vector of process ids.
  ps = vec(procs(A))

  # All chunks must have the same size
  check_uniform_chunk_size(A)

  # Maps a process id to the `Future` of the sum still running on that process.
  pending = Dict{Int,Future}()

  agg = binarytree_reduce(ps) do p1, p2
    # Both operands must be final before they are combined: wait for the sums
    # previously scheduled on either process.
    for p in (p1, p2)
      if haskey(pending, p)
        fetch(pop!(pending, p))
      end
    end

    # Sum the two chunks on p1 without blocking the caller.
    pending[p1] = @spawnat p1 begin
      A[:l] += @fetchfrom p2 A[:l]
      return nothing
    end
    return p1
  end

  # Wait for the last reduction. Nothing was scheduled when `A` lives on a single
  # process, in which case there is nothing to wait for.
  if haskey(pending, agg)
    fetch(pending[agg])
  end

  # `agg` is the id of the process where the result of the reduction stands
  return agg
end

Any feedback highly appreciated