@distributed @sync @async

Dear Julia peers,

I got three questions regarding @distributed @sync @async, as follows.

  1. I am running five nested for loops in parallel using five @distributed for loops. However, from the output I found that some operations within the loops were missed.
@distributed for i = 1:100
    @distributed for j = 1:2
        @distributed for k = 1:3
            @distributed for l = 1:4
                @distributed for m = 1:2
                    # operations (some functions)
                end
            end
        end
    end
end
  2. I tried adding @sync and @async to the above code as below:
@sync begin
@distributed for i = 1:100
    @distributed for j = 1:2
        @distributed for k = 1:3
            @distributed for l = 1:4
                @distributed for m = 1:2
                    @async begin
                        # operations (some functions)
                    end
                end
            end
        end
    end
end
end

However, I got an error message saying that a running program cannot be serialized. What does this mean?

  3. I tried to run item 1 again, but Julia didn’t run the tasks and instead returned the message “Task (queued) @0x00000001154f9d50”. I googled but didn’t find an answer.

Any suggestions will be highly appreciated. Thank you!


As far as I know, there currently is no support for parallel nested for loops, see e.g. https://github.com/JuliaLang/julia/issues/10109.

In your case, it would probably be enough to just put the @distributed on the outermost loop since that one is long enough to allow for effective parallelisation (unless you run on a supercomputer with >100 cores).
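Concretely, that suggestion would look something like the following sketch. The loop body is a placeholder for your operations, and the worker count is arbitrary:

```julia
using Distributed
addprocs(4)  # or start Julia with `julia -p 4`

# Only the outermost loop is @distributed; the inner loops run as
# ordinary serial loops on whichever worker owns that chunk of `i`.
@sync @distributed for i = 1:100
    for j = 1:2, k = 1:3, l = 1:4, m = 1:2
        # operations (some functions) go here
    end
end
```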

Also, you do not need the @async in the innermost loop, but you probably do want the @sync in front of the outermost loop. The effect is illustrated in the following example:

julia> @distributed for i = 1:4; println(i); end; println("done")
done 
      From worker 3:	2
      From worker 2:	1
      From worker 5:	4
      From worker 4:	3
julia> @sync @distributed for i = 1:4; println(i); end; println("done")
      From worker 4:	3
      From worker 3:	2
      From worker 2:	1
      From worker 5:	4
done

In the first case, the “done” could appear anywhere in the list of outputs. Only in the second case are you guaranteed that the “done” appears at the end of the list.


@ettersi Thank you!

First, it seems that parallel computing does work for nested for loops. I tried with five @distributed for loops (but without @sync), and it sped up a lot compared with @distributed on just the outermost loop. The only problem I met is that a few items across the five nested loops were missed (for example, the result for i=1, j=2, k=1, l=1, m=2 was missing) - I guess this was because I didn’t put @sync there.

However, after I put @sync, an error message appeared: “running program cannot be serialized”.

Second, I do have a supercomputer with 100 cores, so it would be ideal to parallelise these nested for loops.

Third, why do we never need @async with @distributed? For remotecall_fetch, @sync is always coupled with @async.

Thanks a lot!

Do any of the loops depend on the result from a previous iteration? This may be out in left field, but could you do this:

tasks = []

for i = 1:100
    for j = 1:2
        for k = 1:3
            for l = 1:4
                for m = 1:2
                    push!(tasks, (i, j, k, l, m))
                end
            end
        end
    end
end

@distributed for t in tasks
    (i, j, k, l, m) = t
    # operations (some functions)
end

If pixel27’s proposal works for you, then that is probably the best way to achieve what you want.
Aside: You can avoid creating the task list explicitly by using CartesianIndices:

julia> @sync @distributed for i in CartesianIndices((2,2,2))
           @show i.I
       end;
      From worker 2:	i.I = (1, 1, 1)
      From worker 2:	i.I = (2, 1, 1)
      From worker 3:	i.I = (1, 2, 1)
      From worker 3:	i.I = (2, 2, 1)
      From worker 5:	i.I = (1, 2, 2)
      From worker 5:	i.I = (2, 2, 2)
      From worker 4:	i.I = (1, 1, 2)
      From worker 4:	i.I = (2, 1, 2)

If this doesn’t work for you, then it would be good if you could share a minimum working example (MWE) which in particular demonstrates how some items are missed. On my end, nested @distributed loops seem to work unless I put the @sync in front of them.

Regarding your questions about @distributed, @sync, @async, etc: I must admit that I am no expert in parallel computing using Julia myself, but here is how I understand it.

When you start Julia with julia -p p, you create one master process and p worker processes. Each of these processes (both the master and the workers) keeps a list of tasks which it should complete, and it can switch between these tasks if progress on one task depends on input from another task or another process.
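As a minimal sketch of this setup (the worker count here is arbitrary):

```julia
using Distributed

addprocs(2)      # add 2 worker processes, equivalent to starting with `julia -p 2`
@show nprocs()   # total process count: 1 master + 2 workers = 3
@show nworkers() # worker processes only: 2
@show workers()  # the worker ids, typically [2, 3]
```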

@distributed for i = range
    do_work(i)
end

splits range into p pieces of as-equal-as-possible lengths and then adds to the task lists of each of the worker processes a task of the form

for i = [this process's share of range]
    do_work(i)
end

Furthermore, on the master process it creates a task which simply waits for all the worker processes to complete their tasks, and it returns this task as the result of the @distributed for loop. This master task does not block progress in the “main” master task, however; if you want that, you have to explicitly call wait() on the @distributed for task. You can see this play out in the following example.

julia> master_task = @distributed for i = 1:2
           sleep(1.0)
           @show i
       end
       println("Waiting for workers to finish")
       wait(master_task)
       println("All workers done")
Waiting for workers to finish
      From worker 2:	i = 1
      From worker 3:	i = 2
All workers done

Note how Waiting for workers to finish appears before the output from the workers because the master task does not wait for the workers to finish until we call wait(master_task).

@sync is intended to relieve you of the burden of explicitly keeping track of master_task and waiting for it to finish. For example,

@sync @distributed for ...

is equivalent to

master_task = @distributed for ...
wait(master_task)

I believe that this is all that is needed to understand why the @sync fails in your example. (It’s possible that I am off, though. Corrections welcome.) In code of the form

@distributed for i = 1:2
    @distributed for j = 1:2
         ...
    end
end

the inner @distributed for loop is executed on the worker processes, and hence the master_tasks associated with the inner for loop live on the worker processes, not the master. If you strap an @sync around all of this, then this @sync gets confused about how to properly handle the various tasks on the various processes, and this is what leads to the error message.

It is in principle possible to avoid this @sync confusion, but even then it remains questionable to use nested @distributed for loops since the outer @distributed loop simply amounts to parallelising the launching of worker tasks, which is more complicated and likely less performant than it could be.
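As an aside regarding your third question: with remotecall_fetch, the @sync/@async pairing is needed because each remotecall_fetch blocks until its result arrives. Wrapping each call in @async turns it into a separate local task so the calls overlap, and @sync waits for all of them. A sketch (where `work` is a toy stand-in for your real function):

```julia
using Distributed
addprocs(2)

@everywhere work(x) = x^2   # toy function, defined on all processes

results = Vector{Int}(undef, nworkers())
@sync for (idx, p) in enumerate(workers())
    # Each @async spawns a local task that blocks on remotecall_fetch;
    # the enclosing @sync waits until all of these tasks have finished.
    @async results[idx] = remotecall_fetch(work, p, idx)
end
@show results
```

With @distributed, by contrast, the macro itself already creates and schedules the worker tasks for you, so there is nothing left for an extra @async to do.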


It takes time to ‘parcel out’ parallel work. If the loop body takes less time than this overhead, you could in fact end up slowing things down. Non-intuitive, I know. Go for the low-hanging fruit first.
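One rough way to see this overhead for yourself (a sketch; the exact timings will vary, and the loop body here is deliberately tiny so distribution cost dominates):

```julia
using Distributed
addprocs(4)

@everywhere trivial(i) = sum(1:1_000)   # a tiny amount of work per iteration

serial_run()      = (for i in 1:100; trivial(i); end)
distributed_run() = (@sync @distributed for i in 1:100; trivial(i); end)

serial_run(); distributed_run()   # warm up (compilation)
@time serial_run()                # typically very fast
@time distributed_run()           # often slower: dominated by task/IPC overhead
```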