Rocket (Reactive Programming) Actor to Actor Message Passing, etc

Greetings. I found this amazing package as a framework for reactive programming. I have a few questions about the best practice for some patterns.

  1. Actor to Actor message passing.
    The actor subscribes to an observable. Typically it consumes the information and acts on it, and its actions affect other actors. What’s the best practice for letting other actors listen to whatever it emits? For example, Jack (the observable in this case) goes on Amazon (an actor) and buys ice cream from an ice cream shop (another actor) intermittently throughout the day. Amazon needs to relay the message “send ice cream to Jack” to the ice cream shop whenever Jack places an order. Would a Subject be the best way to do this? Here is my implementation:
using Rocket

AmazonOrders = Subject(Int)

struct Amazon <: Actor{Int} end
struct IceCreamShop <: Actor{Int} end

# Forward an order received by Amazon to everyone subscribed to the subject.
function send_order_to_ice_cream_shop(data::Int, AmazonOrders)
    println("Amazon received $(data) Order; sending to ice cream shop")
    next!(AmazonOrders, data)
end

Rocket.on_next!(actor::Amazon, data::Int) = send_order_to_ice_cream_shop(data, AmazonOrders)
Rocket.on_complete!(actor::Amazon) = println("Amazon Complete")

Rocket.on_next!(actor::IceCreamShop, data::Int) = println("IceCreamShop received Order to purchase $data ice cream")
Rocket.on_complete!(actor::IceCreamShop) = println("IceCreamShop Complete")

jack_orders = from([1,3,4,2])

subshop = subscribe!(AmazonOrders, IceCreamShop());
subamazon = subscribe!(jack_orders, Amazon());

  2. What’s the best way to incorporate streaming information from a websocket as an observable in the Rocket framework? An example would be appreciated.
  3. How does the framework deal with race conditions? For example, suppose an actor has an internal state that it modifies depending on the information it receives. What happens if, while the actor is processing the first event, a second event is pushed to the observable it listens to? Would the actor just process the second event after it finishes the first?

Thank you!


Thanks for your feedback!

  1. Usually each subscription creates its own execution of the observable. For example, if you subscribe to some network resource, each subscription will open a new connection. However, if you want to distribute information from a single source (e.g. one network connection) to multiple actors, a Subject is the right way to do it. Rocket.jl provides the share() and publish() operators for this purpose (link). They use a Subject internally and provide a simple mechanism for multicasting messages from one observable to multiple actors. If you just want to communicate between actors, though, you can simply call next!(other_actor, some_data). You will need to store a reference to other_actor somewhere, of course.
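
To illustrate the direct actor-to-actor option from point 1, here is a minimal sketch, assuming the actor names from the question. The `shop` and `orders` fields are hypothetical and only there to hold the reference and to make the result easy to inspect; this is one possible arrangement, not the only one.

```julia
using Rocket

# Hypothetical downstream actor; it records orders so they can be inspected.
struct IceCreamShop <: Actor{Int}
    orders::Vector{Int}
end
IceCreamShop() = IceCreamShop(Int[])

Rocket.on_next!(actor::IceCreamShop, data::Int) = push!(actor.orders, data)
Rocket.on_error!(actor::IceCreamShop, err)      = println("IceCreamShop error: $err")
Rocket.on_complete!(actor::IceCreamShop)        = println("IceCreamShop complete")

# Amazon stores a reference to the actor it forwards to (assumed field name).
struct Amazon{A} <: Actor{Int}
    shop::A
end

function Rocket.on_next!(actor::Amazon, data::Int)
    println("Amazon received order $data; forwarding to ice cream shop")
    next!(actor.shop, data)   # direct actor-to-actor message
end
Rocket.on_error!(actor::Amazon, err) = error!(actor.shop, err)
Rocket.on_complete!(actor::Amazon)   = complete!(actor.shop)

shop = IceCreamShop()
subscription = subscribe!(from([1, 3, 4, 2]), Amazon(shop))
```

With this layout no Subject is needed, but Amazon can only talk to the one actor it holds; a Subject remains the better fit once several actors need the same messages.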

  2. Currently Rocket.jl does not provide an observable implementation for WebSockets. You may check the network observable example. To implement a custom observable, you will need to write the Rocket.on_subscribe!(observable::WebSocketObservable, actor) logic for WebSockets, which should return a subscription. Here you can find more information about implementing custom observables (link1, link2).
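
As a rough sketch of the on_subscribe! shape described in point 2: the example below does not open a real WebSocket. A `Channel{String}` stands in for the connection, and `ChannelObservable` is a hypothetical name. A real implementation would read from a WebSocket library inside a task and return a subscription that closes the connection on unsubscribe; here the loop is synchronous and the no-op `voidTeardown` is returned instead.

```julia
using Rocket

# Hypothetical observable: the channel is a stand-in for the messages
# that a real WebSocket connection would deliver.
struct ChannelObservable <: Subscribable{String}
    messages::Channel{String}
end

function Rocket.on_subscribe!(observable::ChannelObservable, actor)
    # Forward every message until the channel is closed and drained.
    for message in observable.messages
        next!(actor, message)
    end
    complete!(actor)
    # Nothing to clean up in this synchronous sketch.
    return voidTeardown
end

# Pre-fill and close the channel so the example terminates on its own.
messages = Channel{String}(8)
put!(messages, "order: 1")
put!(messages, "order: 3")
close(messages)

received = String[]
subscription = subscribe!(ChannelObservable(messages), lambda(
    on_next     = (d) -> push!(received, d),
    on_error    = (e) -> println("error: ", e),
    on_complete = ()  -> println("stream completed")
))
```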

  3. For performance reasons, the framework does not guard against race conditions. If that matters in your case, it should be relatively straightforward to guard your code with a Condition or Semaphore. You may also use the built-in sync() actor to synchronise incoming messages (it uses a ReentrantLock). However, it is important to understand that asynchronous does not mean parallel: tasks created with @async are scheduled cooperatively and do not run in parallel with each other, so in principle if you use the built-in async() operator or AsyncScheduler() for subjects you should not experience any race conditions. The asynchronous scheduler in Rocket.jl is built upon Julia’s Task object and the @async macro. It processes incoming messages serially, one by one, so the async scheduler never processes two messages simultaneously. There is no official parallelism support in Rocket.jl currently, because as far as I know multithreading in Julia is still somewhat experimental. Once it is fully complete, we may think of adding this functionality to Rocket.jl as well.
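
If you do want to guard an actor's internal state yourself, as mentioned in point 3, a minimal sketch could look like the following. It uses a plain ReentrantLock from Base rather than a Condition or Semaphore, and the `Inventory` actor with its `total` and `guard` fields is hypothetical. With the default synchronous `from` source the lock is never contended; it only matters if events can arrive from several threads.

```julia
using Rocket

# Hypothetical stateful actor: the lock serialises updates to `total`.
mutable struct Inventory <: Actor{Int}
    total::Int
    guard::ReentrantLock
    Inventory() = new(0, ReentrantLock())
end

function Rocket.on_next!(actor::Inventory, data::Int)
    # Only one caller at a time may modify the internal state.
    lock(actor.guard) do
        actor.total += data
    end
end
Rocket.on_error!(actor::Inventory, err) = println("Inventory error: $err")
Rocket.on_complete!(actor::Inventory)   = println("Inventory total: $(actor.total)")

inventory = Inventory()
subscription = subscribe!(from([1, 3, 4, 2]), inventory)
```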



Thanks Dmitry, that really answered all my questions!