I have a large array and want multiple workers to have read access to that array in parallel. Each worker works on a disjoint interval of the array.
My plan was to create a view of each section of the array and then pmap the worker function over a list of these views.
However, I found that starting the tasks on the workers is slow; in the example below, just getting the function started on each worker takes about 1.4 seconds, and it takes even longer when more data has to be shared. In addition, much more memory is allocated in the parallel case (874.982 MiB) than in the sequential case (2.307 MiB).
Is it possible that my approach to sharing parallel read access to disjoint chunks of the array is inefficient? Is the data perhaps copied? Is there a more efficient way to provide parallel read access to disjoint intervals?
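As a sanity check for the copying suspicion (a sketch I have not run; the expected size is my assumption): serializing a tiny view should, if the parent array travels along, produce roughly the full 8*10^8 bytes of arr rather than a few dozen bytes.
using Serialization
arr = rand(10^8)
io = IOBuffer()
serialize(io, @view(arr[1:10]))  # a view of just 10 elements
println(position(io))            # ≈ 8*10^8 bytes would mean the parent array was serialized too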
Here is a minimal example, showing how long it takes just to start the function on the workers:
using Distributed
arr = rand(10^8)
# we partition arr into n_workers (almost) equally sized chunks
n_workers = length(workers())
size_per_worker = ceil(Int64,length(arr)/n_workers)
taskseq = []
for i in 1:n_workers
start = (i-1)*size_per_worker + 1
stop = min(i*size_per_worker,length(arr))
push!(taskseq,@view(arr[start:stop]))
end
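# (Alternative sketch, not what I do above:) pushing plain index ranges
# instead of views should serialize cheaply, since a UnitRange carries no
# reference to arr:
#   push!(taskseq, start:stop)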
# this is the function that each worker should execute in parallel
# input: A view of the array, and the start time of pmap
@everywhere function worker_function((myarr,pmap_start_time))
worker_start_time = time()
println("Time it took to start function (worker): ", worker_start_time - pmap_start_time)
flush(stdout)
retval = 0.0 # Float64 accumulator keeps the loop type-stable
for i in 1:length(myarr)
retval += myarr[i]
end
println("Time to process in function (worker):", time() - worker_start_time)
flush(stdout)
return retval
end
println("starting sequential")
start_time = time()
@time worker_function((arr,start_time))
end_time = time()
println("time for sequential: ", end_time - start_time)
println("starting pmap"); flush(stdout)
start_time = time()
start_time_vec = fill(start_time, length(taskseq))
# we execute worker_function in parallel
@time pmap(worker_function,zip(taskseq,start_time_vec))
end_time = time()
println("time of parallel: ", end_time - start_time)
When calling this script with 4 workers (julia -p 4 filename.jl), the output looks, for example, like this:
starting sequential
Time it took to start function (worker): 0.027009010314941406
Time to process in function (worker):0.561241865158081
0.569265 seconds (32.95 k allocations: 2.307 MiB, 3.94% compilation time)
time for sequential: 0.5942938327789307
starting pmap
From worker 2: Time it took to start function (worker): 1.4049251079559326
From worker 3: Time it took to start function (worker): 1.405060052871704
From worker 5: Time it took to start function (worker): 1.409548044204712
From worker 4: Time it took to start function (worker): 1.4143550395965576
From worker 2: Time to process in function (worker):0.21155595779418945
From worker 3: Time to process in function (worker):0.22400593757629395
From worker 5: Time to process in function (worker):0.23674607276916504
From worker 4: Time to process in function (worker):0.24160385131835938
1.692557 seconds (1.72 M allocations: 874.982 MiB, 5.34% gc time, 28.35% compilation time)
time of parallel: 1.7246119976043701
You can see that just starting the functions on the workers takes significant time (and it increases as arr grows). One possible reason is the much larger memory allocation in the parallel case (874.982 MiB) compared to the sequential one (2.307 MiB); note that 10^8 Float64 values occupy about 763 MiB, so the parallel allocations are roughly the size of the whole array, which would be consistent with arr being serialized and copied.
Note that while each worker performs a summation, which could of course be done more efficiently by reducing with (+), I explicitly want to execute a for loop on each worker, since my (more complex) use case requires multiple function calls per item of the array.
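For reference, here is a sketch of an alternative I am considering, using SharedArrays so that the data lives in shared memory and only index ranges are sent to the workers. The shared-memory semantics are my assumption from the SharedArrays documentation; I have not benchmarked this, and it only works when all workers run on the same host.
using Distributed, SharedArrays, Random
@everywhere using SharedArrays

# back the data by shared memory instead of a plain Array;
# a SharedArray should serialize as a handle, not as a copy of the data
shared_arr = SharedArray{Float64}(10^8)
rand!(shared_arr)

# each worker reads its disjoint interval directly from shared memory
@everywhere function sum_range(a, rng)
    s = 0.0
    for i in rng
        s += a[i]
    end
    return s
end

n = length(workers())
sz = ceil(Int64, length(shared_arr) / n)
# send only index ranges to the workers, which are cheap to serialize
ranges = [(i-1)*sz + 1 : min(i*sz, length(shared_arr)) for i in 1:n]
results = pmap(rng -> sum_range(shared_arr, rng), ranges)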