Distributed: Passing views of an array for read access to workers (using pmap)

I have a large array and want multiple workers to have read access to that array in parallel. Each worker works on a disjoint interval of the array.

My plan was to create a view of each of the sections of the array, and then pmap the workers’ function on a list of these views.
However, I found that starting the work on these workers is slow: with the example below, just getting the function running on the workers takes about 1.4 seconds, and with more data to be shared it takes much longer. In addition, far more data is allocated in the parallel case (874.982 MiB) than in the sequential case (2.307 MiB).
Is it possible that my approach to sharing parallel read access to disjoint chunks of the array is inefficient? Is the data perhaps copied? Is there a more efficient way to provide parallel read access to disjoint intervals?

Here is a minimal example, showing how long it takes to create the workers:

using Distributed

arr = rand(10^8)

# we partition arr into n_workers (almost) equally sized chunks
n_workers = length(workers())
size_per_worker = ceil(Int64,length(arr)/n_workers)
taskseq = []
for i in 1:n_workers
    start = (i-1)*size_per_worker + 1
    stop = min(i*size_per_worker,length(arr))
    push!(taskseq,@view(arr[start:stop]))
end

# this is the function that each worker should execute in parallel
# input: A view of the array, and the start time of pmap
@everywhere function worker_function((myarr,pmap_start_time))
    worker_start_time = time()
    println("Time it took to start function (worker): ", worker_start_time - pmap_start_time)
    flush(stdout)
    retval = 0
    for i in eachindex(myarr)
        retval += myarr[i]
    end
    println("Time to process in function (worker):", time() - worker_start_time)
    flush(stdout)
    return retval
end


println("starting sequential")
start_time = time()
@time worker_function((arr,start_time))
end_time = time()
println("time for sequential: ", end_time - start_time)



println("starting pmap"); flush(stdout)
start_time = time()
start_time_vec = fill(start_time, length(taskseq))

# we execute worker_function in parallel
@time pmap(worker_function,zip(taskseq,start_time_vec))

end_time = time()
println("time of parallel: ", end_time - start_time)

Here is example output when calling this script with 4 workers (julia -p 4 filename.jl):

starting sequential
Time it took to start function (worker): 0.027009010314941406
Time to process in function (worker):0.561241865158081
0.569265 seconds (32.95 k allocations: 2.307 MiB, 3.94% compilation time)
time for sequential: 0.5942938327789307
starting pmap
From worker 2: Time it took to start function (worker): 1.4049251079559326
From worker 3: Time it took to start function (worker): 1.405060052871704
From worker 5: Time it took to start function (worker): 1.409548044204712
From worker 4: Time it took to start function (worker): 1.4143550395965576
From worker 2: Time to process in function (worker):0.21155595779418945
From worker 3: Time to process in function (worker):0.22400593757629395
From worker 5: Time to process in function (worker):0.23674607276916504
From worker 4: Time to process in function (worker):0.24160385131835938
1.692557 seconds (1.72 M allocations: 874.982 MiB, 5.34% gc time, 28.35% compilation time)
time of parallel: 1.7246119976043701

You can see that the time just to start the functions on the workers is quite significant (and it increases when arr is larger). One possible reason could be that far more memory is allocated in the parallel case (874.982 MiB) than in the sequential one (2.307 MiB).

Note that while each worker does a summation task, which could of course be done more efficiently by mapping the (+) function, I explicitly want to execute a for loop on each worker, since my (more complex) use case requires multiple function calls per item of the array.
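
One way to check whether a view drags its whole parent array along is to serialize it directly; Distributed ships arguments to workers through the same serializer. A minimal sketch (the expected size for 10^8 Float64s is about 8×10^8 bytes):

using Serialization

arr = rand(10^8)
v = @view arr[1:10]   # a tiny view into the large array

io = IOBuffer()
serialize(io, v)
println(position(io)) # roughly 8*10^8 bytes: the whole parent array is serialized, not 10 elements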

Take a look at Introduction · DistributedArrays.jl

Many thanks, this seems to be the right way. In particular, when constructing a distributed array ([Introduction · DistributedArrays.jl]) it is possible to pass the procs argument, so that each of the processes gets a part of the array. That seems to be the solution.

However, it is not clear to me from the link how to most efficiently transform my data into a distributed array with each worker holding a separate chunk. My use case is that the data (arr) has been generated in the main process and now needs to be shared among the workers. This is described in this section

But calling simply

darr = DArray(arr)
yields

ERROR: LoadError: type Float64 has no field where

and calling
darr = DArray(arr;procs=workers())
yields

(M1Max) (base) tass@arm64-apple-darwin20 Aljaz % julia -p 4 test_parallel_darray.jl
ERROR: LoadError: MethodError: no method matching DArray(::Vector{Float64}; procs::Vector{Int64})

The example code does some reshaping, but I do not understand why and how these dimensions are determined. Maybe I need to reshape as well?

Do you know how to distribute the data to all workers?

Maybe something like this

julia> using Distributed

julia> addprocs(4)
4-element Vector{Int64}:
 2
 3
 4
 5

julia> @everywhere using DistributedArrays

julia> A = randn((100,100))
100×100 Matrix{Float64}:
...

julia> DA = distribute(A)

julia> pmap(procs(DA)) do i
           DA[:l] # this gets you the local part of DA that is available on the ith process
       end
2×2 Matrix{Matrix{Float64}}:
 [-1.32192 -1.03872 … -1.45507 -1.03656; 0.9152 0.935673 … 0.552808 -0.524983; … ; 1.29931 0.584092 … -0.0659655 0.558668; 0.779895 1.13761 … 0.105637 -1.30604]      …  [1.47834 -2.07625 … 0.286428 0.723728; 0.870625 1.94568 … -0.68 1.21642; … ; -1.33853 0.28787 … -1.09238 -0.573003; 0.89278 -0.146452 … -0.214458 -1.67541]
 [0.863741 0.474698 … 0.155879 -0.0570954; 0.559998 -1.62241 … -0.16028 1.39969; … ; -0.447555 0.427656 … 0.293934 -2.45839; 0.150076 0.542755 … -0.324436 0.664975]     [1.05879 0.769584 … -0.427145 -0.44544; 0.0646921 0.11107 … -0.639559 -0.348907; … ; -0.802571 1.13975 … 0.954727 -1.00489; -2.02937 1.72174 … 0.157684 1.59962]
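
For the 1-D arr from the original question, something along these lines should give each worker exactly one contiguous chunk. The procs and dist keywords follow the DistributedArrays README, remotecall_fetch runs the summation on the owner of each chunk, and localpart is a zero-copy view of that chunk (treat this as a sketch, not code tested against your setup):

@everywhere using DistributedArrays

@everywhere function sum_localpart(darr)
    myarr = localpart(darr)   # the chunk stored on this worker; no copying
    s = 0.0
    for i in eachindex(myarr)
        s += myarr[i]
    end
    return s
end

arr = rand(10^8)
darr = distribute(arr; procs = workers(), dist = [nworkers()])  # one chunk per worker
partial = [remotecall_fetch(sum_localpart, p, darr) for p in procs(darr)]
total = sum(partial)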

Thank you, that does help. Using the DistributedArrays package (and the [:L] indexing as you suggested), I could now make it run. There still seems to be a performance issue, but the bottleneck seems to be the number of allocations; I will mark this thread as solved (the distributed arrays did the trick) and open a new one (a) using distributed arrays and (b) highlighting the memory allocation problem.


Is there a reason why you went for processes (via Distributed.jl) instead of threads? Threads are generally much more lightweight and easier to work with. You really only need processes when you want to distribute work across multiple physical machines.

Just replace this line

@time pmap(worker_function,zip(taskseq,start_time_vec))

with something like

@time Threads.@threads for task in taskseq
    worker_function((task, time()))
end

and start Julia with some threads by replacing -p with -t: julia -t 4

Since you want to record the results and also, potentially, want load balancing, you probably want to translate this as

tasks = map(zip(taskseq,start_time_vec)) do x
    Threads.@spawn worker_function(x)
end
fetch.(tasks)
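
and if the end goal is the overall sum, the partial results can be combined afterwards on the main thread:

partial_sums = fetch.(tasks)   # wait for all spawned tasks and collect their return values
total = sum(partial_sums)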

Many thanks for your responses. That does the trick; my computation is much faster now!

However, I want to understand why. My reason for using multiple processes rather than multiple threads is that I wanted "real parallelism", i.e. the jobs should run on different cores (in my case, a single machine with 10-40 cores) at the same time. My understanding of threads was that the computer never runs multiple threads at the same time, but rather starts them one after the other and then, by context switching, executes some lines of one thread, then some lines of another. I.e., if there is a line of code waiting for an outside action to be triggered (e.g. waiting for a network packet), it makes sense to put that waiting part into a separate thread, since that allows other parts of the program to run while waiting.
Here, however, there does not seem to be a line waiting for an external event to happen. Hence I wanted to use processes rather than threads.

Is my understanding wrong? An answer here says

Well, both multithreading as well as multiprocessing give you parallelism.

So do both give parallelism in the sense of “true parallelism” as described above?

Yes - in Julia they do. In Python they don't. Python (at least as long as the Global Interpreter Lock is still present) is fundamentally single-threaded, so Python "threads" work as you describe: you start them and the Python runtime switches between them, but at any given point in time only a single thread is actively working.

Julia has a similar concept to that: Tasks. Basically, a Task is a chunk of work. You can create multiple Tasks and Julia will switch between them to minimize total execution time (e.g. switch to another Task if the current Task needs to wait for I/O).
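
A minimal sketch of that cooperative switching, running on a single thread:

t1 = @async (sleep(1); println("task 1 finished waiting"))   # yields to the scheduler while sleeping
t2 = @async println("task 2 runs while task 1 waits")
wait(t1)   # task 2's line prints first: Julia switched Tasks at the sleep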

Threading builds on top of Julia's Tasks. Basically, a thread is something that can work on completing Tasks, and threads can work "truly parallel", i.e. each thread can use a different CPU core [1]. So if you start Julia with more threads, you can have more Tasks running in parallel.

Another way of achieving "true parallelism" is using multiple processes. This is basically a different layer of abstraction: when you tell your OS to start a program, it puts everything belonging to that program into a process.

So what is the difference between processes and threads? For starters, each Julia process has its own separate threads. But what does that mean in practice? Having multiple processes is much more expensive than having multiple threads, because the OS allocates resources for each process separately. So each Julia process needs to load the external libraries again[2] and needs to compile the code again. Sharing memory is also harder, since the OS usually prohibits a process from accessing a different process's memory, so accessing common data has a higher overhead.
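
The memory-sharing difference is easy to demonstrate: a thread reads the array in place, while sending it to a worker process involves serializing and copying it. A rough sketch (assuming Julia was started with both -p and -t):

using Distributed

arr = rand(10^6)

t = Threads.@spawn sum(arr)                   # threads share the address space: no copy
fetch(t)

r = remotecall_fetch(sum, workers()[1], arr)  # arr is serialized and copied to the worker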

The main advantage of using different processes is the possibility of running them on physically different hardware, i.e. on multiple computers. In that sense processes are another layer on top of threads.

Summary:

  1. Tasks allow the queuing of multiple independent pieces of work.
  2. Threads work on Tasks. So multiple threads allow parallel execution on the same machine (by using multiple cores).
  3. Processes allow distribution of work to multiple physical machines.

  1. Note that by default the threads are not pinned to a specific CPU core; you'd need to use ThreadPinning.jl for that (see the sketch after these footnotes). Pinning can have performance advantages. If you want more on these technicalities, please ask :)

  2. This is the reason that sometimes you need to use multiple processes anyway. For example, BLAS (the linear algebra library) uses its own threads, and that's a very complicated story.
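
Regarding footnote 1, a minimal pinning sketch using ThreadPinning.jl (assuming its exported pinthreads with the :cores strategy):

using ThreadPinning
pinthreads(:cores)   # pin each Julia thread to its own physical core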
