Multiple `async` sources for a `sync` `Rocket.jl` actor

Hi,

I am learning reactive programming with the great package Rocket.jl and have a question (pinging the lead dev @bvdmitri). An MWE of my use case that makes the question explicit is shown below.

import Rocket

struct MyActor <: Rocket.Actor{Union{Char, Int}} end

function Rocket.next!(a::MyActor, c::Char)
    println("got char $c, waiting ..."); flush(stdout)
    sleep(1)
    println("leaving char"); flush(stdout)
    return nothing
end

function Rocket.next!(a::MyActor, c::Int)
    println("got Int $c, waiting ..."); flush(stdout)
    sleep(1)
    println("leaving Int"); flush(stdout)
    return nothing
end

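# parentheses keep flush(stdout) inside the method body instead of running it once at definition time
Rocket.complete!(a::MyActor) = (println("done"); flush(stdout))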

intsource = Rocket.make(Int) do actor
    Rocket.setTimeout(1000) do
        Rocket.next!(actor, 1)
    end
    Rocket.setTimeout(2000) do
        Rocket.next!(actor, 2)
    end
    Rocket.setTimeout(3000) do
        Rocket.next!(actor, 3)
    end
    Rocket.setTimeout(5000) do
        Rocket.complete!(actor)
    end
end

charsource = Rocket.make(Char) do actor
    Rocket.setTimeout(1000) do
        Rocket.next!(actor, 'a')
    end
    Rocket.setTimeout(2000) do
        Rocket.next!(actor, 'b')
    end
    Rocket.setTimeout(3000) do
        Rocket.next!(actor, 'c')
    end
    Rocket.setTimeout(5000) do
        Rocket.complete!(actor)
    end
end

actor = Rocket.sync(MyActor())

Rocket.subscribe!(charsource |> Rocket.async(), actor)
Rocket.subscribe!(intsource  |> Rocket.async(), actor)

yield()
wait(actor)

There is a sync actor that is fed by multiple async sources. In this example, the actor is fed Chars and Ints and elements arrive asynchronously (in my real case, these would be network events). In each of the two next! method definitions, there is some computation that takes a bit of time.

I would expect that, once either of these two methods is called, it runs to completion before any other next! can be called. Depending on the temporal separation of the async events, and on the duration of the computations inside the next! methods, this might not be true. A typical output from the code above is

got char a, waiting ...
got Int 1, waiting ...
leaving char
got char b, waiting ...
leaving Int
got Int 2, waiting ...
leaving char
got char c, waiting ...
leaving Int
got Int 3, waiting ...
leaving char
leaving Int
done

while I would expect

got char a, waiting ...
leaving char
got Int 1, waiting ...
leaving Int
got char b, waiting ...
leaving char
got Int 2, waiting ...
leaving Int
got char c, waiting ...
leaving char
got Int 3, waiting ...
leaving Int
done

It is likely that I am misusing some of the package functionality, but any clarification would be appreciated.


Hi @gasagna!

Thanks for your feedback!

Could you please try your example with:

actor = Rocket.sync(MyActor(); withlock = true)

It will force inner asynchronous calls to be synchronised with ReentrantLock().

You may also look into the merged() operator, e.g. merged((charsource, intsource)) |> Rocket.async().
For example, this piece of code produces the output you would like:

actor = Rocket.sync(MyActor())

Rocket.subscribe!(Rocket.merged((charsource, intsource)) |> Rocket.async(), actor)

wait(actor)

This behaviour is not specific to Rocket.jl; it comes from Julia’s asynchronous programming model. Rocket.jl does not introduce its own asynchronous scheduler but uses Julia’s Task object and @async macro.

In your particular example I would not expect the output you want without the withlock argument, because both println and sleep are asynchronous calls in Julia as well, and they yield execution to other tasks in the system. Here is what happens in a bit more detail:

  1. First, setTimeout in both sources starts almost simultaneously.
  2. Depending on Julia’s task scheduler, you get either ‘got char…’ or ‘got Int…’ first.
  3. sleep(1) yields execution to other tasks, so you get the second got ... print before the first handler finishes.

In this example it is important to understand that both println and sleep yield execution to other asynchronous tasks in the system. Without withlock = true, the actor never waits for one next! call to finish before starting another.
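The interleaving described above can be reproduced without Rocket.jl. Below is a minimal sketch using only Base tasks. The names run_handlers, handler and guarded are hypothetical, and the ReentrantLock only stands in for what withlock = true is described as doing; it is not Rocket.jl's actual implementation.

```julia
# Two tasks call the same handler. `sleep` yields to the scheduler, so the
# calls interleave unless a lock is held for the whole handler body.
function run_handlers(uselock::Bool)
    events = String[]              # entries are appended in execution order
    lk = ReentrantLock()

    function handler(name)
        push!(events, "enter $name")
        sleep(0.2)                 # yields, like sleep(1) in the MWE
        push!(events, "leave $name")
    end

    # Hypothetical wrapper: optionally serialise handler calls with the lock
    guarded(name) = uselock ? lock(() -> handler(name), lk) : handler(name)

    @sync begin
        @async guarded("char")
        @async guarded("int")
    end
    return events
end

unlocked = run_handlers(false)
locked = run_handlers(true)
println(unlocked)
println(locked)
```

Without the lock, both "enter" entries are recorded before either "leave" entry. With the lock, the second task blocks until the first handler has returned, so each enter/leave pair stays together.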

In general I would suggest not expecting any predefined order from the asynchronous execution of multiple sources, because it may depend on many factors such as hardware, Julia’s version, operating system, etc. It may also differ between programming languages. It’s better to write asynchronous code that is independent of the actual order of events from multiple sources. The Rocket.jl async() operator only guarantees that you will receive events in order from a single observable, e.g. you will never receive ‘c’ before ‘a’, or ‘3’ before ‘1’.

P.S. I just noticed that our documentation is misleading: it states that withlock = true by default, but the actual default is withlock = false. We will change that, of course.

P.S.S. The sync() actor stops waiting after the very first complete! event. It assumes that it will receive a completion event only once. If you want different behaviour, I would suggest checking the sync implementation in the Rocket.jl repository and reimplementing it a bit differently. The merged() operator solves this problem easily because it completes only after all inner sources have completed.
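The idea of folding several completion events into one can be sketched in plain Julia. This is only an illustration under stated assumptions: CompletionCounter and source_completed! are hypothetical names, not part of Rocket.jl, and this is not how merged() or sync() are implemented.

```julia
# Hypothetical helper: signals `done` only after `remaining` sources have completed.
mutable struct CompletionCounter
    remaining::Int
    done::Base.Event
end
CompletionCounter(n::Int) = CompletionCounter(n, Base.Event())

# Called once per source when that source completes
function source_completed!(c::CompletionCounter)
    c.remaining -= 1
    c.remaining == 0 && notify(c.done)
    return nothing
end

# Two simulated sources that complete at different times
counter = CompletionCounter(2)
@async (sleep(0.1); source_completed!(counter))
@async (sleep(0.2); source_completed!(counter))

wait(counter.done)   # returns only after the second completion
println("all sources completed")
```

A waiter built this way is released by the last completion rather than the first, which is the behaviour attributed to merged() above.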


Hi @bvdmitri,

thanks for the clear and exhaustive reply.

I did suspect that the print statements were yielding execution to other tasks. The confusion was that I thought that withlock=true was enabled by default, that’s why this behaviour looked strange. Good that the docs can be fixed.

Regarding the order of execution, this is not important for me. In the example I reported, the timeouts of the two sources are equal, so events arrive closer together in time than each next! call takes to finish.

By adding withlock=true, some of the events are dropped. For instance, with

actor = Rocket.sync(MyActor(); withlock=true)

the output is

got char a, waiting ...
leaving char
got char b, waiting ...
leaving char
got char c, waiting ...
leaving char
done

i.e. all the intsource events are dropped. Not sure this is the intended behaviour, but at least using merged avoids this and all events are processed.

Hm, interestingly, the intsource events aren’t dropped for me (I ran your example in a Jupyter notebook). But the output is still not what you would expect without merged().

got char a, waiting ...
leaving char
got char b, waiting ...
leaving char
got char c, waiting ...
leaving char
got Int 1, waiting ...
leaving Int
got Int 2, waiting ...
leaving Int
got Int 3, waiting ...
leaving Int
done
done

I’m fairly sure that it has something to do with the fact that the sync() actor assumes only one completion event (though I think we can change that in the future). So it probably drops the second source, or maybe printing is failing. The merged() operator deals with that automatically by merging the two completion events into one.

It is also an interesting illustration of my previous message: asynchronous behaviour may depend on the hardware, operating system and environment where you run your code (in my case it’s macOS, Jupyter notebook, Julia 1.6.1).

You may also look into the factory feature. For example:

struct MyActorFactory <: Rocket.AbstractActorFactory end

# A factory needs a single method, `create_actor`, which is executed on each subscription
# Here L gives you the exact type of the stream, so you can drop Union{Char, Int} in your actor and use the exact L,
# e.g. struct MyActor{L} <: Rocket.Actor{L} end
# but in this example I didn't change your original implementation
Rocket.create_actor(::Type{L}, factory::MyActorFactory) where { L } = MyActor()
factory = Rocket.sync(MyActorFactory(); withlock = true)

Rocket.subscribe!(charsource |> Rocket.async(), factory)
Rocket.subscribe!(intsource |> Rocket.async(), factory)

wait(factory)

In this example the sync factory creates one actor per source, independently, and waits for both of them. The downside is that withlock is also independent for the two actors, so you will have unsynchronised events again. With factories you also don’t have a direct reference to the actors (though you may save it in the create_actor method if you want). For your original example the better approach is of course merged(), but factories can be useful in many cases. For example, the builtin logger() actor is actually a factory that creates a LoggerActor every time it subscribes to some source.

I have already changed sync() to use withlock = true by default, to be consistent with the documentation. I will probably wait a little longer for other bugfixes before making a new minor release.

Hi @bvdmitri,

thanks for your reply.

Yes, interesting indeed. In my Jupyter notebook, I get the same behaviour as in the REPL.

The factory approach is interesting, but the actors in my application have permanent internal state and logically it does not make sense to have more than one. Thanks for the tip anyway.

Regarding your previous P.S.S comment,

Would it be possible to add a keyword argument so that a merged source ends its execution stream when the first (not the last) source completes? The current implementation is greedy and consumes all events in all sources, but a non-greedy mode would be useful. For instance, in my use case, I have an observable monitoring the keyboard that completes when a certain key is pressed. If I merge this observable with other sources, I would be able to stop the overall execution, which is something I cannot do with the current behaviour.

You can already solve this problem by using the take_until() operator. For example, you may do something like this:

certain_key = get_keyboard_events() |> filter(is_certain_key_pressed)

subscription = subscribe!(get_some_events() |> take_until(certain_key))
...

Yes, this works, thanks.

One small comment about this solution. Most keyboard events should be processed normally by the actor, and only one key, e.g. q, is used to terminate execution. Based on your snippet, my implementation now looks like this

keyboard = make_keyboard_subject()

certain_key = keyboard |> filter(char -> char == 'q')

subscription = subscribe!( merged((other_sources, keyboard)) |> take_until(certain_key), myactor)

As you can see, the keyboard stream is both fed to myactor and used to end execution. I would expect that once q is pressed, the execution ends without sending the key to myactor. Instead, the event is also sent to the actor. Here is an MWE to reproduce this behaviour

using Rocket

# keyboard stream: a background task reads single characters from stdin in raw mode
struct KeyboardStream
    ch :: Channel{Char}
    function KeyboardStream()
        ch = Channel{Char}(10)
        @async while true
            ret = ccall(:jl_tty_set_mode, Int32, (Ptr{Cvoid},Int32), stdin.handle, true)
            char = read(stdin, Char)
            ccall(:jl_tty_set_mode, Int32, (Ptr{Cvoid}, Int32), stdin.handle, false)
            put!(ch, char)
        end
        return new(ch)
    end
end

# iteration interface
Base.eltype(::Type{KeyboardStream}) = Char
Base.IteratorSize(::Type{KeyboardStream}) = Base.IsInfinite()
Base.IteratorEltype(::Type{KeyboardStream}) = Base.HasEltype()
Base.take!(t::KeyboardStream) = take!(t.ch)
Base.iterate(t::KeyboardStream, state::Int = 0) = (take!(t), 0)

# make subject from typed iterable
function make_subject(iter::F) where {F}
    subject = Rocket.Subject(eltype(iter))
    @async begin
        for el in iter
            next!(subject, el)
        end
        complete!(subject)
    end
    return subject
end

# did someone hit `q` ?
is_q_pressed(char::Char) = char == 'q'

# create keyboard subject
keyboard = make_subject(KeyboardStream())

# other source
other_source = Rocket.make(Int) do actor
    Rocket.setTimeout(1000) do
        Rocket.next!(actor, 1)
    end
    Rocket.setTimeout(2000) do
        Rocket.next!(actor, 2)
    end
    Rocket.setTimeout(3000) do
        Rocket.next!(actor, 3)
    end
    Rocket.setTimeout(5000) do
        Rocket.complete!(actor)
    end
end

# print stuff fed to the actor
slogger = sync(logger())

# try hitting q after you see 1 printed
subscribe!(merged((keyboard, other_source))             |> 
           take_until(keyboard |> filter(is_q_pressed)) |> async(), slogger)

yield()
wait(slogger)

Hitting q as soon as 1 gets printed gives this output

[LogActor] Data: 1
[LogActor] Data: q
[LogActor] Completed

while I would expect to see

[LogActor] Data: 1
[LogActor] Completed

It’s not the end of the world, but I would expect take_until to take precedence over the observable it is supposed to end.

I think you are right. Rocket.jl has been designed to mimic the API of the RxJS library. I quickly checked their documentation and implementation, and it seems to me that the take_until() operator should indeed take precedence over the observable it is supposed to end. In our implementation it is currently the opposite. I think this can be considered a bug. I will work on a fix. Good catch!
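The two orderings under discussion can be modelled with a plain loop. This is a simplified sketch, not Rocket.jl or RxJS code: forward_until and its notifier_first keyword are hypothetical, and a real take_until works on asynchronous streams rather than on an array.

```julia
# Hypothetical model: each event is seen both by the stop notifier and by the
# actor. `notifier_first` decides which of the two sees the event first.
function forward_until(events, is_stop; notifier_first::Bool)
    delivered = Any[]
    for ev in events
        if notifier_first
            is_stop(ev) && break       # stop before the actor sees the event
            push!(delivered, ev)
        else
            push!(delivered, ev)       # the actor sees the event first
            is_stop(ev) && break
        end
    end
    return delivered
end

events = Any[1, 'q', 2, 3]
is_q(ev) = ev == 'q'

println(forward_until(events, is_q; notifier_first = true))
println(forward_until(events, is_q; notifier_first = false))
```

With notifier_first = true only 1 is delivered, matching the expected log. With notifier_first = false both 1 and 'q' are delivered, matching the log reported in the MWE.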


Should be fixed in 1.3.11. The update is waiting for a merge in the General Registry. It should take less than an hour.

P.S. There is the iterable operator, which does mostly the same as your make_subject function. It takes any iterable object as input and converts it into an observable sequence. It is not a Subject, though. You may also pass an async scheduler as an extra argument, something like

keyboard = iterable(KeyboardStream(), scheduler = AsyncScheduler())

Thanks for pushing this quickly.

Regarding your P.S., I am aware of iterable, but I really need a Subject, since keyboard events need to be broadcast both to the actor and to the take_until operator.

Ah, that’s true. For cases where you need to multicast the same observable to multiple actors in the system, you may use the share() operator (or one of its variants). Under the hood it uses a Subject to share the original source.
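To make the multicast idea concrete, here is a minimal plain-Julia sketch of a subject-like fan-out. Broadcaster, listen! and emit! are hypothetical names used only for illustration; they are not the Rocket.jl Subject or share() API.

```julia
# Hypothetical subject-like object: every emitted value goes to all listeners.
struct Broadcaster{T}
    listeners::Vector{Function}
end
Broadcaster{T}() where {T} = Broadcaster{T}(Function[])

# Register a callback that receives every emitted value
listen!(b::Broadcaster, f::Function) = push!(b.listeners, f)

# Deliver one value to all registered callbacks, in registration order
emit!(b::Broadcaster{T}, x::T) where {T} = foreach(f -> f(x), b.listeners)

kb = Broadcaster{Char}()
seen = Char[]          # plays the role of the actor
stop = Ref(false)      # plays the role of the take_until notifier

listen!(kb, c -> push!(seen, c))
listen!(kb, c -> c == 'q' && (stop[] = true))

emit!(kb, 'a')
emit!(kb, 'q')
println(seen, " ", stop[])
```

A single source of keyboard events reaches both consumers, which is the property needed when the same stream feeds an actor and a stop condition.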