Why do we use @spawnat and fetch?

Hi,

I am learning parallel computing in Julia, and I came across the code below for summing an array.

Background:

using Distributed
using DistributedArrays
addprocs(4)
@everywhere using DistributedArrays

a = rand(10^7)
adist = distribute(a)

The parallel function:

function mysum_dist(a::DArray)
    r = Array{Future}(undef, length(procs(a)))
    for (i, id) in enumerate(procs(a))
        r[i] = @spawnat id sum(localpart(a))
    end
    return sum(fetch.(r))
end

Why do we use @spawnat and fetch here?
I think we can do it without @spawnat and fetch:

# WRONG: on the main process, localpart(a) is empty.
function mysum_dist2(a::DArray)
    r = Array{Float64}(undef, length(procs(a)))
    for (i, id) in enumerate(procs(a))
        r[i] = sum(localpart(a))
    end
    return sum(r)
end

Could you please tell me what the difference is between the two functions above?
How do @spawnat and fetch work?

When I time the two functions, I get the following (note the units: ms for the first, μs for the second):

For @benchmark mysum_dist($adist):

BenchmarkTools.Trial: 
  memory estimate:  19.20 KiB
  allocs estimate:  531
  --------------
  minimum time:     3.537 ms (0.00% GC)
  median time:      4.989 ms (0.00% GC)
  mean time:        5.045 ms (0.00% GC)
  maximum time:     10.120 ms (0.00% GC)
  --------------
  samples:          989
  evals/sample:     1

For @benchmark mysum_dist2($adist):

BenchmarkTools.Trial: 
  memory estimate:  2.55 KiB
  allocs estimate:  37
  --------------
  minimum time:     3.100 μs (0.00% GC)
  median time:      4.011 μs (0.00% GC)
  mean time:        5.534 μs (4.43% GC)
  maximum time:     361.116 μs (97.75% GC)
  --------------
  samples:          10000
  evals/sample:     8

Thanks!

Did you check whether the two versions return the same result? For me, mysum_dist2 returns zero.
When you create a distributed array, all of the data is distributed to the worker processes and none of it stays on the main process, which is why

julia> localpart(adist)
0-element Array{Float64,1}

However, using the @spawnat macro you can run a command on a specific worker, so

julia> r = @spawnat 2 localpart(adist)
Future(2, 1, 115, nothing)

will run the localpart function on the worker with id 2. The result exists only on that worker, and only once the function has completed, so we call fetch, which waits for the operation to finish and then copies the data from the worker back to the main process.

julia> fetch(r)
2500000-element Array{Float64,1}:
 0.2612222333532057
 0.9666371698145217
 ⋮                 
 0.6747917592939274

which is the part of the distributed array that belongs to worker 2. Because transferring data between processes is slow, you want to do the reduction (calling sum) on the worker and then transfer back only the result, which is a single number, and

julia> fetch(@spawnat 2 sum(localpart(adist)))
1.2494661338187964e6

does exactly this. I hope this helps you understand what is happening here.
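To make the point above concrete, here is a minimal sketch that compares the distributed sum against a plain sum(a). It assumes DistributedArrays is installed and that four workers can be added; the helper name per_worker_sums is made up for this illustration and is not part of any package.

```julia
using Distributed
nprocs() == 1 && addprocs(4)          # assumption: no workers were added yet
@everywhere using DistributedArrays

a = rand(10^6)
adist = distribute(a)

# Hypothetical helper: ask every worker that holds a chunk to sum it locally,
# then bring back one Float64 per worker.
function per_worker_sums(d::DArray)
    futures = [@spawnat id sum(localpart(d)) for id in procs(d)]
    return fetch.(futures)
end

# On the main process the local part is empty, so its sum is 0.0.
main_part = sum(localpart(adist))

partial_sums = per_worker_sums(adist)
total = sum(partial_sums)

println("sum of local part on main process: ", main_part)
println("matches sum(a): ", total ≈ sum(a))
```

The comparison uses ≈ rather than == because adding the chunks in a different order can change the last few floating-point digits.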


Thank you very much.

I also found that each worker starts its calculation as soon as we assign it with @spawnat, whether or not we fetch() the result.

Before, I thought fetch() was a lazy operation, meaning each worker would start calculating only when we call fetch() on its result.

I found out my earlier assumption was wrong by running:

r= @spawnat 1 (sleep(20);rand())

Then I waited for 20 s and ran fetch(r). fetch(r) returned the result immediately instead of after another 20 s.
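The same experiment can be written as a small timed script. This is only a sketch: it assumes at least one worker can be added, and it uses a shorter sleep so it finishes quickly. The exact timings will vary from machine to machine.

```julia
using Distributed
nprocs() == 1 && addprocs(1)          # make sure at least one worker exists

w = first(workers())
r = @spawnat w (sleep(2); rand())     # the worker starts on this right away

sleep(3)                              # the main process is busy with something else

# The worker has already finished, so fetch only has to copy the result back.
t = @elapsed fetch(r)
x = fetch(r)                          # a Future caches its value after the first fetch

println("fetch took ", round(t, digits=3), " s and returned ", x)
```

If fetch() triggered the computation lazily, t would be at least 2 seconds; here it should be much shorter.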

I also figured out why @spawnat and fetch() make parallel computing possible.

The code below prints “last” immediately, which means the for loop finishes right away. This is because @spawnat only assigns the tasks, and it does so for every worker at almost the same time. The workers then run sleep(10); rand() simultaneously.

for i in 1:3
    r = @spawnat i (sleep(10);rand())
end
println("last")
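To check that the tasks really overlap, one can keep the futures and time both steps. A rough sketch, assuming three workers are available (added here if needed) and using a 2 s sleep instead of 10 s:

```julia
using Distributed
nprocs() < 4 && addprocs(4 - nprocs())   # assumption: we want at least 3 workers

ws = workers()[1:3]

t0 = time()
# @spawnat returns a Future immediately; the work runs on the workers.
futures = [@spawnat w (sleep(2); rand()) for w in ws]
t_spawn = time() - t0

# fetch blocks until each result is ready.
results = fetch.(futures)
t_total = time() - t0

println("spawning took ", round(t_spawn, digits=3), " s")
println("spawning + fetching took ", round(t_total, digits=3), " s")
```

If the three tasks ran one after another, the total would be around 6 s. Since they run at the same time, it should be closer to 2 s plus some startup overhead.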