Rocket.jl - why is teardown logic never called?

Here is an example

struct MySubscribable <: Subscribable{Int} end

function Rocket.on_subscribe!(subscribable::MySubscribable, actor)
    next!(actor, 1)
    next!(actor, 2)
    next!(actor, 3)
    complete!(actor)
    return MyCustomSubscription()
end

struct MyCustomSubscription <: Teardown
    # some fields here
end

Rocket.as_teardown(::Type{<:MyCustomSubscription}) = UnsubscribableTeardownLogic()

function Rocket.on_unsubscribe!(subscription::MyCustomSubscription)
    # dispose resources here
    println("Unsubscribed!")
end

observable = MySubscribable()
my_subscription = subscribe!(observable, logger())

# this will call the on_unsubscribe! method
unsubscribe!(my_subscription)

However if I multicast the observable then the teardown logic is never called

subject = Subject(Int)
observable = MySubscribable() |> multicast(subject)
my_subscription = subscribe!(observable, logger())
connect(observable)

# teardown logic is not called here - why??
unsubscribe!(my_subscription)

Is there any way to force the correct teardown logic to be called when the observable is multicast?

1 Like

This might not be the most satisfying answer, but the types for observable and my_subscription are different. In your first example:

julia> typeof(observable)
MySubscribable

julia> typeof(my_subscription)
MyCustomSubscription

In your second example:

julia> typeof(observable)
ConnectableObservable{Int64, Subject{Int64, AsapScheduler, AsapScheduler}, MySubscribable}

julia> typeof(my_subscription)
Rocket.SubjectSubscription

As for why unsubscribing from the multicasted observable doesn’t trickle up to unsubscribe for MyCustomSubscription, I don’t know. Perhaps it’s not a safe assumption they can make.