Help with parallelism

I have a large data structure g (passed in to my parent function) on which I would like to perform a large number (n = 1e4 to 1e6) of (moderately-expensive) functions with different arguments (argList). The function results will be completely indepedent of each other, which means that parallelism should be easy. However, I’m struggling with syntax and mechanics. Here’s what I’ve tried / thought:

  1. resultList = pmap ((x) -> myfunction(g, x), argList) : this works, but takes ~92 seconds (serial takes ~5 seconds). I believe it’s because g is being passed to the workers each time, and this is a lot of data transfer.

  2. @parallel (f) for a in argList: I’m not sure how to do this. I can create an f but it will take multiple arguments in order to work (it mutates an accumulator vector), and I’m unsure how to pass multiple arguments into a @parallel for loop. I have not found a suitable example anywhere.

  3. ParallelDataTransfer.jl: this looks like I’d be able to pass g once to each worker, but then I don’t know how to rewrite pmap so that it uses the worker’s local copy of g.

Advice / comments appreciated. This is a simplified version of the actual problem; for the real code for option 1) above, please see https://gist.github.com/sbromberger/91dc64cf3c6ff18ef9fb481db0795eed.

1 Like

could you create g on each worker and then call the function? if that is too large, what about a SharedArray? this is almost certainly overhead from copying data across workers. pmap is what I would use here I think :

# save in a file mod.jl
module m
g = ...

end

# then load
@everywhere include("mod.jl")

# run pmap

sorry this is only a sketch.

I can create g on each worker using ParallelDataTransfer, but then I don’t know how to access it.

Clarification: g is passed in to the parent function. This isn’t a REPL / interactive thing; it’s part of a library.

SharedArray won’t work because the return value of the function is not a bitstype.

Don’t do this. @parallel splits the job before computing anything. If any of the jobs are quicker, it will just wait instead of picking up the slack. This is fine for quick jobs (and has slightly less overhead), but pmap is what you want in almost any case.

function test()
  global a
  b = a::MyType # could this be one line?
  # Continue in your code using b
end

You might need to use a generated function to get MyType.

Can you make a MWE that doesn’t require using the package? This extra detail is getting in the way of actually benchmarking things well.

That MWE caused swapping which was the cause of the delay in parallel. I’m not sure I can come up with a trivial example. Native Julia functions are just too fast.

Here we go:

moderatelyexpensivefunction(obj, x) = sum(expm(x.*(obj[1:1000, 1:1000])))

function mwe(obj, toeval)
    a = 0.0
    for v in toeval
        a += moderatelyexpensivefunction(obj, v)
    end
    return a
end

function parmwe(obj, toeval)
    a = sum(pmap((v)->moderatelyexpensivefunction(obj, v), toeval))
    return a
end

with results

julia> myobj = rand(10_000,10_000);

julia> myargs = rand(1:1000, 20);

julia> @time y = LightGraphs.mwe(myobj, myargs);
 27.973902 seconds (2.42 k allocations: 8.166 GiB, 24.00% gc time)

julia> @time z = LightGraphs.parmwe(myobj, myargs);
 37.013757 seconds (3.67 k allocations: 167.000 KiB)

I suspect the bottleneck here is the memory bus (between RAM and CPU cache). To compute expm(obj) each worker has to inspect the entire 1000x1000 matrix. In the end the program is slower because of the pmap overhead.

Is there some kind of benchmark to measure the memory bus’s bandwidth? It should be possible to do a back-of-the-envelope estimate based on that.

If that’s the case then it’s not the right MWE. In my case I’m passing a large graph object for each call to pmap, and I’m merely trying to figure out the best way to let each worker use its own local copy.

I dislike MWEs because it’s too easy to distract from the original issue.

But without them you can’t reliably run/test/benchmark anything.

I’ll take a look at this later if no one else does. Just ping me sometime randomly in the future.

1 Like

I believe you can create a RemoteChannel, rg, to the initial big datastructure g on its owner, say worker 0, and ask all other workers w to fetch (@spawnat w fetch(rg)) this RemoteChannel. Doing this returns a Future to each worker w’s local copy of g to worker 0. Worker 0 stores these Futures in an array rgw:

addprocs()
N = 10_000
g = rand(N,N)
rg = RemoteChannel(()->Channel{typeof(g)}(1), myid())
put!(rg,g)
rem_fetch(w,rg) = @spawnat w fetch(rg)
rgw = map(w->rem_fetch(w,rg), workers())

Then you can write something like pmap which uses @spawnat w moderatelyexpensivefunction(fetch(rgw[w]), myargs).