@parallel for and more complex operations


I’m starting to appreciate the benefits and simplicity of @parallel for in my code… sometimes these loops really run fast, but I have seen that the operation you can perform on the result of each iteration of the for is relatively simple, or I have not understood how to make it do more complex things.
For example, we can have something like

@everywhere function fsq(x)
  return x*x
end

@parallel (+) for i in 1:1000
  fsq(i)
end

and that simply computes the sum of fsq over all the terms in parallel. So far, so good… but what happens if I want to perform, say, the sum of the logarithms of the different terms? I know I can modify the fsq(x) function to return the log of x^2, but I’m not looking for that kind of solution… what I want to know is if there is a way to replace the (+) in

@parallel (+) for i in 1:1000

with something like

@parallel sum(log()) for i in 1:1000

…which obviously does not work as I wrote it :frowning:
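For reference, the serial version of the computation I’m after is just this (fsq as defined above):

```julia
fsq(x) = x*x

# Serial version of what I want to parallelize:
# the sum of the logarithms of the squared terms.
serial = mapreduce(i -> log(fsq(i)), +, 1:1000)
```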

Thanks in advance,



Can’t you just put the transformation (log above) in the function of the loop body?


No… Well, yes, but of course that was simply an example; what I was really asking about was the possibility of using more involved functions in the parallel reduction. Can a user-defined function be employed here? Like
@parallel my_function for i in 1:1000
Thanks a lot…


As you can see it is possible, but the reduction is done on each proc and then the partial results get combined. So proc 2 is in charge of 1:4, proc 3 of 5:8, and proc 4 of 9:12. So if your reducer is not invariant to this grouping, you will get an unexpected answer. The first argument of the reducer is the accumulated value and the second is an individual entry.

julia> @everywhere h(x,y) = begin; @show x, y; x*y; end

julia> a = @parallel (h) for i in 1:12
           i
       end
        From worker 3:  (x, y) = (5, 6)
        From worker 2:  (x, y) = (1, 2)
        From worker 4:  (x, y) = (9, 10)
        From worker 4:  (x, y) = (90, 11)
        From worker 4:  (x, y) = (990, 12)
        From worker 3:  (x, y) = (30, 7)
        From worker 3:  (x, y) = (210, 8)
(x, y) = (24, 1680)
(x, y) = (40320, 11880)
        From worker 2:  (x, y) = (2, 3)
        From worker 2:  (x, y) = (6, 4)
479001600

julia> a == prod(1:12)
true

julia> @everywhere g(x,y) = begin; @show x, y; x+log(y); end

julia> a = @parallel (g) for i in 1:12
           i
       end
        From worker 3:  (x, y) = (5, 6)
        From worker 2:  (x, y) = (1, 2)
        From worker 2:  (x, y) = (1.6931471805599454, 3)
        From worker 2:  (x, y) = (2.791759469228055, 4)
        From worker 4:  (x, y) = (9, 10)
        From worker 4:  (x, y) = (11.302585092994047, 11)
        From worker 4:  (x, y) = (13.700480365792417, 12)
(x, y) = (4.178053830347945, 10.817111159963204)
(x, y) = (6.5591830773566455, 16.185387015580417)
        From worker 3:  (x, y) = (6.7917594692280545, 7)
        From worker 3:  (x, y) = (8.737669618283368, 8)
9

julia> a ≈ sum(log(i) for i in 1:12)
false
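The same chunked reduction can be mimicked sequentially, without any workers (variable names here are mine, not preduce internals):

```julia
# Mimic what @parallel (h) does with 3 workers:
# each "worker" reduces its own chunk, then the partials are reduced.
h(x, y) = x * y
chunks = [1:4, 5:8, 9:12]
partials = [reduce(h, c) for c in chunks]   # per-worker results: [24, 1680, 11880]
result = reduce(h, partials)
# * is associative, so result == prod(1:12) regardless of the grouping.
```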


I will try this, thanks… But you got the exact point, as I’m willing to process permutation-invariant problems this way…


It could be worth dry-running it on paper to make sure it does what is expected. Probably “permutation invariance” is not the best term for the condition the reduction has to satisfy. It is more that it should be hierarchically invariant, that is, the reduction can happen hierarchically without affecting the end result.
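One way to do that dry run in the REPL, with the g from the transcript above and the same chunk split, is to compare a flat left-to-right reduction against a chunked one:

```julia
g(x, y) = x + log(y)   # the non-associative reducer from the example above

flat    = reduce(g, 1:12)                                      # one left-to-right pass
chunked = reduce(g, [reduce(g, c) for c in [1:4, 5:8, 9:12]])
# Because g is not associative, the two groupings disagree,
# and neither equals sum(log(i) for i in 1:12).
```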


The @parallel macro exists in order to solve a concrete problem. Your example sum(log()) does not fit what @parallel for does.

The @parallel macro rewrites your for loop as a call to the function Base.Distributed.preduce. Then this function essentially

  • generates chunks (as many as there are workers)
  • applies whatever you have inside the loop to each element of each chunk. Let us assume inside the for loop you have some_func(i).
    • This is done concurrently, so some_func(chunk1[1]), some_func(chunk2[1]), … are computed independently by independent workers.
  • Then the partial results of all chunks are joined. This is done by a reduce operation.

Since log is not a reduce operation, it is not meant to be used in this scenario. The reducer must be an operation that, given a pair of values, returns a single value, such as *(a,b) = a*b or +(a,b) = a+b. But… what is log(a,b)?
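To be concrete, any function that merges an accumulated value and the next entry into one value can serve as the reducer; a toy sketch (combine is my name, not an API):

```julia
# A reducer takes (accumulated, next) and returns a single value:
combine(acc, x) = max(acc, x)       # associative, so safe to chunk
reduce(combine, [3, 1, 4, 1, 5])    # == 5
```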

If you look at the source code of the preduce function you will understand why log is not suitable:

function preduce(reducer, f, R)
    N = length(R)
    chunks = splitrange(N, nworkers())
    all_w = workers()[1:length(chunks)]

    w_exec = Task[]
    for (idx,pid) in enumerate(all_w)
        t = Task(()->remotecall_fetch(f, pid, reducer, R, first(chunks[idx]), last(chunks[idx])))
        schedule(t)  # start the task so wait(t) below can finish
        push!(w_exec, t)
    end
    reduce(reducer, [wait(t) for t in w_exec]) # what is log of an array?
end

You have to do what you said (write the log inside the function) to use this macro.

Another example:

@everywhere some_complicated_function_per_iteration(x) = (log(x+1)+(x+2))/x
@parallel (+) for i in 1:100000; some_complicated_function_per_iteration(i) end 


…replace sum(log()) with sum(log.()) and it makes sense, at least on a tuple or array.


It might seem that it makes sense but actually, as far as I understand the code, what is computed in parallel is what is inside the for loop, not the +. Therefore, if you want to compute the log elementwise you are better off putting the log inside the function.

In my example what it takes more time is some_complicated_function_per_iteration and this is what gets parallelized.

@everywhere some_complicated_function_per_iteration(x) = (log(x+1)+(x+2))/x
@parallel (+) for i in 1:100000; some_complicated_function_per_iteration(i) end 

I think I understand what you mean, but if you want to use this macro, you have to adapt your code and do the log (or whatever else you want to do at each iteration) inside a function, then call this function inside the loop.
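For the sum-of-logarithms case that started the thread, the adaptation would look something like this (a sketch in 0.6 syntax, reusing the fsq from the first post):

```julia
@everywhere fsq(x) = x*x

# The per-iteration work (square, then log) goes in the loop body;
# (+) is then a plain associative reduction over those values.
a = @parallel (+) for i in 1:1000
    log(fsq(i))
end
# Serially this is just sum(log(fsq(i)) for i in 1:1000).
```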

Maybe if you post an example of what is challenging you, we can give you a hint. And maybe @parallel is not the best solution and you could use Threads instead.