I find this easiest to explain with an MWE. Basically, @async is changing values in a remote task when I think it shouldn’t. (Julia 1.8.1)
using Distributed
fnames = ["inside_1", "inside_2"]
addprocs(length(fnames))
@everywhere begin
    mutable struct Moo
        fname::String
    end
    function say(thing)
        @info "I am $(thing) sent to process $(myid())"
    end
    function doit(moo_in::Moo, fnames)
        # moo = deepcopy(moo_in) # LINE A
        @sync for (ipid, pid) in enumerate(workers())
            moo = deepcopy(moo_in) # LINE B
            moo.fname = fnames[ipid]
            @info "sending $(moo) from manager to process $pid"
            @async remotecall_wait(say, pid, moo)
        end
    end
end
moo = Moo("outside")
doit(moo, fnames)
gives, as expected,
[ Info: sending Moo("inside_1") from manager to process 2
[ Info: sending Moo("inside_2") from manager to process 3
From worker 2: [ Info: I am Moo("inside_1") sent to process 2
From worker 3: [ Info: I am Moo("inside_2") sent to process 3
But if I comment line B and uncomment line A (because I don’t want to copy structs within a loop in my actual use case), and rerun the @everywhere block and then doit(), I get
[ Info: sending Moo("inside_1") from manager to process 2
[ Info: sending Moo("inside_2") from manager to process 3
From worker 3: [ Info: I am Moo("inside_2") sent to process 3
From worker 2: [ Info: I am Moo("inside_2") sent to process 2
I thought the @sync for should ensure that what goes into the @async task is unique? Why is moo going to worker process 2 changing when I move the deepcopy outside the loop? I am flummoxed. It gets even weirder, but I’ll wait for responses first.
Sure, but moo.fname = fnames[ipid] is within an @sync block, which is executed sequentially, not @async; so the @async remotecall should get a unique fname within moo per ipid, no?
It’s a race condition I suppose. The first remote call (thanks to @async) doesn’t execute until the loop comes around a second time and modifies moo.
I’d say the cleanest option is B; copy the object inside the loop. But if you must do it the other way, this works
function doit(moo_in::Moo, fnames)
    moo = deepcopy(moo_in) # LINE A
    wait_for = Future[]
    for (ipid, pid) in enumerate(workers())
        moo.fname = fnames[ipid]
        @info "sending $(moo) from manager to process $pid"
        push!(wait_for, remotecall(say, pid, moo))
    end
    wait.(wait_for)
    @info "Done."
end
Interesting, thanks. I’d have imagined the @sync for with the @async remotecall_wait was equivalent to your solution, which actually looks quite clean to me. Apart from the assignment to Futures, which could be preallocated, is there a downside to using a normal for loop with remotecall? Also, is this a bug?
I don’t think it’s a bug. @async wraps remotecall_wait in a task and schedules it. When that task actually runs, however, is at the discretion of the scheduler. Here it doesn’t run before the loop has come around a second time. That’s not a problem if a unique moo is instantiated at every iteration, but if only the fname field is modified, all those tasks close over the same moo, which holds the last value by the time the tasks finally run. This is what Jameson explained earlier in fewer words.
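The same effect can be reproduced on a single process, without Distributed. This is only a minimal sketch: the Tag struct and the shared_tag / fresh_tag functions are hypothetical names made up for illustration and are not part of the MWE. Because nothing in the loop body yields, the tasks only get to run once the loop is over.

```julia
# Hypothetical stand-in for Moo; not part of the original MWE.
mutable struct Tag
    label::String
end

# One Tag is mutated in the loop; every task closes over that same object.
function shared_tag(labels)
    tag = Tag("outside")
    seen = String[]
    @sync for label in labels
        tag.label = label
        @async push!(seen, tag.label)  # runs only after the current task yields
    end
    return seen
end

# A fresh Tag per iteration; each task closes over its own object.
function fresh_tag(labels)
    seen = String[]
    @sync for label in labels
        tag = Tag(label)
        @async push!(seen, tag.label)
    end
    return seen
end

labels = ["inside_1", "inside_2"]
shared_tag(labels)  # every entry is "inside_2"
fresh_tag(labels)   # contains both "inside_1" and "inside_2"
```

shared_tag corresponds to LINE A (one object, mutated in place) and fresh_tag to LINE B (a new object per iteration).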
I don’t see a conceptual problem with my approach. What also works is using asyncmap
function pdoit(moo, fnames)  # fnames passed explicitly instead of read from a global
    moo = deepcopy(moo)
    asyncmap(enumerate(workers())) do (ipid, pid)
        moo.fname = fnames[ipid]
        remotecall_wait(say, pid, moo)
    end
end
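For anyone unfamiliar with asyncmap: it runs the given function on each item in its own task and returns the results in input order, regardless of which task finishes first. A small local sketch (the squaring and the sleep are just placeholders to make the tasks finish out of order; ntasks limits how many run concurrently):

```julia
results = asyncmap(1:4; ntasks = 2) do x
    sleep(0.01 * (5 - x))  # earlier items sleep longer than later ones
    x^2
end

results  # [1, 4, 9, 16]
```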
I think you might be confused about what @sync does.
help?> @sync
  @sync
Wait until all lexically-enclosed uses of @async, @spawn, @spawnat and @distributed are complete. All exceptions thrown by enclosed async operations are collected and thrown as a CompositeException.
It does not change the execution order of anything within the block. All it does is wait at the end of the block for all the asynchronous tasks to complete.
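A tiny local sketch of that point (order_demo is a hypothetical name, used only for this illustration): the task created by @async is merely scheduled, the rest of the block keeps running, and the task gets its turn when @sync waits at the end.

```julia
function order_demo()
    events = String[]
    @sync begin
        @async push!(events, "task")  # scheduled here, not run yet
        push!(events, "body")         # the enclosing block keeps going
    end                               # @sync waits here, so the task runs now
    return events
end

order_demo()  # ["body", "task"]
```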
Let’s say you have three workers. The code is then essentially equivalent to the following.
function doit(moo_in::Moo, fnames)
    # Iteration 1: ipid = 1, pid = 2 (assuming the three workers have ids 2, 3 and 4).
    # Each loop iteration gets fresh bindings, hence the distinct names here.
    moo1 = deepcopy(moo_in) # LINE B
    moo1.fname = fnames[1]
    @info "sending $(moo1) from manager to process 2"
    t1 = @async remotecall_wait(say, 2, moo1)
    # Iteration 2: ipid = 2, pid = 3
    moo2 = deepcopy(moo_in) # LINE B
    moo2.fname = fnames[2]
    @info "sending $(moo2) from manager to process 3"
    t2 = @async remotecall_wait(say, 3, moo2)
    # Iteration 3: ipid = 3, pid = 4
    moo3 = deepcopy(moo_in) # LINE B
    moo3.fname = fnames[3]
    @info "sending $(moo3) from manager to process 4"
    t3 = @async remotecall_wait(say, 4, moo3)
    # This is all @sync adds: wait for every task at the end.
    foreach(wait, [t1, t2, t3])
end
The @sync macro is literally expanded as follows:
julia> @macroexpand1 @sync for (ipid, pid) in enumerate(workers())
moo = deepcopy(moo_in) # LINE B
moo.fname = fnames[ipid]
@info "sending $(moo) from manager to process $pid"
@async remotecall_wait(say, pid, moo)
end
quote
#= task.jl:475 =#
let var"##sync#48" = Base.Channel(Base.Inf)
#= task.jl:476 =#
var"#118#v" = for (ipid, pid) = enumerate(workers())
#= REPL[89]:2 =#
moo = deepcopy(moo_in)
#= REPL[89]:3 =#
moo.fname = fnames[ipid]
#= REPL[89]:4 =#
#= REPL[89]:4 =# @info "sending $(moo) from manager to process $(pid)"
#= REPL[89]:5 =#
#= REPL[89]:5 =# @async remotecall_wait(say, pid, moo)
#= REPL[89]:6 =#
end
#= task.jl:477 =#
Base.sync_end(var"##sync#48")
#= task.jl:478 =#
var"#118#v"
end
end
If you expand @async as well via @macroexpand, you will see that all that happens is that each task created by @async is pushed into the channel var"##sync#48". At the end, each task in var"##sync#48" is waited on until completion.
Full macroexpansion
julia> @macroexpand @sync for (ipid, pid) in enumerate(workers())
moo = deepcopy(moo_in) # LINE B
moo.fname = fnames[ipid]
@info "sending $(moo) from manager to process $pid"
@async remotecall_wait(say, pid, moo)
end
quote
#= task.jl:475 =#
let var"##sync#48" = Base.Channel(Base.Inf)
#= task.jl:476 =#
var"#119#v" = for (ipid, pid) = enumerate(workers())
#= REPL[91]:2 =#
moo = deepcopy(moo_in)
#= REPL[91]:3 =#
moo.fname = fnames[ipid]
#= REPL[91]:4 =#
begin
#= logging.jl:370 =#
let
#= logging.jl:371 =#
var"#120#level" = Base.CoreLogging.Info
#= logging.jl:372 =#
var"#121#std_level" = Base.CoreLogging.convert(Base.CoreLogging.LogLevel, var"#120#level")
#= logging.jl:373 =#
if var"#121#std_level" >= Base.CoreLogging._min_enabled_level[]
#= logging.jl:374 =#
var"#122#group" = Symbol("REPL[91]")
#= logging.jl:375 =#
var"#123#_module" = Main
#= logging.jl:376 =#
var"#124#logger" = Base.CoreLogging.current_logger_for_env(var"#121#std_level", var"#122#group", var"#123#_module")
#= logging.jl:377 =#
if !(var"#124#logger" === Base.CoreLogging.nothing)
#= logging.jl:378 =#
var"#125#id" = :Main_a02e6fb6
#= logging.jl:381 =#
if Base.CoreLogging.invokelatest(Base.CoreLogging.shouldlog, var"#124#logger", var"#120#level", var"#123#_module", var"#122#group", var"#125#id")
#= logging.jl:382 =#
var"#126#file" = "REPL[91]"
#= logging.jl:383 =#
if var"#126#file" isa Base.CoreLogging.String
#= logging.jl:384 =#
var"#126#file" = (Base.CoreLogging.Base).fixup_stdlib_path(var"#126#file")
end
#= logging.jl:386 =#
var"#127#line" = 4
#= logging.jl:387 =#
local var"#128#msg", var"#129#kwargs"
#= logging.jl:388 =#
begin
#= logging.jl:359 =#
try
#= logging.jl:360 =#
var"#128#msg" = "sending $(moo) from manager to process $(pid)"
#= logging.jl:361 =#
var"#129#kwargs" = (;)
#= logging.jl:362 =#
true
catch var"#142#err"
#= logging.jl:364 =#
Base.CoreLogging.logging_error(var"#124#logger", var"#120#level", var"#123#_module", var"#122#group", var"#125#id", var"#126#file", var"#127#line", var"#142#err", true)
#= logging.jl:365 =#
false
end
end && Base.CoreLogging.invokelatest(Base.CoreLogging.handle_message, var"#124#logger", var"#120#level", var"#128#msg", var"#123#_module", var"#122#group", var"#125#id", var"#126#file", var"#127#line"; var"#129#kwargs"...)
end
end
end
#= logging.jl:394 =#
Base.CoreLogging.nothing
end
end
#= REPL[91]:5 =#
begin
#= task.jl:517 =#
let
#= task.jl:518 =#
local var"#143#task" = Base.Task((()->begin
#= task.jl:514 =#
remotecall_wait(say, pid, moo)
end))
#= task.jl:519 =#
if $(Expr(:islocal, Symbol("##sync#48")))
#= task.jl:520 =#
Base.put!(var"##sync#48", var"#143#task")
end
#= task.jl:522 =#
Base.schedule(var"#143#task")
#= task.jl:523 =#
var"#143#task"
end
end
#= REPL[91]:6 =#
end
#= task.jl:477 =#
Base.sync_end(var"##sync#48")
#= task.jl:478 =#
var"#119#v"
end
end
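Stripped of the logging and the generated variable names, the expansion boils down to roughly the following. This is a hand-written approximation, not what Base actually emits (Base uses a Channel and Base.sync_end rather than a plain vector), and manual_sync is a hypothetical name:

```julia
function manual_sync(labels)
    tasks = Task[]
    seen = String[]
    for label in labels
        t = Task(() -> push!(seen, label))  # what @async builds from its body
        push!(tasks, t)                     # what @sync keeps track of
        schedule(t)                         # queued, but not running yet
    end
    foreach(wait, tasks)                    # roughly the role of Base.sync_end
    return seen
end

manual_sync(["inside_1", "inside_2"])
```

Nothing in the loop waits for a task, so the loop always finishes scheduling before any task body runs.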
Thus, all @sync does is ensure that all asynchronous tasks have completed by the end of the block.