I have a large data structure g (passed in to my parent function) on which I would like to perform a large number (n = 1e4 to 1e6) of moderately expensive function calls with different arguments (argList). The function results are completely independent of each other, so parallelism should be easy. However, I’m struggling with the syntax and mechanics. Here’s what I’ve tried and considered:
resultList = pmap((x) -> myfunction(g, x), argList): this works, but takes ~92 seconds (serial takes ~5 seconds). I believe this is because g is serialized and sent to the workers on every call, which is a lot of data transfer.
@parallel (f) for a in argList: I’m not sure how to do this. I can create an f, but it needs multiple arguments to work (it mutates an accumulator vector), and I’m unsure how to pass multiple arguments into a @parallel for loop. I haven’t found a suitable example anywhere.
ParallelDataTransfer.jl: it looks like this would let me pass g once to each worker, but then I don’t know how to rewrite pmap so that it uses the worker’s local copy of g.
Could you create g on each worker and then call the function? If it is too large for that, what about a SharedArray? This is almost certainly overhead from copying data across workers. pmap is what I would use here, I think:
# save in a file mod.jl
module m
g = ...
end
# then load
@everywhere include("mod.jl")
# run pmap
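# e.g. (a sketch, using the OP's myfunction/argList names): m.g is a global
# that each worker resolves locally, so only the small argument x travels
# with each call; myfunction must also be defined on every worker
resultList = pmap(x -> myfunction(m.g, x), argList)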
Don’t do this. @parallel splits the work among workers before computing anything, so if some chunks finish sooner than others, those workers just sit idle instead of picking up the slack. That’s fine for quick, uniform jobs (and has slightly less overhead), but pmap is what you want in almost every case.
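For reference, the reduction form of @parallel that the original post was asking about would look roughly like the sketch below (using the OP’s myfunction/g/argList names). The (+) reducer combines per-worker partial sums, so no mutable accumulator is needed; note, however, that g is still captured by the loop body and shipped to the workers.
# @parallel with a (+) reducer: each worker sums its chunk of indices and the
# partial sums are combined on the calling process
total = @parallel (+) for i in 1:length(argList)
    myfunction(g, argList[i])
end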
function test()
    global a
    b = a::MyType # could this be one line?
    # Continue in your code using b
end
You might need to use a generated function to get MyType.
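Applied to the OP’s problem, that pattern might look roughly like this (a sketch: work is a made-up name, MyType stands in for g’s actual type, and g is assumed to already be defined as a global on every worker, e.g. with ParallelDataTransfer’s sendto or an @everywhere assignment):
@everywhere function work(x)
    local_g = g::MyType              # type-assert the worker-local global to avoid type instability
    return myfunction(local_g, x)    # myfunction must also be defined on every worker
end
resultList = pmap(work, argList)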
That MWE caused swapping, which was the cause of the delay in parallel. I’m not sure I can come up with a trivial example; native Julia functions are just too fast.
Here we go:
moderatelyexpensivefunction(obj, x) = sum(expm(x .* (obj[1:1000, 1:1000])))

function mwe(obj, toeval)
    a = 0.0
    for v in toeval
        a += moderatelyexpensivefunction(obj, v)
    end
    return a
end

function parmwe(obj, toeval)
    a = sum(pmap((v) -> moderatelyexpensivefunction(obj, v), toeval))
    return a
end
with results
julia> myobj = rand(10_000,10_000);
julia> myargs = rand(1:1000, 20);
julia> @time y = LightGraphs.mwe(myobj, myargs);
27.973902 seconds (2.42 k allocations: 8.166 GiB, 24.00% gc time)
julia> @time z = LightGraphs.parmwe(myobj, myargs);
37.013757 seconds (3.67 k allocations: 167.000 KiB)
I suspect the bottleneck here is the memory bus (between RAM and the CPU cache). To compute the expm, each worker has to traverse the entire 1000×1000 submatrix. In the end the program is slower because of the pmap overhead.
Is there some kind of benchmark to measure the memory bus’s bandwidth? It should be possible to do a back-of-the-envelope estimate based on that.
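STREAM is the usual benchmark for this; a crude back-of-the-envelope version in Julia, timing a large array copy, might look like this sketch:
# rough memory-bandwidth estimate: each element of the copy is read once and
# written once, i.e. 16 bytes of traffic per Float64
function bandwidth_estimate(N = 100_000_000)
    src = rand(N)
    dst = similar(src)
    dst .= src                                  # warm-up run (compiles the broadcast)
    t = @elapsed (dst .= src)
    return 2 * N * sizeof(Float64) / t / 1e9    # GB/s
end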
If that’s the case, then it’s not the right MWE. In my case I’m passing a large graph object with each call to pmap, and I’m merely trying to figure out the best way to let each worker use its own local copy.
I dislike MWEs because it’s too easy for them to distract from the original issue.
I believe you can create a RemoteChannel, rg, to the initial big data structure g on the process that owns it, say the master process, and ask every worker w to fetch this RemoteChannel (@spawnat w fetch(rg)). Each of these calls returns, to the master, a Future referring to worker w’s local copy of g; the master stores these Futures in an array rgw:
addprocs()
N = 10_000
g = rand(N, N)                                          # stand-in for the big data structure
rg = RemoteChannel(() -> Channel{typeof(g)}(1), myid()) # channel owned by the master
put!(rg, g)                                             # place g in the channel
rem_fetch(w, rg) = @spawnat w fetch(rg)                 # worker w pulls its own copy of g
rgw = map(w -> rem_fetch(w, rg), workers())             # Futures to each worker's local copy
Then you can write something like pmap which uses @spawnat w moderatelyexpensivefunction(fetch(rgw[j]), myargs), where j is the position of worker w in workers() (rgw is indexed by position, not by worker id); on worker w, fetch(rgw[j]) is a local fetch, so g is not re-sent.
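A sketch of that last step, assuming the setup above (pmap_localg is a made-up name, and the mod1 bookkeeping just maps each argument to a worker position round-robin):
# spread the calls across workers round-robin; on worker ws[j], fetch(rgw[j])
# is a local fetch, so g is never shipped again for individual calls
function pmap_localg(rgw, myargs)
    ws = workers()
    futures = Future[]
    for (i, arg) in enumerate(myargs)
        j = mod1(i, length(ws))
        push!(futures, @spawnat ws[j] moderatelyexpensivefunction(fetch(rgw[j]), arg))
    end
    return [fetch(f) for f in futures]
end

results = pmap_localg(rgw, myargs)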