Unexpected behavior of @async

I find this easiest to explain with an MWE. Basically, @async is changing values in a remote task when I think it shouldn’t. (Julia 1.8.1)

using Distributed

fnames = ["inside_1", "inside_2"]
addprocs(length(fnames))

@everywhere begin
    mutable struct Moo
        fname :: String
    end

    function say(thing)
        @info "I am $(thing) sent to process $(myid())"
    end

    function doit(moo_in::Moo, fnames)
        # moo = deepcopy(moo_in) # LINE A
        @sync for (ipid, pid) in enumerate(workers())
            moo = deepcopy(moo_in) # LINE B
            moo.fname = fnames[ipid]
            @info "sending $(moo) from manager to process $pid"
            @async remotecall_wait(say, pid, moo)
        end
    end
end

moo = Moo("outside")
doit(moo, fnames)

gives, as expected,

[ Info: sending Moo("inside_1") from manager to process 2
[ Info: sending Moo("inside_2") from manager to process 3
      From worker 2:    [ Info: I am Moo("inside_1") sent to process 2
      From worker 3:    [ Info: I am Moo("inside_2") sent to process 3

But if I comment out line B and uncomment line A (because I don’t want to copy structs within a loop in my actual use case), rerun the @everywhere block and then call doit() again, I get

[ Info: sending Moo("inside_1") from manager to process 2
[ Info: sending Moo("inside_2") from manager to process 3
      From worker 3:    [ Info: I am Moo("inside_2") sent to process 3
      From worker 2:    [ Info: I am Moo("inside_2") sent to process 2

I thought the @sync for should ensure that what goes into the @async task is unique? Why is moo going to worker process 2 changing when I move the deepcopy outside the loop? I am flummoxed. It gets even weirder, but I’ll wait for responses first.

You have one value for moo and are sending it to all tasks, versus having a different moo sent to each @async block.

Sure, but moo.fname = fnames[ipid] is within an @sync block, which is executed sequentially, not @async; so the @async remotecall should get a unique fname within moo per ipid, no?

It’s a race condition, I suppose. The first remote call (thanks to @async) doesn’t execute until the loop has come around a second time and modified moo.
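A stripped-down sketch of that race without Distributed, for illustration only (the Box type and the function names are made up). Nothing in these loop bodies yields, so the tasks only run once @sync starts waiting at the end of the block.

```julia
mutable struct Box
    name::String
end

# One object, mutated on every iteration (the LINE A pattern): by the time
# the tasks run, the loop is finished and they all read the last name.
function shared_box(names)
    box = Box("start")
    seen = String[]
    @sync for name in names
        box.name = name
        @async push!(seen, box.name)
    end
    return seen
end

# A fresh object per iteration (the LINE B pattern): each task keeps its own.
function fresh_box(names)
    seen = String[]
    @sync for name in names
        box = Box(name)
        @async push!(seen, box.name)
    end
    return seen
end

shared_box(["a", "b", "c"])        # → ["c", "c", "c"]
sort(fresh_box(["a", "b", "c"]))   # → ["a", "b", "c"]
```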

I’d say the cleanest option is B: copy the object inside the loop. But if you must do it the other way, this works:

function doit(moo_in::Moo, fnames)
    moo = deepcopy(moo_in) # LINE A
    wait_for = Future[]
    for (ipid, pid) in enumerate(workers())
        moo.fname = fnames[ipid]
        @info "sending $(moo) from manager to process $pid"
        push!(wait_for, remotecall(say, pid, moo))
    end
    wait.(wait_for)
    @info "Done."
end
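A sketch of the same approach that also touches on preallocation: map can build the vector of Futures in one go, and fetching the results shows what each worker actually received. This assumes two fresh local workers from addprocs; received_fname is a hypothetical stand-in for say that returns the field instead of logging it. It relies on remotecall serializing its arguments before it returns (as far as I can tell from Distributed's implementation), which is what makes mutating the single moo afterwards safe.

```julia
using Distributed
pids = addprocs(2)   # assumption: two fresh local workers

@everywhere begin
    mutable struct Moo
        fname::String
    end
    # Hypothetical helper: report the field the worker actually received.
    received_fname(m::Moo) = m.fname
end

function doit_map(moo_in::Moo, fnames, pids)
    moo = deepcopy(moo_in)   # a single copy outside the loop (LINE A style)
    # map allocates the vector of Futures once, at the right length.
    futures = map(enumerate(pids)) do (ipid, pid)
        moo.fname = fnames[ipid]
        # moo is serialized during this call, so the next iteration's
        # mutation does not change what this worker receives.
        remotecall(received_fname, pid, moo)
    end
    return fetch.(futures)   # waits on each Future and collects the results
end

doit_map(Moo("outside"), ["inside_1", "inside_2"], pids)   # → ["inside_1", "inside_2"]
```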

Interesting, thanks. I’d have imagined the @sync for with the @async remotecall_wait was equivalent to your solution, which actually looks quite clean to me. Apart from the assignment to Futures, which could be preallocated, is there a downside to using a plain for loop with remotecall? Also, is this a bug?

I don’t think it’s a bug. @async wraps remotecall_wait in a task and schedules it. However, when that task actually runs is at the discretion of the scheduler. Here it doesn’t run before the loop has come around a second time. That’s not a problem if a new moo is created on every iteration, but when only the fname field is modified, all those tasks close over the same moo, which by the time the tasks finally run holds the last value. Which is what Jameson explained earlier in fewer words 🙂
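To make "close over the same moo" concrete: @async also accepts $-interpolation (Julia 1.4 and later), which evaluates an expression when the task is created. A sketch with a made-up Box type: interpolating the mutable object only copies the reference, so the task still sees later mutations, while interpolating the field captures its value at that moment.

```julia
mutable struct Box
    name::String
end

function interpolation_demo(names)
    box = Box("start")
    by_ref = String[]
    by_value = String[]
    @sync for name in names
        box.name = name
        # $box copies the reference into the closure: still the one shared object.
        @async push!(by_ref, ($box).name)
        # $(box.name) is evaluated now, when the task is created.
        @async push!(by_value, $(box.name))
    end
    return by_ref, by_value
end

by_ref, by_value = interpolation_demo(["a", "b", "c"])
# by_ref == ["c", "c", "c"]; by_value holds "a", "b" and "c"
```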

I don’t see a conceptual problem with my approach. Using asyncmap also works:

function pdoit(moo)
    moo = deepcopy(moo)
    asyncmap(enumerate(workers())) do (ipid, pid)
        moo.fname = fnames[ipid]
        remotecall_wait(say, pid, moo)
    end
end

I think you might be confused about what @sync does.

help?> @sync
@sync

Wait until all lexically-enclosed uses of @async, @spawn, @spawnat and @distributed are complete. All exceptions thrown by enclosed async operations are collected and thrown as a CompositeException.

It does not change the execution order of anything within the block. All it does is wait at the end of the block for all the asynchronous tasks to complete.
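A small sketch (not from the thread; the event labels are made up) showing that @sync leaves the order inside the block alone: nothing in this loop body yields, so all loop-body entries are recorded first, the tasks run only once @sync starts waiting, and the code after the block runs last.

```julia
function sync_order()
    events = String[]
    @sync for i in 1:3
        push!(events, "loop body $i")
        @async push!(events, "task $i")   # scheduled here, run later
    end
    push!(events, "after @sync")
    return events
end

events = sync_order()
# events[1:3] are the loop-body entries, events[4:6] the task entries,
# and events[7] == "after @sync"
```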

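Let’s say you have three workers (pids 2, 3 and 4, since process 1 is the manager). The code is then essentially equivalent to the following, where each let block stands in for one loop iteration, because a for loop gives every iteration its own fresh local variables.

    function doit(moo_in::Moo, fnames)
        # Each let block plays the role of one loop iteration: it gets its own
        # ipid, pid and moo, just as each iteration of the for loop does.
        t1 = let ipid = 1, pid = 2
            moo = deepcopy(moo_in) # LINE B
            moo.fname = fnames[ipid]
            @info "sending $(moo) from manager to process $pid"
            @async remotecall_wait(say, pid, moo)
        end

        t2 = let ipid = 2, pid = 3
            moo = deepcopy(moo_in) # LINE B
            moo.fname = fnames[ipid]
            @info "sending $(moo) from manager to process $pid"
            @async remotecall_wait(say, pid, moo)
        end

        t3 = let ipid = 3, pid = 4
            moo = deepcopy(moo_in) # LINE B
            moo.fname = fnames[ipid]
            @info "sending $(moo) from manager to process $pid"
            @async remotecall_wait(say, pid, moo)
        end

        # Waiting for the tasks at the end is the only thing @sync adds.
        foreach(wait, (t1, t2, t3))
    end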
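One caveat when unrolling a loop by hand: in a for loop every iteration gets fresh local bindings, whereas straight-line code reassigns a single variable that every closure shares. A literal unrolling that just reassigns ipid, pid and moo would therefore behave like LINE A (and could even send every call to the last pid). A minimal sketch with a made-up integer x:

```julia
# Straight-line code: x is one variable, reassigned, and both closures
# read it only when they finally run.
function unrolled()
    seen = Int[]
    x = 1
    t1 = @async push!(seen, x)
    x = 2
    t2 = @async push!(seen, x)
    foreach(wait, (t1, t2))
    return seen
end

# A for loop: every iteration gets a new binding of x.
function looped()
    seen = Int[]
    @sync for x in 1:2
        @async push!(seen, x)
    end
    return seen
end

unrolled()       # → [2, 2]
sort(looped())   # → [1, 2]
```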

The @sync macro is literally expanded as follows:

julia> @macroexpand1 @sync for (ipid, pid) in enumerate(workers())
                   moo = deepcopy(moo_in) # LINE B
                   moo.fname = fnames[ipid]
                   @info "sending $(moo) from manager to process $pid"
                   @async remotecall_wait(say, pid, moo)
               end
quote
    #= task.jl:475 =#
    let var"##sync#48" = Base.Channel(Base.Inf)
        #= task.jl:476 =#
        var"#118#v" = for (ipid, pid) = enumerate(workers())
                #= REPL[89]:2 =#
                moo = deepcopy(moo_in)
                #= REPL[89]:3 =#
                moo.fname = fnames[ipid]
                #= REPL[89]:4 =#
                #= REPL[89]:4 =# @info "sending $(moo) from manager to process $(pid)"
                #= REPL[89]:5 =#
                #= REPL[89]:5 =# @async remotecall_wait(say, pid, moo)
                #= REPL[89]:6 =#
            end
        #= task.jl:477 =#
        Base.sync_end(var"##sync#48")
        #= task.jl:478 =#
        var"#118#v"
    end
end

If you expand @async as well via @macroexpand, you will see that all that happens is that each task created by @async is pushed into the channel var"##sync#48". At the end, each task in var"##sync#48" is waited on until completion.
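A rough hand-written approximation of that expansion, as a sketch only: a plain Vector stands in for the var"##sync#48" Channel, and the CompositeException handling of Base.sync_end is left out (here the first failing task would throw a TaskFailedException instead).

```julia
function manual_sync(n)
    tasks = Task[]        # stands in for the @sync channel
    results = Int[]
    for i in 1:n
        t = Task(() -> push!(results, i^2))   # the task @async constructs
        push!(tasks, t)                       # @async put!s it into the sync channel
        schedule(t)                           # queued, but not run yet
    end
    foreach(wait, tasks)                      # roughly what Base.sync_end does
    return results
end

sort(manual_sync(3))   # → [1, 4, 9]
```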

Full macroexpansion
julia> @macroexpand @sync for (ipid, pid) in enumerate(workers())
                   moo = deepcopy(moo_in) # LINE B
                   moo.fname = fnames[ipid]
                   @info "sending $(moo) from manager to process $pid"
                   @async remotecall_wait(say, pid, moo)
               end
quote
    #= task.jl:475 =#
    let var"##sync#48" = Base.Channel(Base.Inf)
        #= task.jl:476 =#
        var"#119#v" = for (ipid, pid) = enumerate(workers())
                #= REPL[91]:2 =#
                moo = deepcopy(moo_in)
                #= REPL[91]:3 =#
                moo.fname = fnames[ipid]
                #= REPL[91]:4 =#
                begin
                    #= logging.jl:370 =#
                    let
                        #= logging.jl:371 =#
                        var"#120#level" = Base.CoreLogging.Info
                        #= logging.jl:372 =#
                        var"#121#std_level" = Base.CoreLogging.convert(Base.CoreLogging.LogLevel, var"#120#level")
                        #= logging.jl:373 =#
                        if var"#121#std_level" >= Base.CoreLogging._min_enabled_level[]
                            #= logging.jl:374 =#
                            var"#122#group" = Symbol("REPL[91]")
                            #= logging.jl:375 =#
                            var"#123#_module" = Main
                            #= logging.jl:376 =#
                            var"#124#logger" = Base.CoreLogging.current_logger_for_env(var"#121#std_level", var"#122#group", var"#123#_module")
                            #= logging.jl:377 =#
                            if !(var"#124#logger" === Base.CoreLogging.nothing)
                                #= logging.jl:378 =#
                                var"#125#id" = :Main_a02e6fb6
                                #= logging.jl:381 =#
                                if Base.CoreLogging.invokelatest(Base.CoreLogging.shouldlog, var"#124#logger", var"#120#level", var"#123#_module", var"#122#group", var"#125#id")
                                    #= logging.jl:382 =#
                                    var"#126#file" = "REPL[91]"
                                    #= logging.jl:383 =#
                                    if var"#126#file" isa Base.CoreLogging.String
                                        #= logging.jl:384 =#
                                        var"#126#file" = (Base.CoreLogging.Base).fixup_stdlib_path(var"#126#file")
                                    end
                                    #= logging.jl:386 =#
                                    var"#127#line" = 4
                                    #= logging.jl:387 =#
                                    local var"#128#msg", var"#129#kwargs"
                                    #= logging.jl:388 =#
                                    begin
                                            #= logging.jl:359 =#
                                            try
                                                #= logging.jl:360 =#
                                                var"#128#msg" = "sending $(moo) from manager to process $(pid)"
                                                #= logging.jl:361 =#
                                                var"#129#kwargs" = (;)
                                                #= logging.jl:362 =#
                                                true
                                            catch var"#142#err"
                                                #= logging.jl:364 =#
                                                Base.CoreLogging.logging_error(var"#124#logger", var"#120#level", var"#123#_module", var"#122#group", var"#125#id", var"#126#file", var"#127#line", var"#142#err", true)
                                                #= logging.jl:365 =#
                                                false
                                            end
                                        end && Base.CoreLogging.invokelatest(Base.CoreLogging.handle_message, var"#124#logger", var"#120#level", var"#128#msg", var"#123#_module", var"#122#group", var"#125#id", var"#126#file", var"#127#line"; var"#129#kwargs"...)
                                end
                            end
                        end
                        #= logging.jl:394 =#
                        Base.CoreLogging.nothing
                    end
                end
                #= REPL[91]:5 =#
                begin
                    #= task.jl:517 =#
                    let
                        #= task.jl:518 =#
                        local var"#143#task" = Base.Task((()->begin
                                            #= task.jl:514 =#
                                            remotecall_wait(say, pid, moo)
                                        end))
                        #= task.jl:519 =#
                        if $(Expr(:islocal, Symbol("##sync#48")))
                            #= task.jl:520 =#
                            Base.put!(var"##sync#48", var"#143#task")
                        end
                        #= task.jl:522 =#
                        Base.schedule(var"#143#task")
                        #= task.jl:523 =#
                        var"#143#task"
                    end
                end
                #= REPL[91]:6 =#
            end
        #= task.jl:477 =#
        Base.sync_end(var"##sync#48")
        #= task.jl:478 =#
        var"#119#v"
    end
end

Thus, all @sync does is ensure that all asynchronous tasks have completed by the end of the block.
