Iterating through an inner loop with @async

Suppose I have the following function stoptest, which I can stop from outside the loop by setting a Ref object to false. The returned variable should equal the value printed by println at the moment the function is stopped. For example:

function stoptest(a, b, rep = Ref(true))
    a = Ref(a) 
    @async for i = 1:b
        rep[]||break
        println(a[] -= 1)
        sleep(5)
    end
    return(a,rep)
end

This returns

a,cont = stoptest(10,3)
9
(Base.RefValue{Int64}(9), Base.RefValue{Bool}(true))
8

cont[]=false
false

a[]
8

How can I run stoptest over the elements of an array in a for loop? That is, the desired output would be:

b, cont = trial([10,20,30],3)
9
8
7
(Base.RefValue{Int64}(30), Base.RefValue{Bool}(true))

19
18
17
cont[]=false
false

b[]
17

If I try plugging stoptest directly into a wrapper function

function trial(J,b, rep = Ref(true))
    l = Ref(0)
    @async for i = J 
        rep[] || break 
        k,rep = stoptest(i,b,rep)
        l[]=k[] 
    end
    return (l, rep)
end

The output is

b, cont = trial([10,20,30],3)
9
19
29
(Base.RefValue{Int64}(30), Base.RefValue{Bool}(true))

8
18
28
cont[]=false
false

b[]
30

By dropping the inner @async for loop, the function completes the println on each element before moving on to the next iteration. However, I am still unable to get the function to return the value of k that is printed in the REPL at the time the function is stopped.

function trial2(J,b, rep = Ref(true))
    k = Ref(0)
    @async for i = J
        k[] = i 
        for l = 1:b
            rep[]||break
            println(k[] -= 1)
            sleep(5)
        end
    end
    return(k,rep)
end

This is pretty clunky code, but I’m wondering whether there is a simple way to get the desired behavior by plugging stoptest into a wrapper function, or whether there is a better way to go about it. Thank you so much!

Your trial2 returns immediately after scheduling the task, and it is likely that this happens even before the first println(k[] -= 1) is executed.

Another interpretation is that you are referring to the value printed by the last call to println(k[] -= 1) (which, to you, would mean the function is stopped). If that is the right interpretation, then it is not reasonable to expect trial2 to return something that will only be realized in the future, at least not in this form. You could wait for the scheduled task to finish and only return after all the work has been performed: define something like t = @async for ... and call wait(t) before the return statement. That way, the function is guaranteed to exit only after the entire work is done.

Also, by returning k instead of k[], you are returning the reference, not the actual value that was last printed.
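
A minimal sketch of the wait-before-return option, assuming you are fine with trial2 blocking until all the work is done (or stopped). The name trial2_blocking and the pause keyword (standing in for sleep(5)) are hypothetical, and an extra rep[] check was added to the outer loop so k keeps the last printed value once stopped:

```julia
# Hypothetical blocking variant of trial2: it waits for its own task and
# returns the last printed value (an Int), not the Ref that holds it.
function trial2_blocking(J, b, rep = Ref(true); pause = 0.01)
    k = Ref(0)
    t = @async for i in J
        rep[] || break          # once stopped, do not overwrite k with the next element
        k[] = i
        for _ in 1:b
            rep[] || break
            println(k[] -= 1)
            sleep(pause)        # stands in for sleep(5)
        end
    end
    wait(t)                     # block until the scheduled loop has finished
    return k[]                  # the value itself, not the reference
end

last_value = trial2_blocking([10, 20, 30], 3)   # prints 9 8 7 19 18 17 29 28 27
```

The trade-off: while trial2_blocking is running, the REPL prompt is busy waiting on the task, so you can no longer type cont[] = false at that same prompt.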

Now, related to your trial function.

You are basically scheduling a task that will in turn schedule three instances of stoptest:

Scheduling stoptest a=10 b=3
Scheduling stoptest a=20 b=3
Scheduling stoptest a=30 b=3

These are scheduled (almost) at the same time and will run their course in a concurrent manner.

Your desired output implies that you want to wait for each stoptest to finish before starting the next one. For this to happen, you need a way to wait for the task scheduled inside stoptest.

Consider this:

function stoptest(a, b, rep = Ref(true))
    a = Ref(a) 
    t = @async for i = 1:b
        rep[]||break
        println(a[] -= 1)
        sleep(5)
    end
    return(t, a, rep)
end

You can see that the task t is also returned. Now, in your trial:

function trial(J,b, rep = Ref(true))
    l = Ref(0)
    @async for i = J 
        rep[] || break 
        println("Scheduling stoptest a=$i b=$b")
        t, k, rep = stoptest(i,b,rep)
        wait(t)
        l[]=k[]         
    end
    return (l, rep)
end

Note the wait call: it ensures that the loop only progresses to the next array element after the task scheduled inside stoptest has completed.

And now you should see your desired outcome (9, 8, 7, 19, 18, …). Setting your ref cell to false will indeed stop the progression of work altogether. You don’t have a guarantee that all the tasks stop instantly (most likely you will flip the switch during one of the 5-second sleep intervals), but the work will be stopped: no more println calls will be executed afterward.
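
If you want to check the ordering quickly, here is a scaled-down, self-contained version of the two functions above. It assumes a few changes for illustration only: the functions are renamed (stoptest_traced, trial_traced), sleep(5) becomes a short pause keyword, every printed value is also pushed into a trace vector, and the outer task is returned so the caller can wait on it:

```julia
# Renamed copies of stoptest/trial above, with a short pause instead of
# sleep(5) and a hypothetical `trace` vector recording every printed value.
function stoptest_traced(a, b, rep, trace; pause = 0.01)
    a = Ref(a)
    t = @async for _ in 1:b
        rep[] || break
        println(a[] -= 1)
        push!(trace, a[])
        sleep(pause)
    end
    return (t, a, rep)
end

function trial_traced(J, b, rep = Ref(true); trace = Int[])
    l = Ref(0)
    outer = @async for i in J
        rep[] || break
        t, k, _ = stoptest_traced(i, b, rep, trace)
        wait(t)             # finish this element before moving to the next one
        l[] = k[]
    end
    return (outer, l, rep, trace)   # the outer task is returned as well
end

outer, l, rep, trace = trial_traced([10, 20, 30], 3)
wait(outer)
println(trace)   # [9, 8, 7, 19, 18, 17, 29, 28, 27]
```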

I hope this helps.

P. S. I tried not to alter your code too much - and this might be the simplest way to transition from your existing code to the desired behavior (e.g., exposing the task and waiting on it). There might be other ways to refactor this and obtain the same behavior more elegantly (maybe take a look at Channels).

P. P. S. I also feel that I should point you to the following warning/suggestion from the documentation:

It is strongly encouraged to favor Threads.@spawn over @async always even when no parallelism is required especially in publicly distributed libraries. This is because a use of @async disables the migration of the parent task across worker threads in the current implementation of Julia. Thus, seemingly innocent use of @async in a library function can have a large impact on the performance of very different parts of user applications.

Switching from @async to Threads.@spawn will not require additional changes to your code (besides swapping the macro itself).
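
A minimal sketch of the swap, using the task-returning stoptest from above with @async replaced by Threads.@spawn (renamed stoptest_spawned, with a short pause keyword standing in for sleep(5)). Both macros return a Task, so the wait call works unchanged:

```julia
# stoptest with @async swapped for Threads.@spawn; everything else is the same.
function stoptest_spawned(a, b, rep = Ref(true); pause = 0.01)
    a = Ref(a)
    t = Threads.@spawn for _ in 1:b
        rep[] || break
        println(a[] -= 1)
        sleep(pause)
    end
    return (t, a, rep)
end

t, a, rep = stoptest_spawned(10, 3)
@show t isa Task        # both @async and Threads.@spawn return a Task
wait(t)                 # so waiting on it works exactly as before
```

One caveat when Julia runs with more than one thread: a spawned task may run in parallel with its parent, so state shared between tasks needs the usual multithreading care. Here only the spawned task writes to a, and the caller reads it after wait, so that is not an issue.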

Thank you so much for this very helpful answer. It is genuinely appreciated. Am I correct in understanding that the only difference between @async and @spawn is that Threads.@spawn creates a task that can be executed on any available thread, while @async creates a task that is scheduled to run on the same thread as the parent task?

Also, going over the documentation on Channel, I’m confused as to how it would work in a situation like this. Would you mind providing some guidance or a simple example? I am having a hard time getting Channels to work. I assume the idea would be to put!() each element of an array J into a channel and then run stoptest on each element via take!(), thereby eliminating the need to wait for a task to complete?

I don’t have a good understanding of the internal intricacies of @async vs @spawn. For me, it was enough to stick with @spawn for the moment so I can avoid bad surprises due to the “@async disables the migration of the parent task across worker threads” issue. However, I think you might find good threads discussing the issue here on Discourse (and also on Julia’s repo).

Or maybe somebody more knowledgeable on the matter can jump in and provide an up-to-date answer.

The channels can be tricky at the beginning, but at least in my experience, they get really rewarding once you start getting an intuition about how they work.

Imagine a channel as a pipe: at one end you add work (usually via put!), from multiple threads/tasks or from the same one, and at the other end you consume it (usually via take!), again possibly from multiple threads/tasks.
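
To make the pipe picture concrete, here is a minimal sketch with one producer task and the main task as consumer. The names jobs and received and the buffer size of 2 are arbitrary choices: items come out in the order they went in, and put! waits whenever the buffer is full until the consumer has made room.

```julia
jobs = Channel{Int}(2)          # buffered pipe holding at most 2 items

# Producer: pushes 5 items; put! blocks whenever the buffer is full.
producer = @async begin
    for x in 1:5
        put!(jobs, x)
    end
    close(jobs)                 # signals "no more work"; buffered items can still be taken
end

# Consumer: take! returns the oldest item still in the pipe (FIFO).
received = Int[]
for _ in 1:5
    push!(received, take!(jobs))
end
wait(producer)
println(received)               # [1, 2, 3, 4, 5]
```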

A quick rework to get you started with a channel version (with some more thinking, it could be greatly improved) might be this:

function stoptest(a, b, rc)    
    rc[] = a
    for _i = 1:b        
        println(rc[] -= 1)
        sleep(5)
    end
end

function dowork(c, rc, rep) 
    while rep[]
        (a,b) = take!(c)
        stoptest(a, b, rc)
    end    
    @info "dowork done"
end

function trial(J, b, rep = Ref(true))
    c = Channel(length(J))
    rc = Ref(0)
    for i = J         
        put!(c, (i, b))
    end
    errormonitor(Threads.@spawn dowork(c, rc, rep))
    return (c, rc, rep)
end

c, rc, rep = trial([10, 20, 30], 3)
put!(c, (50, 3))
# after 29, 28, 27... you'll also see the outputs 49, 48, 47
# that is because more work was added to the channel
# however if after a while you do rep[] = false, the "dowork" will exit 
# and adding work to the channel will do nothing
# notice that the channel's size equals the original array's size
# this has implications: if you attempt put! when the channel is full,
# put! will be a blocking call
# and you'll have to wait for dowork to consume from the channel

You can see that I only spawn one task (the consumer). However, if your scenario also implies adding the work in an async manner, you can consider a function that can be started as an additional task that will wrap the work related to put!.
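
One possible direction for the improvement mentioned above, sketched under assumptions: instead of while rep[] plus take!, the consumer iterates over the channel, which ends the loop on its own once the channel is closed and drained (a bare take! would block forever on an empty, open channel). The producer runs as a separate task, as suggested in the previous paragraph. dowork_iter, feed and the pause keyword are hypothetical names for this example.

```julia
# Consumer: iterating a channel takes items until it is closed *and* empty.
function dowork_iter(c, rc, rep; pause = 0.01)
    for (a, b) in c
        rep[] || break
        rc[] = a
        for _ in 1:b
            rep[] || break
            println(rc[] -= 1)
            sleep(pause)        # stands in for sleep(5)
        end
    end
    @info "dowork_iter done"
end

# Producer: a separate task that feeds the channel and then closes it.
function feed(c, J, b)
    for i in J
        put!(c, (i, b))         # blocks while the 1-slot buffer is full
    end
    close(c)
end

c = Channel{Tuple{Int,Int}}(1)
rc, rep = Ref(0), Ref(true)
producer = errormonitor(Threads.@spawn feed(c, [10, 20, 30], 3))
consumer = errormonitor(Threads.@spawn dowork_iter(c, rc, rep))
wait(consumer)
```

A consequence of this design: if rep[] is set to false early, the consumer stops taking, so a producer blocked in put! stays blocked until close(c) is called, at which point that put! throws an InvalidStateException (which errormonitor would log).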

I also suggest running the examples in the documentation and messing around with the code in those examples.

Thanks again, I’ve been having a lot of trouble understanding how to use channels, so this was super helpful and incredibly instructive.

Given the while rep[] in dowork, if I set rep[] = false, it will still finish the current call to stoptest. So if I set rep[] = false after stoptest has already started and printed 19, it will continue until it prints 17 (leaving rc[] = 17) instead of stopping at 19.

I can revert to the behavior of your original solution by adding an additional rep[] || break to stoptest itself.

function stoptest(a, b, rc, rep)    
    rc[] = a
    for _i = 1:b 
        rep[]||break        
        println(rc[] -= 1)
        sleep(5)
    end
    return(rep)
end

function dowork(c, rc, rep) 
    while rep[]
        (a,b) = take!(c)
        stoptest(a, b, rc, rep)
    end    
    @info "dowork done"
end

But I’m confused as to why I don’t need to add another @spawn inside stoptest itself, like

function stoptest(a, b, rc, rep)    
    rc[] = a
    Threads.@spawn for _i = 1:b 
        rep[]||break        
        println(rc[] -= 1)
        sleep(5)
    end
    return(rep)
end

In fact, when I tried that, the output was quite unexpected. Does @spawn dowork(c,rc,rep) apply the macro to each and every loop within dowork()?

Yes, you can propagate the ref cell wherever you need. Sorry for breaking that - I am not sure what my thinking was when I removed that from stoptest.
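
A rough way to see the difference without waiting 5 seconds per step. This is a sketch: it is the stoptest/dowork pair from the previous post with a hypothetical pause keyword standing in for sleep(5), and the polling loop at the bottom is a stand-in for you typing rep[] = false at the REPL right after 19 is printed.

```julia
# stoptest/dowork from the previous post, with a shorter (hypothetical) pause.
function stoptest(a, b, rc, rep; pause = 0.3)
    rc[] = a
    for _ in 1:b
        rep[] || break          # checked on every step, not only per element
        println(rc[] -= 1)
        sleep(pause)
    end
    return rep
end

function dowork(c, rc, rep)
    while rep[]
        (a, b) = take!(c)
        stoptest(a, b, rc, rep)
    end
    @info "dowork done"
end

c = Channel(3)
foreach(i -> put!(c, (i, 3)), [10, 20, 30])
rc, rep = Ref(0), Ref(true)
worker = errormonitor(Threads.@spawn dowork(c, rc, rep))

# Stand-in for typing `rep[] = false` at the REPL as soon as 19 is printed.
while rc[] != 19
    sleep(0.01)
end
rep[] = false
wait(worker)
println(rc[])                   # 19, not 17; (30, 3) is still in the channel
```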

Now, let’s get to why you shouldn’t use @spawn inside stoptest.

You don’t want your stoptest to spawn a new task because, in your case, you actually want to wait for each stoptest call to finish before running stoptest on the next element of the array.

Look at dowork: its while loop takes elements from the channel (in FIFO order) and calls stoptest with those values. If your stoptest spawns a task, it returns right away, so the while loop moves on, retrieves the next element from the channel buffer, fires another stoptest, and so on. You end up with multiple concurrent tasks, all spawned by separate calls to stoptest.

So by doing that (i.e., spawning a task in every stoptest call), you are actually creating the exact behavior you wanted to avoid. To be fair, you could spawn a task inside stoptest, return the task reference to the while loop (e.g., t = stoptest(...)), and then wait(t). But given your workflow there is no benefit in doing this, because you don’t want multiple stoptest calls to run concurrently.
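
To see this concretely, here is a sketch of the variant you tried, assuming a shortened pause. stoptest_spawning and dowork_spawning are hypothetical names; dowork_spawning checks isready(c) so that it stops once the buffer is empty instead of blocking in take!. The consumer loop finishes while none of the countdowns has finished yet, and because all the tasks decrement the same rc, the printed numbers depend on scheduling, which may be one source of the unexpected output you saw.

```julia
# Hypothetical variant of stoptest that spawns its loop and returns the task.
function stoptest_spawning(a, b, rc, rep; pause = 0.2)
    rc[] = a
    t = Threads.@spawn for _ in 1:b
        rep[] || break
        println(rc[] -= 1)      # every task decrements the SAME rc (racy on purpose)
        sleep(pause)
    end
    return t
end

# Like dowork, but it ends once the buffer is empty and collects the tasks
# instead of waiting on each one.
function dowork_spawning(c, rc, rep)
    tasks = Task[]
    while rep[] && isready(c)
        (a, b) = take!(c)
        push!(tasks, stoptest_spawning(a, b, rc, rep))
    end
    return tasks
end

c = Channel(3)
foreach(i -> put!(c, (i, 3)), [10, 20, 30])
rc, rep = Ref(0), Ref(true)

tasks = dowork_spawning(c, rc, rep)    # returns almost immediately
any_done_early = any(istaskdone, tasks)
println("tasks started: ", length(tasks), ", any finished yet? ", any_done_early)
foreach(wait, tasks)                   # the three countdowns overlapped in time
```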

Is this clarifying things a little bit?

Yes! Thanks so much. I’m going to have to read through it a few more times to fully understand it, but this is really helpful.
