Spawn-fetch usage



Hi all!

I am interested in using @spawn and fetch for parallel programming. I am getting some outputs that are not exactly what I would expect from the tutorial. I would be grateful if someone could point out whether I am doing or interpreting something wrong.

Consider the following example, which is a slight modification from the tutorial:

julia> r = @spawn @show rand(2,2)
Future(2, 1, 4, Nullable{Any}())
	From worker 2:	rand(2, 2) = [0.125496 0.388475; 0.370902 0.763974]

julia> s = @spawn @show 1 .+ fetch(r)
Future(3, 1, 5, Nullable{Any}())
	From worker 3:	1 .+ fetch(r) = [1.1255 1.38848; 1.3709 1.76397]

julia> fetch(s)
2×2 Array{Float64,2}:
 1.1255  1.38848
 1.3709  1.76397

What seems wrong to me is that process “2”, which owns r, is not the one that executes the instruction @show 1 .+ fetch(r). This seems to contradict the following statement from the tutorial: "...@spawn is smart enough to perform the computation on the process that owns r, so the fetch will be a no-op (no work is done).".


I agree. It looks like the manual is not correct. The source code also seems to suggest that @spawn is just cycling through the available workers.
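A quick way to see this cycling behaviour for yourself (a sketch written for recent Julia, where the distributed API lives in the Distributed stdlib; on 0.6 it is in Base and no import is needed):

```julia
using Distributed
addprocs(2)   # start two worker processes

# If @spawn simply cycles through the worker pool, two consecutive
# spawns should land on different workers, regardless of which
# worker owns any data involved.
w1 = fetch(@spawn myid())
w2 = fetch(@spawn myid())
println((w1, w2))   # two different worker ids, e.g. (2, 3)
```

Each call reports a different worker id, confirming that @spawn picks the next available worker rather than the one that owns a fetched Future.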


Thank you very much for looking into this.

So, if I want to allocate a variable r on one of the workers and guarantee that this allocated variable is used internally in subsequent remote calls, what do you think I should do? I didn’t manage to grasp this from the source code or from the tutorial.


I’m not sure this is what you need, but there’s a macro @spawnat that allows you to specify the process where the expression will be evaluated.


I thought about it. Using something like:

julia> id = 2  # for instance
julia> r = @spawnat id rand(2,2)
Future(2, 1, 4, Nullable{Any}())
julia> s = @spawnat id 1 .+ fetch(r)
Future(3, 1, 5, Nullable{Any}())

would be an option.

I do not know, however, if this would be efficient. That is, whether fetch(r) would recognize that r is a variable already computed by this worker and treat the fetch as a “no-op”.


That’s a good question. I don’t think fetch is a no-op in this case, but I’m far from being an expert. I’d try to include as much computation as possible in a single @spawnat or remote call.
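One way to follow that advice (a sketch; the worker id and the closure are mine) is to do the allocation and the follow-up computation in a single remote call, so nothing round-trips through the master:

```julia
using Distributed
addprocs(1)
w = first(workers())

# Allocate and transform in one remote call: rand(2, 2) and the
# elementwise 1 .+ both run on worker `w`, so only the final 2x2
# result is shipped back to the master process.
result = remotecall_fetch(() -> 1 .+ rand(2, 2), w)
```

This avoids the question of whether fetch is a no-op entirely, at the cost of bundling everything into one call.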


Seems like it is doing something, but when the Future lives on process 1 the time is in nanoseconds, with no allocations, so it’s pretty trivial. There is a higher overhead to using @spawnat, though: microseconds, with some allocations.

julia> using BenchmarkTools

julia> @everywhere f(x) = x.^2

julia> r = @spawnat 2 f(rand(10));

julia> @btime @spawnat 2 fetch(r);
  25.162 μs (48 allocations: 2.02 KiB)

julia> r = @spawnat 2 f(rand(100));

julia> @btime @spawnat 2 fetch(r);
  25.162 μs (47 allocations: 2.00 KiB)

julia> r = @spawnat 2 f(rand(1000));

julia> @btime @spawnat 2 fetch(r);
  25.163 μs (47 allocations: 2.00 KiB)

julia> r = @spawnat 1 f(rand(10));

julia> @btime fetch(r);
  16.410 ns (0 allocations: 0 bytes)

julia> r = @spawnat 1 f(rand(100));

julia> @btime fetch(r);
  16.410 ns (0 allocations: 0 bytes)

julia> r = @spawnat 1 f(rand(1000));

julia> @btime fetch(r);
  16.774 ns (0 allocations: 0 bytes)


Hi Mohamed,

It seems to me that using @btime @spawnat 2 fetch(r) will not produce the correct benchmark result, because you are only measuring the time to send the command to the worker, not the execution itself. So it would be more correct to use @spawnat 2 @btime fetch(r).

I did an experiment based on what you proposed, with the modification I just mentioned. I also used larger memory allocations to make the results easier to visualise.

# First Execution (compilation + run time)
julia> r = @spawnat 2 rand(10000, 10000)

julia> s = @spawnat 3 rand(10000, 10000)

julia> @time @spawnat 2 @time fetch(r);
0.131639 seconds (45.03 k allocations: 2.463 MiB)
From worker 2:	  0.025717 seconds (1.52 k allocations: 95.458 KiB)

julia> @time @spawnat 2 @time fetch(s);
0.105274 seconds (23.54 k allocations: 1.292 MiB, 14.16% gc time)
From worker 2:	 15.557443 seconds (148.76 k allocations: 768.124 MiB, 4.74% gc time)

# Second Execution (run time)
julia> r = @spawnat 2 rand(10000, 10000)

julia> s = @spawnat 3 rand(10000, 10000)

julia> @time @spawnat 2 @time fetch(r);
0.005726 seconds (183 allocations: 11.311 KiB)
From worker 2:	  0.005913 seconds (8 allocations: 256 bytes)

julia> @time @spawnat 2 @time fetch(s);
 0.004214 seconds (204 allocations: 12.075 KiB)
From worker 2:	 11.560953 seconds (81.85 k allocations: 764.341 MiB, 9.35% gc time)

Note: I have used @time instead of @btime because the command @spawnat 2 @btime fetch(s) was failing for some reason I do not completely understand.

So it seems to me that using @spawnat + fetch is indeed what I wanted. When using a variable that was created within the same process, it does not seem to allocate extra memory on the host process, nor does it introduce much overhead in execution time.


Did you do @everywhere using BenchmarkTools?


True, which is why I did the same thing on proc 1 and used fetch without @spawnat, to time @spawnat and fetch simultaneously. @spawnat seems to have some overhead in the microseconds, so you might want to avoid too many of those.


Yes, it is having trouble because for some reason it does not recognise r whenever I include @btime inside @spawnat


Yes, it does seem to introduce this overhead. My major concern, however, was discovering whether it was moving memory around and introducing large overheads on the worker, which does not seem to be much of a problem after the experiments we have tried out.


I cannot reproduce the error; it just takes excessively long, so I kill the session, probably because it is running way too many experiments for benchmarking. Anyways, at least you have your answers!



julia>  @everywhere using BenchmarkTools
julia> r = @spawnat 2 rand(100, 100)
julia> x = @spawnat 2 @btime fetch(r);
julia> fetch(x)
ERROR: On worker 2:
UndefVarError: r not defined

You will only get the error when you try to fetch x.


Okaaay, as I am sure you can tell, I am discovering this world together with you. This error made me correct a few misconceptions I had about parallel programming. Here is the summary:

  1. When you call @spawnat, any error that happens on another process is not directly visible to you through the REPL. @spawnat itself is asynchronous, so process 1 will continue to run after telling the other process what needs to be done. The errors only become visible when you fetch the Future that comes out of the @spawnat, which basically represents the final return value or state of the function call. This is obvious from the following piece of code:
julia> r = @spawnat 2 throw("Error")
Future(2, 1, 56, Nullable{Any}())

julia> fetch(r)
ERROR: On worker 2:
  2. If you fetch a Future twice, Julia will freeze. This is probably a bug, as it should at least give an error.

  3. A Future is only defined in process 1 when you declare it like r = @spawnat ...... Proof:

julia> r = @spawnat 2 rand(10)
Future(2, 1, 5, Nullable{Any}())

julia> fetch(@spawnat 2 isdefined(:r))

julia> fetch(@spawnat 1 isdefined(:r))

julia> fetch(@spawnat 1 names(Main))
5-element Array{Symbol,1}:

julia> fetch(@spawnat 2 names(Main))
3-element Array{Symbol,1}:
  4. A variable is only defined in the Main of another process if preceded by global. This may or may not be a bug; someone more knowledgeable will probably have to comment. Proof:
julia> fut1 = @spawnat 2 r = rand(100, 100);

julia> fetch(@spawnat 2 names(Main))
3-element Array{Symbol,1}:

julia> fut1 = @spawnat 2 global r = rand(100, 100);

julia> fetch(@spawnat 2 names(Main))
4-element Array{Symbol,1}:

Given all the above revelations (new to me), I think the way to do what you intended with your code above is:

@everywhere using BenchmarkTools
@spawnat 2 global r = rand(100, 100); #No need to store the Future here because we don't want `r` back
x = @spawnat 2 @btime r.^2 #This will square the `r` matrix and benchmark it on process 2, returning a Future that should point to the element-wise squared `r` matrix. And if you wait a little you will see the result of @btime. 
m = fetch(x); # This will synchronize with process 2, by waiting for it to finish then it will get the resulting matrix to process 1.
typeof(m) # Should give us an Array{Float64, 2}
# Happy ending!

I hope that makes some sense! I am sure there is a lot more to learn. At some point, PRing the docs on parallel programming might be a good idea, as it is obviously faaaar from comprehensive.

For potential experts reading this and trying to figure out the sources of the potential bugs mentioned above, here is the versioninfo:

julia> versioninfo()
Julia Version 0.6.0
Commit 9036443 (2017-06-19 13:05 UTC)
Platform Info:
  OS: Linux (x86_64-linux-gnu)
  CPU: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz
  LAPACK: libopenblas64_
  LIBM: libopenlibm
  LLVM: libLLVM-3.9.1 (ORCJIT, skylake)


You might want to check out the seemingly lovely ParallelDataTransfer.jl by @ChrisRackauckas (sorry for pinging you!)


Thank you very much for the summary.

The package ParallelDataTransfer.jl does what I wanted and is a nice alternative to what I intended to do with “@spawnat+fetch”; thank you for bringing it up.