@distributed (op) for with mutable types

I need to perform a map-reduce involving different variables, as in

a = b = 0
for n = 1:10
  a += n
  b += n*n
end

I want to do it in parallel, using @distributed, and I think a possible solution involves the definition of a new type to hold both values:

@everywhere struct T
  a::Int64
  b::Int64
end

@everywhere Base.:+(x::T, y::T) = T(x.a + y.a, x.b + y.b)

x = T(0, 0)
result = @distributed (+) for n = 1:10
  T(n, n*n)
end

That is a little involved, but I could not find a better solution…

My problem is that some of the variables I need to reduce are actually arrays and are therefore mutable. This makes the situation unclear even in the case of a single array. For example, if I start julia with julia -p 4 to have 4 workers, than this code does not perform as expected:

a = zeros(1)
@distributed (+) for n=1:10
  a[1] = n
  a
end

The result I get is 59 instead of the expected 55. Everything is fine if I use, as last line in the loop, copy(a), but this is clearly not optimal for the performance.

As far as I understand, this issue is related to the following piece of code:

a = zeros(1)
(a[1] = 1; a) + (a[1] = 2; a) ==> 4

In other words, a is modified by the last block, and the value 2 is duplicated. Is there a simple way for me to avoid this sort of issues? Wouldn’t it be a good idea anyway to have a way to specify reduction variables inside a @distributed loop, similarly to the way OpenMP works?

You may do this using DistributedArrays as

julia> @everywhere begin
       using OffsetArrays
       using DistributedArrays
       end

julia> d=DArray((10,)) do inds
           a = zeros(inds[1])
           for n in inds[1]
               a[n] = n
           end
           parent(a)
       end
10-element DArray{Float64,1,Array{Float64,1}}:
  1.0
  2.0
  3.0
  4.0
  5.0
  6.0
  7.0
  8.0
  9.0
 10.0

julia> sum(d)
55.0

You may also allocate only the reduced array, but this will be a bit more complicated. I had created ParallelUtilities.jl to help out with some of this, in this case to get the range that will be iterated over on each worker.

julia> @everywhere using ParallelUtilities

# We seek an iterator that splits the range evenly among the workers
# The signature is evenlyscatterproduct(tuple, nworkers, worker_rank)
# Here's an example:
julia> evenlyscatterproduct((1:10,),4,1) |> collect # first out of 4 workers
3-element Array{Tuple{Int64},1}:
 (1,)
 (2,)
 (3,)

julia> d=DArray((nworkers(),)) do inds
           rank = first(inds[1])
           a = zeros(1)
           tasks = evenlyscatterproduct((1:10,),nworkers(),rank) 
           for (n,) in tasks
               a[1] += n
           end
           a
       end
4-element DArray{Float64,1,Array{Float64,1}}:
  6.0
 15.0
 15.0
 19.0

julia> sum(d)
55.0

If you specifically want this operation then the package provides certain parallel mapreduce functions for convenience, eg

julia> pmapsum(x->[sum(el[1] for el in x)], 1:10)
1-element Array{Int64,1}:
 55

However this package isn’t as widely tested as DistributedArrays.

1 Like

Here is how you can do it with Transducers.jl:

using Transducers

a = zeros(1)
xf = Map() do n
    # The "loop body" of `@distributed op for ...` goes here:
    a[1] = n
    a
end

function rf!(x, y)
    x === y === nothing && return nothing
    x === nothing && return copy(y)  # copy the array _only_ for the first invocation
    y === nothing && return x
    x .+= y
    return x
end

dreduce(rf!, xf, 1:10; init=nothing)  # using Distributed as a backend

This copies the array once for each worker and reuse it for each invocation of rf!.

(In principle, you can use reduce(rf!, xf, 1:10; init=nothing) to switch to thread-based parallelism. However, a[1] = n in every thread is a data race so you can’t do this for the function like this.)

I think above definition of rf! is tricky to write. I’ve been thinking about how to make something like this easier to do. I think it’d be possible to do this by an API like

using Transducers
using BangBang: add!!

a = zeros(1)
xf = Map() do n
    a[1] = n
    readonly(a)
end

reduce(add!!, xf, 1:10)
1 Like

Thank you for your answer! Storing the full array is not an options: I have by far too many cycles and it would not be reasonable (large allocation and a lot of data transfer). Also, I feel that it is more reasonable to split the reduction among the various workers.

The other option, using ParallelUtilities.jl, is more promising: especially the last code is really simple! Still, I was hoping for something based on the standard Julia libraries, and coded using the @distributed macro (which I understand is more appropriate for short cycles), but I guess this is not possible at the moment.

Thank you! I did not know the Transducers, and it took me a little to figure out how they works, so thank you for mentioning them here and for taking the time to show me how to use them.

