Return the ith iteration value of a variable in a for loop when the broken from the outside using @async

Hi sorry if I am being dense here. I would like the option of terminating a for loop from outside the function. If the loop is completed it should return the value of the variable being modified. However if the loop is terminated prematurely it should return the value of a variable at that given iteration. Conceptually I guess something like.

function stoptest(a,b, rep= Ref(true))
    @async for i = 1:b
        rep[] || break
        @show a-=1
        sleep(8)
    end
    return(a,rep)
end
testvalue, cont = stoptest(10,3)

Needless to say this is not working. Firstly the loop is not terminating so it does not return the final value of 7 to testvalue if allowed to run. Secondly, if I break the loop early by setting cont[]=false, then somehow testvalue is assigned the value of 10 rather than the value at the ith iteration. Any thoughts on how this could be achieved would be greatly appreciated thanks!

What did you expect instead of 10? If you pass rep = Ref(false), the loop is broken at the first iteration, before any change is made to a. It can only return the value given to a in the call.

1 Like

@async <expression> is equivalent to schedule(Task( () -> <expression> )), which also returns that Task. That means that an anonymous function running that expression (and capturing whatever variables from the outer scope) was put in a Task that got scheduled to the asynchronous task queue.

In your case, the expression is that for-loop. After that loop is queued, your stoptest function continues without waiting for that task to finishing running, so it returns a when it is equal to 10. You maintain some control over that task running via cont because it holds the same mutable Ref that the task holds via rep. Your loop does terminate by itself on my machine, I’m guessing you think it doesn’t because you set your sleep for 8 seconds, so the task loops a pause over 8*3=24 seconds.

If you want the return to happen only after the task finishes running, you could surround the @async block with a @sync block, but that control flow is equivalent to not involving asynchronous flow at all, only with unnecessary work.

If you want a testvalue that asynchronously updates its value until it reaches 7 instead, you should input a = Ref(10) and decrement it by a[]-=1. You need to use a mutable Ref because the local a and global testvalue are not the same variables and do not share a scope, so the task cannot reassign testvalue, only mutate an instance they share. You can’t work with local a directly, only receive its return value, so a global testvalue is needed for asynchronous interaction with the task. If you don’t want to miss updates to such a shared value, try Channels.

2 Likes

Thanks so much for your helpful responses!

@Henrique_Becker Thanks for pointing this out. I’m pretty sure I got this wrong. I wanted to reference the fact that the function returns a value of 10 for testvalue regardless of which iteration was interrupted by rep = Ref(false).

@Benny Thanks so much for the detailed explanation. So even if the original function runs to completion the modified value of a will not be assigned to testvalue by the return statement because with @async the return statement has already been executed prior to the completion of the loop?

Just a follow up if you guys don’t mind.

Given the above, I should be able to attain the desired result like this?

function stoptest(a,b, rep= Ref(true))
    a = Ref(a) 
    @async for i = 1:b
        rep[] || break
        @show a[] -= 1
        sleep(8)
    end
    return(a,rep)
end

testvalue, cont = stoptest(10,3)

Because this returns a as a Ref object whose valued can be modified by the for loop? (In my tests testvalue[] does return the desired result of 7, but I wanted to make sure there wasn’t some kind of pitfall I was missing.)

It helps to think of variables as just nametags of instances, and these nametags only exist in their scope. When you return, you’re not sending the nametag, you’re sending the instance out of the function scope, as the call finishes and throws away those nametags. In your original example, the nametag a was attached to 10 when you reach the return statement, so the 10 is what made it out. The task itself captured the variable a so it’s still reassigning it, but in the global scope, you have no access to that task’s a.

The Ref makes it so that a is not reassigned, and testvalue has the same 1 instance as the task’s a does. This is the purpose of mutable types, it’s less about changing data, more about the data being shared by multiple variables. You don’t need mutability to change data, you can just reassign a variable with new instances like a -= 1; however, that kind of change can’t be shared by multiple variables, which you need here.

2 Likes

While I was not thinking about synchronism at the time, I have an answer about Julia’s variable model that may help you make a mental model: Is it appropriate to say a variable is a label or box? - #19 by Henrique_Becker

1 Like

In your original example:

function stoptest(a,b, rep= Ref(true))
    @async for i = 1:b
        rep[] || break
        @show a-=1
        sleep(8)
    end
    return(a,rep)
end

we can use @code_lowered to take a look at how Julia represent this.

julia> @code_lowered stoptest(10, 3, Ref(true))
CodeInfo(
1 ─       a@_8 = a@_2
│         a@_8 = Core.Box(a@_8)
│   %3  = Main.:(var"#3#4")
│   %4  = Core.typeof(b)
│   %5  = Core.typeof(rep)
│   %6  = Core.apply_type(%3, %4, %5)
│   %7  = a@_8
│         #3 = %new(%6, %7, b, rep)
│   %9  = #3
│         task = Base.Task(%9)
└──       goto #3 if not false
2 ─       Base.put!(Main.:(var"##sync#48"), task)
3 ┄       Base.schedule(task)
│         task
│   %15 = Core.isdefined(a@_8, :contents)
└──       goto #5 if not %15
4 ─       goto #6
5 ─       Core.NewvarNode(:(a@_7))
└──       a@_7
6 ┄ %20 = Core.getfield(a@_8, :contents)
│   %21 = Core.tuple(%20, rep)
└──       return %21
)

So we can see that due to the the task a becomes a box a@_8 = Core.Box(a@_8).
We then schedule that task, and it may or may not run immediately.

In %20 = Core.getfield(a@_8, :contents) we then read the contents of the box. Depending on order of execution you may or may not find the original value 10, 9, 8, or 7.

This is why @sync changes the value to be 7.

function stoptest(a,b, rep= Ref(true))
    @sync @async for i = 1:b
        rep[] || break
        @show a-=1
    end
    return(a,rep)
end
julia> stoptest(10, 3, Ref(true))
a -= 1 = 9
a -= 1 = 8
a -= 1 = 7
(7, Base.RefValue{Bool}(true))

Another fun variant is:

function stoptest(a,b, rep= Ref(true))
     @async for i = 1:b
        rep[] || break
        @show a-=1
    end
    yield()
    return(a,rep)
end

Which at least on my computer leads to the intermediate state:

julia> stoptest(10, 3, Ref(true))
a -= 1 = 9
a -= 1 = 8
a -= 1 = 7(
9, Base.RefValue{Bool}(true))

More formally your program has a write-read race and you need to use some sort of synchronization mechanism to resolve that concurrency race.

1 Like

Thanks for all the help! I’m still working through each answer to get a better mental model of how this works, but really appreciate you guys taking the time to give such perspicuous explanations.