How to write a custom actor that also acts as an observable?

Hello,
in Rocket, I’d like to do something like this:

struct CustomActor1 <: Actor{Any}
    # Data
end

function Rocket.on_next!(a::CustomActor1, data::Any)
    #Compute using a's data
    next!() # send result downstream
end

actor1 = CustomActor1()
subscribe!(source1, actor1) 
subscribe!(source2, actor1) 

struct CustomActor2 <: Actor{Any}
end

actor2 = CustomActor2()

source3 = actor1 |> multicast()
subscribe!(source3, actor2)

Can’t wrap my head around the design.

  • proxy() seems to link Actor2 with Actor1 sources.
  • Actor1 is not a Subject as it takes different input and output.
  • Can Actor1 be both a Subscribable and an Actor given that its execution is continuous (Actor2 does not ‘own’ the execution of Actor1)
1 Like

Apologies for reviving an old thread, but I’ve been using Rocket.jl for a few weeks so far, and I had a similar need. I also didn’t know subjects were not allowed to have different types for inputs and outputs, because what I did to solve it was implement a custom Subject.

I can’t help but feel that I’m probably not using Rocket.jl correctly, but… at the same time, it’s actually working really well for my needs. I’m a little embarrassed to admit that it helped me find a solution to a data-flow organization problem that had me stumped for a couple of years.

Oh!

You’re the Lucky.jl guy. That’s the reason I’m using Rocket.jl in the first place. It was a huge revelation for me. My current experimentation takes a lot of inspiration from Lucky.jl (which is a great name btw), and I’m very thankful for your work.

Hello @g-gundam,

Thank you for your kind words. The feeling of not using Rocket.jl properly is part of the journey. I still feel that sometime.

The answer to the original question was answered on github here: Next: A Proposal to easily create Operators from Actors. · Issue #47 · ReactiveBayes/Rocket.jl though I’d love to see your implementation of a Custom Subject for inspiration.

A subject can have a Union as a type so you can have multiple input and output types. You can also have them take an abstract type and hence also have children type supported (hence multiple types).

Always happy to share feedbacks. Thank you for checking out Lucky.jl, I haven’t added much lately but it’s not anywhere near stalled. Of course, contributions are welcome. :slight_smile:

1 Like

It’s probably garbage! (but it’s serving it’s purpose). Here’s a small (albeit unfinished) one. Its job is to translate async messages from the exchange (usually a fill notification) and translate them to a more generic fill notification that isn’t exchange specific. This gets looped back in to a StrategySubject that subscribes to this ExchangeFillSubject. (I wanted strategy-related code to be exchange agnostic.)

struct ExchangeFillSubject <: AbstractSubject{Any}
    subscribers::Vector
end

function Rocket.on_subscribe!(subject::ExchangeFillSubject, actor)
    push!(subject.subscribers, actor)
    return voidTeardown
end

function Rocket.on_complete!(subject::ExchangeFillSubject)
    @info :complete subject
end

function Rocket.on_next!(subject::ExchangeFillSubject, response::XO.AbstractResponse)
    for sub in subject.subscribers
        next!(sub, ExchangeFill())
        # XXX: There needs to be more information in ExchangeFill instances.
        # - timestamp (with exchange time)
        # - price (that the fill happeend at)
        # - amount (that was filled)
    end
end

From the Rocket.jl docs, I saw two sentences:

Every Subject is an Observable.

and

Every Subject is an Actor itself.

That sounded like what I needed, so I tried to work with it.

1 Like