rf! is indeed not so obvious, at least at first. Again, I was thinking at a slightly more “direct” approach, and I still think that the way @distributed works is perhaps not general enough: to me it looks like the end user should be able to specify directly more than one reduction variable, and have the “correct” results also for mutable types. Perhaps my view is shared and someone more knowledgeable than me (not difficult to find…) will propose a neat solution to this problem.

I don’t think specifying the reduction variable (aka accumulator) is enough, exactly because of the reason why (a[1] = 1; a) + (a[1] = 2; a) “does not work” as you demonstrated in the OP.

I think what is missing is the ability to specify the “initial value” (identity/neutral element) of the reduction.

Note that

@distributed op for x in xs
  f(x)
end

is just a very verbose way to write mapreduce(f, op, xs). Furthermore, there is no init in @distributed while you can do mapreduce(f, op, xs; init=...). So, if you want something more general, I think you’d need mapreduce-like interface.

By the way, if you don’t mind specifying the accumulator type and size, you can also do:

using Transducers

a = zeros(1)
xf = Map() do n # same `xf`
    a[1] = n
    a
end

dreduce((a, b) -> (a .+= b), xf, 1:10; init=OnInit(() -> zeros(1)))

Yeah, I think I agree OpenMP-like syntax is a nice thing to have. But it’s not fully generic as this doesn’t help you write a bit more complex reduction like findmax. So I’ve been wondering if there is a nicer syntax with a bit more generality…

Thank you again!

Note that

@distributed op for x in xs
  f(x)
end

is just a very verbose way to write mapreduce(f, op, xs)

Yes, but I think mapreduce is not distributed, is it? Also, from the Julia documentation I read that pmap (and therefore I guess also a parallel version of mapreduce) should be used in situation where the function is slow to compute, while @distributed for is more appropriate for quick computation.

Yeah, I think I agree OpenMP-like syntax is a nice thing to have. But it’s not fully generic as this doesn’t help you write a bit more complex reduction like findmax . So I’ve been wondering if there is a nicer syntax with a bit more generality…

Is this not working?

@distributed (max) for i=1:r
  f(i)
end

I think that a simple change would make it return also the index i of the maximum, on top of the maximum value.

You are right. mapreduce in Base has no parallelism. Perhaps Distributed.jl should add it. (You can also use Transducers.dreduce(op, Map(f), itr))

This is not generally true. Functional approach can have much better performance characteristics (see, e.g., [RFC/ANN] FLoops.jl: fast generic for loops (foldl for humans™)).

You need two accumulators to interact to get this. So, you can’t just independently annotate each reduction variable.

You need two accumulators to interact to get this. So, you can’t just independently annotate each reduction variable.

This will do the job, I believee

@everywhere struct ArgMax
  i::Int64
  x::Float64
end
@everywhere Base.max(a::ArgMax, b::ArgMax) = b.x > a.x ? b : a

xs = map(args->ArgMax(args...), zip(1:100, rand(100)))
@distributed (max) for n=1:100
  xs[n]
end

But, again, the use of an ad-hoc type is not very elegant IMHO, and it would be nice to do this in a simpler way.

Incidentally, putting together all the input I got from this thread, I think it is actually possible to use @distributed (op) for with mutable types. I am not arguing one should do it, and perhaps the alternatives that have been suggested are better (again, I am too ignorant to judge), but I thought it is worth mentioning.

The problem is that

a = zeros(1)
(a[1] = 1; a) + (a[1] = 2; a) ==> 4

This, however, works

a = zeros(1)
(a[1] = 1; copy(a)) + (a[1] = 2; a) ==> 3

as well as

a = zeros(1)
((a[1] = 1; copy(a)) + (a[1] = 2; a)) + ((a[1] = 3; a) ==> 6

So, really, one needs to make a copy only for the first element in each worker. Therefore

a = zeros(1)
fst = true
@distributed (+) for n=1:10
  a[1] = n
  if fst
    fst == false
    copy(a)
  else
    a
  end
end

Presumably one could modify @distributed to make most of this “automatic”. Comments are very welcome.

This is exactly what my examples are doing. Also, you are relying on that a and fst implicitly becomes worker-local for this to work. I think this is non-composable and not a good practice. This is the reason why you can’t run the same code with multi-threading backend.

IMO @distributed is just too limited (which, of course, means it may be a preferred interface when doing simple things). Having something like OnInit would be a much cleaner approach.

You don’t have to do it. For example, for the first examples in the OP, you can do

broadcasting(f) = (args...) -> f.(args...)

@distributed broadcasting(+) for n in 1:10
    (n, n*n)
end

BTW, after https://github.com/JuliaLang/julia/pull/35706, it can be shorter:

@distributed (.+) for n in 1:10
    (n, n*n)
end

Likewise, you don’t need struct ArgMax. You can just create a reducing function that works with, e.g., tuples.

1 Like