@everywhere and pmap inside of a function?

Could you please explain why this code fails and how I could fix it? This is an example of an embarrassingly parallel loop with no data movement, the easiest thing possible in parallel programming:

function bigfunc(iterator)
  localvar = 2.5

  @everywhere function smallfunc(elm)
    # hypothetical (expensive) operation with elm
    # (in my code this is solving a linear system)
    localvar * elm
  end

  pmap(smallfunc, iterator)
end

bigfunc(1:10)

ERROR: UndefVarError: localvar not defined

Never mind, I had a typo in the code. But I still have a question: do we have to add an @everywhere in front of each variable used inside of smallfunc? Does this copy the object to the other processes or just create a reference? How expensive is this?

Use an anonymous function.

addprocs()

function bigfunc(iterator)
  localvar = 2.5

  smallfunc = function (elm)
    # hypothetical (expensive) operation with elm
    # (in my code this is solving a linear system)
    localvar * elm
  end

  pmap(smallfunc, iterator)
end

bigfunc(1:10)

Note that if you do this, the captured data will be sent to the workers each time. To make this more efficient, you should use a CachingPool.

Example:

addprocs()

function bigfunc(iterator)
  localvar = 2.5

  smallfunc = function (elm)
    # hypothetical (expensive) operation with elm
    # (in my code this is solving a linear system)
    localvar * elm
  end

  wp = CachingPool(workers())
  pmap(wp, smallfunc, iterator)
end

bigfunc(1:10)

Note that this should become the default soon. Details:

https://github.com/JuliaLang/julia/issues/21946
https://github.com/JuliaLang/julia/pull/22843


Awesome @ChrisRackauckas, are anonymous functions equivalent to lambda expressions like x -> 2x?

The trick is that anonymous functions capture the variables in this so-called closure, right? And do you think the CachingPool will be available in Julia v0.7?

Yes.

f = x -> 2x

and

f = function (x)
  2x
end

mean the same thing, with the second being the multi-line version of the first (like the difference between f(x) = 2x vs function f(x); 2x; end, except with anonymous functions).
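
To make the closure capture concrete, here is a minimal sketch (the name make_scaler is just for illustration): the anonymous function refers to a local variable of the enclosing function, so it captures that variable and still sees its value when called later.

function make_scaler()
  localvar = 2.5
  elm -> localvar * elm    # the returned anonymous function captures localvar
end

scale = make_scaler()
scale(4.0)                 # 2.5 * 4.0 == 10.0

When such a closure is passed to pmap, the captured variables are serialized along with it and sent to the workers.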

Yes, for sure. The question is whether this will happen automatically or will still need to be done by the user for performance. It looks like it will be the latter, but your code will still work.


This also runs fine for me, even though "smallfunc" isn't anonymous:

julia> function bigfunc(iterator)
         localvar = 2.5

         function smallfunc(elm)
           # hypothetical (expensive) operation with elm
           # (in my code this is solving a linear system)
           localvar * elm
         end

         pmap(smallfunc, iterator)
       end
bigfunc (generic function with 1 method)

julia> bigfunc(1:10)
10-element Array{Float64,1}:
  2.5
  5.0
  7.5
 10.0
 12.5
 15.0
 17.5
 20.0
 22.5
 25.0

Interesting @Elrod, thanks for sharing. @ChrisRackauckas, could you please comment on this, what is happening in this case?

There is definitely a lot to ask about the behavior of pmap. Below are three working versions for which the runtime is approximately the same:

using BenchmarkTools

function bigfunc1(iterator, arg)
  localvar = 2.5

  smallfunc = function (elm)
    localvar * elm + arg
  end

  wp = CachingPool(workers())
  pmap(wp, smallfunc, iterator)
end

function bigfunc2(iterator, arg)
  localvar = 2.5

  smallfunc = function (elm)
    localvar * elm + arg
  end

  pmap(smallfunc, iterator)
end

function bigfunc3(iterator, arg)
  localvar = 2.5

  function smallfunc(elm)
    localvar * elm + arg
  end

  pmap(smallfunc, iterator)
end

@benchmark bigfunc1(1:1000, 1.)
@benchmark bigfunc2(1:1000, 1.)
@benchmark bigfunc3(1:1000, 1.)

If you could explain what is happening in each case, step by step, it would be very helpful to future readers. Right now I have little idea of what data is being copied and what is not.

I am trying to settle on one version of this pattern, but before I commit to one of them in my package, I was hoping someone with a better understanding of the internals could comment on the pros and cons.

I don't have an understanding of the internals, but I replaced all smallfunc bodies with:

localvar * elm[1] + arg

and then ran:

julia> iter = [randn(1000,1000) for i ∈ 1:200];

julia> @benchmark bigfunc1($iter, 1.)
BenchmarkTools.Trial: 
  memory estimate:  785.48 KiB
  allocs estimate:  22081
  --------------
  minimum time:     816.178 ms (0.00% GC)
  median time:      846.012 ms (0.00% GC)
  mean time:        843.923 ms (0.00% GC)
  maximum time:     874.067 ms (0.00% GC)
  --------------
  samples:          6
  evals/sample:     1

julia> @benchmark bigfunc2($iter, 1.)
BenchmarkTools.Trial: 
  memory estimate:  960.78 KiB
  allocs estimate:  24107
  --------------
  minimum time:     816.634 ms (0.00% GC)
  median time:      840.101 ms (0.00% GC)
  mean time:        837.120 ms (0.00% GC)
  maximum time:     852.441 ms (0.00% GC)
  --------------
  samples:          6
  evals/sample:     1

julia> @benchmark bigfunc3($iter, 1.)
BenchmarkTools.Trial: 
  memory estimate:  961.31 KiB
  allocs estimate:  24120
  --------------
  minimum time:     808.757 ms (0.00% GC)
  median time:      828.173 ms (0.00% GC)
  mean time:        826.189 ms (0.00% GC)
  maximum time:     846.766 ms (0.00% GC)
  --------------
  samples:          7
  evals/sample:     1

The low number of samples means there's a lot of noise in the timings, but the difference in memory and allocations is real.

How many workers do you have, @Elrod? I am trying to set up a cluster to test the code, but haven't had the time yet. My laptop only has 2 cores, so basically I cannot see any speedup even if I use all of them (1 master + 1 worker).

My desktop has 6 cores and I started the REPL with -p5 to use them all.
Starting without any extra workers (so nprocs() returns 1):

julia> @benchmark bigfunc1($iter, 1.)
BenchmarkTools.Trial: 
  memory estimate:  44.50 KiB
  allocs estimate:  1525
  --------------
  minimum time:     963.369 μs (0.00% GC)
  median time:      1.057 ms (0.00% GC)
  mean time:        1.073 ms (1.15% GC)
  maximum time:     8.679 ms (80.81% GC)
  --------------
  samples:          4647
  evals/sample:     1

julia> @benchmark bigfunc2($iter, 1.)
BenchmarkTools.Trial: 
  memory estimate:  42.17 KiB
  allocs estimate:  1496
  --------------
  minimum time:     1.065 ms (0.00% GC)
  median time:      1.166 ms (0.00% GC)
  mean time:        1.176 ms (0.64% GC)
  maximum time:     6.122 ms (74.27% GC)
  --------------
  samples:          4240
  evals/sample:     1

julia> @benchmark bigfunc3($iter, 1.)
BenchmarkTools.Trial: 
  memory estimate:  42.17 KiB
  allocs estimate:  1496
  --------------
  minimum time:     1.004 ms (0.00% GC)
  median time:      1.100 ms (0.00% GC)
  mean time:        1.118 ms (0.65% GC)
  maximum time:     6.305 ms (71.97% GC)
  --------------
  samples:          4459
  evals/sample:     1

Around 800 times faster. But this isn't fair: the function call itself is incredibly cheap, so the cost is almost entirely in moving data to the workers.

To see a gain, you need the function to be the expensive part.
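
For example, here is a rough sketch along the lines of the original use case (a linear solve per element; the sizes and names are made up), where the per-element work dominates the communication and the pmap(pool, f, itr) form from above is reused:

addprocs()

function bigfunc(iterator)
  localvar = 2.5

  smallfunc = function (n)
    A = randn(n, n)        # hypothetical expensive work:
    b = randn(n)           # build and solve an n-by-n linear system
    localvar * sum(A \ b)
  end

  wp = CachingPool(workers())
  pmap(wp, smallfunc, iterator)
end

bigfunc(fill(1000, 20))    # 20 independent 1000x1000 solves

With work of this size, the serialization cost per call is small compared to the solve, so the speedup should be much closer to the number of workers.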


I have yet to see a case where the function is the expensive part and the speedup is great. I am setting up the cluster soon to try this parallel pattern in my package on very large domains.

Well, multiprocessing does have quite an overhead. I use it all the time for Monte Carlo simulations where each trajectory takes minutes, and the speedup is almost exactly Nx for N the number of cores. When the per-task work is smaller, you need to play around with the batch_size. And yes, CachingPool makes it send the data enclosed in the anonymous function exactly once. If you're enclosing a lot of data this makes a huge difference; if you're not, it doesn't matter all that much.
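
As a rough sketch of both points (the array size and batch_size value are arbitrary), enclose a large array and use a CachingPool so it is shipped to each worker only once, with batch_size grouping the cheap per-element calls:

bigdata = randn(10_000)                  # captured by the closure below

smallfunc = elm -> elm + sum(bigdata)    # cheap per element, but encloses bigdata

wp = CachingPool(workers())              # bigdata travels once per worker, not once per call
pmap(wp, smallfunc, 1:1000; batch_size = 50)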

One thing to keep in mind is that you may not be close to optimal, in which case you may want to overload the processors. Here's a quick explanation and example (from a long time ago, but it's still worth reading):

Thanks @ChrisRackauckas. The issue is that I don't have any data to move around, and that is what is bothering me the most. I am only passing the estimator object, which has all the data in its internal state, but the loop is blind to these internal arrays.
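
If it helps, here is a rough sketch of the pattern I have in mind (Estimator and estimate are hypothetical stand-ins). As far as I understand, the estimator captured by the closure is serialized to the workers together with its internal arrays, and a CachingPool should make that transfer happen only once per worker:

@everywhere struct Estimator
  data::Matrix{Float64}                  # large internal state
end

@everywhere estimate(e::Estimator, loc) = sum(e.data) * loc   # stand-in for the real work

function solve_all(e, locations)
  smallfunc = loc -> estimate(e, loc)    # the closure captures e, including e.data
  wp = CachingPool(workers())            # so e should be shipped once per worker
  pmap(wp, smallfunc, locations)
end

solve_all(Estimator(randn(1000, 1000)), 1:50)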

I will come back with more concrete numbers after I get to the cluster.