Multi-processing and Distributed Computing Manual Questions and Suggestions

I am trying to learn how to do distributed computing in Julia, so I looked at the documentation in the official Julia manual (which I think is the only documentation page; if there is a more detailed or standalone one, I would love to know about it).

I have some questions and concerns, and I hope by posting them here I can learn more about how Distributed works and improve the official documentation for new users.

For all code that follows, I have launched julia using julia -p 2, just like in the manual.

In the first code block, the manual gives the example

$ julia -p 2

julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)

julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)

julia> fetch(s)
2×2 Array{Float64,2}:
 1.18526  1.50912
 1.16296  1.60607

But when I run it on my laptop, I get

julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 8, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (0, 0, 144628617296)), nothing)

I am pretty sure that the first two arguments in Future(...) are the IDs of the process that runs the function and the process that initiates the remotecall, respectively (or maybe the second one is just the id of whatever process is currently running). I am less sure, but I think the third argument is some kind of id identifying the returned value on the remote process, so that when I call fetch on the Future, the fetching process knows where the data it is supposed to fetch is located.

I am guessing that the last parameter in Future(...) somehow represents how the different processes are being managed, and that if I ran this on a computing cluster I would see some kind of object that has to do with accessing remote devices, instead of an object from Base.Threads. But I’m really going out on a limb here.
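One way to check these guesses is to look at the fields of the returned object directly (the names where, whence, id, lock and v come from the Distributed source; they are internal and undocumented, so they may change between versions):

julia> r = remotecall(rand, 2, 2, 2);

julia> fieldnames(typeof(r))
(:where, :whence, :id, :lock, :v)

julia> r.where, r.whence, r.id   # on my machine: (2, 1, 8) -- owning worker, calling process, reference counter

This at least confirms which process owns the value and which one created the reference, even if it does not explain what the lock and v fields are for.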

From the start, I think the meaning of Future(2, 1, 4, nothing) should be explained. Looking at the documentation for the Future type did not help much, except for identifying that Future has something to do with the RRID type, which I identified with remoteref_id. In particular, v::Union{Some, Nothing}=nothing tells me absolutely nothing about what that argument does (Distributed Computing · The Julia Language).

The fourth argument in the returned Future(...) was especially concerning to me, since what I got (ReentrantLock(...[very long and intimidating])) was very different from what was shown in the manual (nothing).

The second code block gives an example of remotecall_fetch:

julia> remotecall_fetch(r-> fetch(r)[1, 1], 2, r)
0.18526337335308085

" This fetches the array on worker 2 and returns the first value. Note, that fetch doesn’t move any data in this case, since it’s executed on the worker that owns the array. […]"

I don’t have any problem with the code block, but I would like to nitpick the text that follows. I believe (although I am not sure how I would test this) that s = @spawnat 2 1 .+ fetch(r) in the first code block also didn’t move any data, since we executed it on process 2, the same one we used for the remote call that created r. I also assume this is why @spawnat needs to be implemented as a macro instead of a function call: so that fetch(r) can be interpreted to mean “on the remote process, fetch r”, instead of fetching r on process 1 and then having spawnat send the fetched data to process 2 (which would result in two pointless data transfers). But the fact that the lack of data movement was not mentioned until after the second code block initially misled me into thinking it had something to do with remotecall_fetch specifically.
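For what it’s worth, here is one rough way I later came up with to test the data-movement claim (only a wall-clock comparison, and both calls pay some compilation cost on the first run, so take it with a grain of salt):

julia> @everywhere getsum(x) = sum(fetch(x))   # define on every process so either worker can run it

julia> r = remotecall(rand, 2, 5_000, 5_000);  # ~200 MB matrix owned by worker 2

julia> wait(r);

julia> @time remotecall_fetch(getsum, 2, r);   # fetch runs on worker 2, where the matrix lives: nothing moves

julia> @time remotecall_fetch(getsum, 3, r);   # worker 3 has to pull the whole matrix from worker 2 first

Only a single Float64 travels back to process 1 in both cases, so the difference in timing should mostly be the matrix transfer in the second call.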

The fourth code block gives an example of using :any with @spawnat:

julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)

julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)

julia> fetch(s)
2×2 Array{Float64,2}:
 1.38854  1.9098
 1.20939  1.57158

" Note that we used 1 .+ fetch(r) instead of 1 .+ r. This is because we do not know where the code will run, so in general a fetch might be required to move r to the process doing the addition. In this case, @spawnat is smart enough to perform the computation on the process that owns r, so the fetch will be a no-op (no work is done)."

Unless I have misunderstood what Future(arg1, arg2, arg3, arg4) means, I don’t think the last sentence is correct. The random matrix is owned by process 2, and the addition is performed by process 3. This seems to be the case on my laptop as well. If I perform more additions (s2 = @spawnat :any 1 .+ fetch(r), s3 = @spawnat :any 1 .+ fetch(r), and so on), the executing process seems to alternate between 2 and 3. So @spawnat actually appears to use a “dumb” strategy of just cycling through all the processes when scheduling each new computation. That is fine, but if I mistakenly think that @spawnat is smart enough to avoid memory transfer when it really is not, I might write very inefficient code.
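The simplest way I found to see where each :any spawn lands is to return myid() from the spawned expression:

julia> [fetch(@spawnat :any myid()) for _ in 1:6]   # on my laptop (with -p 2) this alternates: 2, 3, 2, 3, 2, 3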

The first two sentences are misleading, because they imply that if we knew where the code would run, and that r did not need to be moved to the process doing the addition, then we could use 1 .+ r instead of 1 .+ fetch(r). But I cannot do that:

julia> r = @spawnat 2 rand(2,2)
Future(2, 1, 21, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (128502582875952, 128502582219888, 128502628089864)), nothing)

julia> s = @spawnat 2 1 .+ r
Future(2, 1, 22, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (8, 0, -1)), nothing)

julia> fetch(s)
ERROR: On worker 2:
MethodError: no method matching length(::Future)
The function `length` exists, but no method is defined for this combination of argument types.
...

I think the paragraph following the fourth code block could be improved by emphasizing the difference between a Future, which “is a placeholder for a single computation of unknown termination status and time”, and the result of the computation that the Future is a placeholder for. E.g., we cannot do 1 .+ r because r is a Future that will return a Matrix{Float64} (moving data if necessary) when fetched; r is not itself a Matrix{Float64}.
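To make that distinction concrete, this is roughly the kind of snippet I would have liked the manual to show at this point:

julia> r = @spawnat 2 rand(2, 2);

julia> typeof(r)           # r is only a handle to the eventual result
Future

julia> typeof(fetch(r))    # fetching is what gives you the actual matrix
Matrix{Float64}

julia> s = @spawnat 2 1 .+ fetch(r);   # works, because fetch(r) is evaluated on worker 2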

" An important thing to remember is that, once fetched, a Future will cache its value locally. Further fetch calls do not entail a network hop. Once all referencing Futures have fetched, the remote stored value is deleted."

I think it could also be clarified that when multiple processes fetch a Future, they all fetch the same data. I.e., the random matrix is not recomputed each time a new process performs a fetch.
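Something like the following experiment would make both points explicit (this reflects my understanding of the caching behaviour; in particular the === result relies on the locally cached value being reused):

julia> r = @spawnat 2 rand(2, 2);

julia> a = fetch(r); b = fetch(r);

julia> a == b    # the rand(2, 2) is not re-run; every fetch sees the same matrix
true

julia> a === b   # the second fetch comes straight from the local cache, so it is the same object
true

julia> remotecall_fetch(fetch, 3, r) == a   # another process also gets the same data, not a fresh matrix
true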

" @async is similar to @spawnat, but only runs tasks on the local process. We use it to create a “feeder” task for each process. Each task picks the next index that needs to be computed, then waits for its process to finish, then repeats until we run out of indices. Note that the feeder tasks do not begin to execute until the main task reaches the end of the @sync block, at which point it surrenders control and waits for all the local tasks to complete before returning from the function. As for v0.7 and beyond, the feeder tasks are able to share state via nextidx because they all run on the same process. Even if Tasks are scheduled cooperatively, locking may still be required in some contexts, as in asynchronous I/O. This means context switches only occur at well-defined points: in this case, when [remotecall_fetch](Distributed Computing · The Julia Language{Any, Integer, Vararg{Any}}) is called. This is the current state of implementation and it may change for future Julia versions, as it is intended to make it possible to run up to N Tasks on M Process, aka M:N Threading. Then a lock acquiring\releasing model for nextidx will be needed, as it is not safe to let multiple processes read-write a resource at the same time."

I am very confused by this paragraph. I confess, I went straight to the “Multi-processing and Distributed Computing” section of the manual, without reading the “Asynchronous Programming” and “Multi-Threading” sections, so I thought that @async was a macro from Distributed that was being introduced here, when in reality it had already been introduced earlier. That’s my fault, but I think it is pretty common to read the manual out of order, only reading a section when you need to know about the features in it. I think it could be made clearer that @async was already introduced if the first sentence were changed to “@spawnat is similar to @async, with the main difference being that @async only runs tasks on the local process. We use @async to create a ‘feeder’ task …”.

This paragraph also uses the word “task” loosely. Are we talking about the English-language “task”, or are we talking about a Task? I would also note that this is the first paragraph in this section in which the word “task” is used, which raised questions for me (what is a task? does @spawnat create a task? does the created task live on the local process, the remote process, or both?).
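For what it’s worth, here is the mental model I eventually arrived at (my understanding, not the manual’s wording): @async returns a Task object that runs on the local process, while @spawnat returns a Future and the work runs in a task on the remote worker:

julia> t = @async myid();      # a Task on the local process

julia> typeof(t)
Task

julia> fetch(t)                # the local (master) process has id 1
1

julia> f = @spawnat 2 myid();  # a Future; the work itself runs in a task on worker 2

julia> typeof(f)
Future

julia> fetch(f)
2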

This is also the first time @sync is mentioned in the manual since the section “Networking and Streams”. If you have not read that section, then the sentence “Note that the feeder tasks do not begin to execute until the main task reaches the end of the @sync block […]” does not make sense, because it references @sync, which has not been introduced. (Either way, I think it may be more correct to say “an @sync block” rather than “the @sync block”.)
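For anyone in the same position, here is a minimal sketch of what an @sync block does (my own example, not taken from the manual): it waits for all the @async (and @spawnat) tasks started lexically inside it before execution continues:

julia> @sync begin
           for w in workers()
               @async println("worker $w reports id ", remotecall_fetch(myid, w))
           end
       end;   # execution only continues past this point once both @async tasks have finished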

That’s all I’ve read so far. I may make another post after reading more of this section of the manual.