How can I return a live websocket connection in HTTP.jl?

I have a live streaming connection opened via HTTP.jl’s WebSockets module with this snippet:

using HTTP
using JSON3

HTTP.WebSockets.open("wss://ws.coincap.io/trades/binance") do ws
    while !eof(ws)
        println(readavailable(ws) |> JSON3.read)
    end
end

My goal is to write a function that returns the live connection held in ws, so that it can be used elsewhere in my codebase.

Here’s my current non-working attempt to wrap this operation in a function:

function open_socket()
    HTTP.WebSockets.open("wss://ws.coincap.io/trades/binance") do ws
        return ws
    end
end

con = open_socket()


while !eof(con)
    println(readavailable(con) |> JSON3.read)
end

Thanks in advance!

Just guessing…


I tried that and got this error:

function open_socket()
    return HTTP.WebSockets.open("wss://ws.coincap.io/trades/binance")
end

julia> con = open_socket()
ERROR: MethodError: no method matching open(::String)
You may have intended to import Base.open
Closest candidates are:
  open(::Function, ::Any; binary, verbose, headers, kw...) at /Users/../.julia/packages/HTTP/D0FSE/src/WebSockets.jl:92
Stacktrace:
 [1] open_socket()
   @ Main ./REPL[14]:2
 [2] top-level scope
   @ REPL[15]:1

Why not just pass ws to your function directly in the do block? Or put it into a Channel, from which you pull on another task, if you really want to move sockets around like that.

You don’t have to use do either - you can pass your favorite processing function to open directly as well:

my_ws_processing_func(ws) = ...

HTTP.WebSockets.open(my_ws_processing_func, "wss://ws.coincap.io/trades/binance")

do, after all, is just syntax for passing an anonymous function as the first argument of the function being called.
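To make the equivalence concrete, here is a minimal sketch that runs without a network connection. `with_resource` and `handler` are hypothetical names made up for this illustration; `with_resource` only imitates the open/run/close pattern and is not part of HTTP.jl.

```julia
# Hypothetical stand-in for a function like `HTTP.WebSockets.open`:
# it creates a resource, hands it to `f`, and cleans up once `f` returns.
function with_resource(f, name)
    resource = IOBuffer()      # pretend this is the socket for `name`
    try
        return f(resource)
    finally
        close(resource)        # always runs, like the socket teardown
    end
end

# A named processing function that receives the resource.
handler(io) = (write(io, "hello"); String(take!(io)))

# Passing the function directly ...
a = with_resource(handler, "demo")

# ... is the same as passing an anonymous function through a do block.
b = with_resource("demo") do io
    handler(io)
end
```

Both calls give the same result, because the do block is lowered to an anonymous function placed in the first argument position.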


The main issue is that I need the ws connection returned so that an external function can use it. I am using Rocket.jl to process this stream.

Do you have an example of what you would do if you were able to get at that directly? Some pseudocode maybe?


Sure. Something like this:

using Rocket
using HTTP
using JSON3


struct WebSocketObservable <: ScheduledSubscribable{Any}
    # Any field here
    api_key::AbstractString
    tickers::AbstractString
end


struct WebsocketSubscription <: Teardown
    #.. some fields ...
    live_socket::Any
    tickers::AbstractString
end


struct MyActor <: Rocket.Actor{Any} end

Rocket.getscheduler(::WebSocketObservable) = AsyncScheduler()


function Rocket.on_subscribe!(source::WebSocketObservable, actor, scheduler)

    @async begin
        try
            HTTP.WebSockets.open("wss://socket.polygon.io/crypto") do ws
                # Authenticate & subscribe to the ticker stream once, before reading
                write(ws, JSON3.write(Dict("action" => "auth", "params" => source.api_key)))
                write(ws, JSON3.write(Dict("action" => "subscribe", "params" => source.tickers)))

                while isopen(ws)
                    if !eof(ws)
                        next!(actor, readavailable(ws) |> JSON3.read, scheduler)
                    else
                        complete!(actor, scheduler)
                    end
                end
            end
        catch e
            if !(e isa EOFError)
                error!(actor, e, scheduler)
            end
        end
    #return WebsocketSubscription(ws, source.tickers)
    end

    return WebsocketSubscription(ws, source.tickers)  # can't get access to ws in the above
end


function Rocket.on_unsubscribe!(subscription::WebsocketSubscription)
    # stop listening for web socket here
    write(subscription.live_socket, JSON3.write(Dict("action" => "unsubscribe", "params" => subscription.tickers)))

    # usually unsubscription returns nothing
    return nothing
end


obsv = WebSocketObservable("API_KEY", "XT.*")
subscription = subscribe!(obsv, logger())
unsubscribe!(subscription)

As you can see, I need access to ws to be able to unsubscribe from the stream.

Since HTTP.WebSockets.open closes the websocket for you when the passed function exits, I’d use a different mechanism to communicate that the socket should be closed, rather than closing it manually. For example, pass a Channel to the function handling the websocket, put! a message into it when you want to exit, and have the handler return based on that. The idea is not to pass the socket around and have many places reading from and writing to it, but only one. That makes your code easier to maintain, since you don’t have to hunt down erroneous writes, and safer as well, since you don’t have multiple tasks potentially writing to the socket at the same time.
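The same open/run/close pattern exists in Base, so the effect of returning the handle from the do block can be seen without a network connection. This is only an analogy using `Base.open` on a temporary file, on the assumption that the websocket version behaves the same way with respect to closing:

```julia
# Create a small temporary file to open.
path, tmp = mktemp()
write(tmp, "data")
close(tmp)

# Returning the handle from the do block (same effect as `return io`)
# does hand it back to the caller ...
leaked = open(path) do io
    io
end

# ... but `open` has already closed it by the time we get it.
still_open = isopen(leaked)   # false
```

The handle escapes, but it is no longer usable, which is why returning ws from inside the do block cannot give you a live connection.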

You’ll also have to handle the socket closing prematurely, e.g. if the server stops responding, and communicate that to Rocket.jl as well.
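As a rough sketch of the signalling idea on its own, with no websocket involved: a worker task owns the (pretend) connection and checks a Channel for a shutdown request. `run_worker`, `stop`, and `events` are hypothetical names for this illustration, and the `sleep` stands in for reading one message.

```julia
# Worker that owns a (pretend) connection and stops when told to.
function run_worker(stop::Channel{String}, events::Vector{String})
    while !isready(stop)          # no shutdown request yet
        push!(events, "tick")     # stand-in for handling one message
        sleep(0.01)
    end
    push!(events, "stopping: " * take!(stop))
    return nothing
end

stop = Channel{String}(1)         # buffered, so `put!` does not block the caller
events = String[]
task = @async run_worker(stop, events)

sleep(0.05)                       # let the worker run for a moment
put!(stop, "we're done here")     # ask it to shut down
wait(task)                        # returns once the worker has exited
```

Note that the channel is created with a buffer of one. With an unbuffered `Channel()`, `put!` would block until the worker calls `take!`.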


I’ve never used websockets or channels before. Could you give a practical example?

Something like this:

struct WebSocketObservable <: ScheduledSubscribable{Any}
    # Any field here
    api_key::AbstractString
    tickers::AbstractString
    # utility
    comm::Channel
end

struct WebsocketSubscription <: Teardown
    # utility
    comm::Channel

    #.. some fields ...
    tickers::AbstractString
end

function handle_socket(c::Channel, tickers)
    try
        HTTP.WebSockets.open(...) do ws
            while isopen(ws) && isopen(c)
                # do regular socket processing
            end

            if !isopen(ws) # something failed
                # [...]
            end
            if !isopen(c) # we closed down regularly, but still have to check whether the socket is open!
                if isready(c) # a shutdown message is waiting in the channel
                    ret = take!(c)
                    # handle close, e.g. by doing this
                    write(ws, JSON3.write(Dict("action" => "unsubscribe", "params" => tickers)))
                else
                    # closed channel but no message? => Error?
                end
            end
        end
    catch
        # do catching stuff
    end
end

function Rocket.on_subscribe!(source::WebSocketObservable, actor, scheduler)
    chan = source.comm
    # untested, may have to interpolate `chan` here via `$chan`. Check `?@async` for more info.
    @async handle_socket(chan, source.tickers)
    return WebsocketSubscription(chan, source.tickers)
end

function Rocket.on_unsubscribe!(subscription::WebsocketSubscription)
    # stop listening for web socket here
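    # note: put! on an unbuffered Channel blocks until another task take!s, so `comm` should be buffered
    put!(subscription.comm, "we're done here")
    close(subscription.comm)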

    # usually unsubscription returns nothing
    return nothing
end

You may have to interpolate chan into the @async expression via $chan - check ?@async for more information.

You could also use the Channel directly for passing the messages that should be written to the websocket (and close the socket if it’s a closing message). One way would be a Channel that holds Tuple{Bool, String}, where the Bool indicates whether to terminate the socket (usually false) and the String is the message to be sent. That moves the decision of “what to send” out of the “how to send” logic, increasing the separation between these two concerns.
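A minimal sketch of that command-channel idea, assuming an `IOBuffer` as a stand-in for the websocket so it runs offline. `run_writer`, `commands`, and `sink` are hypothetical names for this illustration:

```julia
# The writer task is the only place that touches the sink ("how to send").
function run_writer(commands::Channel{Tuple{Bool,String}}, sink::IO)
    for (terminate, message) in commands   # blocks until a command arrives
        println(sink, message)
        terminate && break                 # the sender marked this as the last message
    end
    return nothing
end

commands = Channel{Tuple{Bool,String}}(8)  # buffered queue of (terminate, message)
sink = IOBuffer()                          # stand-in for the websocket
writer = @async run_writer(commands, sink)

# Callers only decide "what to send".
put!(commands, (false, "subscribe"))
put!(commands, (true, "unsubscribe"))
wait(writer)

sent = split(String(take!(sink)), '\n'; keepempty=false)
```

With a real websocket, the `println(sink, message)` line would be replaced by whatever write call your HTTP.jl version provides, and leaving the loop would let the surrounding do block return and close the socket.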


Thanks for your suggestions. I tried that and was unsuccessful. Here’s the attempt:

using Rocket
using HTTP
using JSON3


struct WebSocketObservable <: ScheduledSubscribable{Any}
    # Any field here
    api_key::AbstractString
    tickers::AbstractString
    # utility
    comm::AbstractChannel
end


struct WebsocketSubscription <: Teardown
    # utility
    comm::AbstractChannel
    #.. some fields ...
    tickers::AbstractString
end


struct MyActor <: Rocket.Actor{Any} end

Rocket.getscheduler(::WebSocketObservable) = AsyncScheduler()


function handle_socket(source::WebSocketObservable, c::Channel, actor, scheduler)
    try
        HTTP.WebSockets.open("wss://socket.polygon.io/crypto") do ws

            # Authenticate & subscribe to the ticker stream once, before reading
            write(ws, JSON3.write(Dict("action" => "auth", "params" => source.api_key)))
            write(ws, JSON3.write(Dict("action" => "subscribe", "params" => source.tickers)))

            while isopen(ws) && isopen(c)
                if !eof(ws)
                    next!(actor, readavailable(ws) |> JSON3.read, scheduler)
                else
                    complete!(actor, scheduler)
                end
            end

        #    if !isopen(ws) # something failed
        #        # [...]
        #    end

           if !isopen(c) # we closed down regularly, have to check whether the socket is open!
               if isready(c) # we have data available
                  ret = take!(c)
                  # handle close, e.g. by doing this
                  write(ws, JSON3.write(Dict("action" => "unsubscribe", "params" => source.tickers)))

               else
                   # closed channel but no message? => Error?
                   nothing  # not sure what to do here
               end
           end

        end

    catch e
        if !(e isa EOFError)
            error!(actor, e, scheduler)
        end
    end
end


function Rocket.on_subscribe!(source::WebSocketObservable, actor, scheduler)
    chan = source.comm
    # untested, may have to interpolate `chan` here via `$chan`. Check `?@async` for more info.
    @async handle_socket(source.tickers, chan, actor, scheduler)  # tried both `$chan` and `chan`
    return WebsocketSubscription(chan, source.tickers)
end


function Rocket.on_unsubscribe!(subscription::WebsocketSubscription)
    # stop listening for web socket here
    put!(subscription.comm, "we're done here")
    close(subscription.comm)

    # usually unsubscription returns nothing
    return nothing
end


obsv = WebSocketObservable("API_KEY", "XT.*", Channel())
subscription = subscribe!(obsv, logger())  # logger() actor just logs any available data in the subscription
unsubscribe!(subscription)

What do you mean by that? What did you expect to happen, and what did happen? My solution was not meant as a drop-in replacement for your code. I just wrote down what I had in mind in a form that may reasonably run or, failing that, conveys what I meant by the approach.

Have you interpolated the other arguments as well?

I tried it with all arguments interpolated and got the same result: no output after subscribing.

If a connection is successful, some data usually starts streaming through. I really like your approach, though. It looks cleaner.

It could be that you’re catching an error and just ignoring it. Maybe try printing all caught errors?


Ok let’s try this idea on a public web socket connection:

using Rocket
using HTTP
using JSON3


struct WebSocketObservable <: ScheduledSubscribable{Any}
    # Any field here

    # utility
    comm::AbstractChannel
end


struct WebsocketSubscription <: Teardown
    # utility
    comm::AbstractChannel
end


Rocket.getscheduler(::WebSocketObservable) = AsyncScheduler()


function handle_socket(source::WebSocketObservable, c::Channel, actor, scheduler)
    try
        HTTP.WebSockets.open("wss://ws.coincap.io/trades/binance") do ws

            while isopen(ws) && isopen(c)

                if !eof(ws)
                    next!(actor, readavailable(ws) |> JSON3.read, scheduler)
                else
                    complete!(actor, scheduler)
                end
            end

        #    if !isopen(ws) # something failed
        #        # [...]
        #    end

        if !isopen(c) # we closed down regularly, have to check whether the socket is open!
            if isready(c) # we have data available
                ret = take!(c)
            else
                # closed channel but no message? => Error?
                println(c)
                nothing  # not sure what to do here
            end
        end

        end

    catch e
        println(e)
        if !(e isa EOFError)
            error!(actor, e, scheduler)
        end
    end
end


function Rocket.on_subscribe!(source::WebSocketObservable, actor, scheduler)
    chan = source.comm
    # untested, may have to interpolate `chan` here via `$chan`. Check `?@async` for more info.
    @async handle_socket(source.tickers, $chan, $actor, $scheduler)  # tried both `$chan` and `chan`
    return WebsocketSubscription(chan)
end


function Rocket.on_unsubscribe!(subscription::WebsocketSubscription)
    # stop listening for web socket here
    put!(subscription.comm, "we're done here")
    close(subscription.comm)

    # usually unsubscription returns nothing
    return nothing
end


obsv = WebSocketObservable(Channel())
# logger() actor just logs any available data in the subscription
subscription = subscribe!(obsv, logger())
unsubscribe!(subscription)

Not 100% sure what you mean, as I’m not running your code and you’re not telling me what (if any) error you’re getting, but I suspect you’ll want to interpolate source.tickers as well. Have you checked the docstring of @async, as my comment suggested?

According to the docs, it needs to be interpolated (which I did). I tried printing the errors, but nothing prints. No errors. The subscription is initialised and then nothing happens.
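One possible explanation for the silence, offered as a guess rather than a diagnosis: an exception thrown inside an @async task is stored in the task and is not printed at the call site. In the attempts above, handle_socket is called with source.tickers as its first argument although the method is declared for a WebSocketObservable, so the call would fail before the try block inside handle_socket is ever entered. The sketch below reproduces that effect in isolation; `handle` is a hypothetical function for illustration.

```julia
# A method that only accepts an Int.
handle(x::Int) = x + 1

# Wrong argument type: the MethodError is raised inside the task,
# so this line itself prints nothing and does not throw.
t = @async handle("not an Int")

# Waiting on the task is what surfaces the failure.
failure = try
    wait(t)
    nothing
catch err
    err
end

failed = istaskfailed(t)   # true
```

Keeping the task returned by @async and calling `wait` or `fetch` on it while debugging is one way to see such errors. An unbuffered `Channel()` can also contribute here, since `put!` on it blocks until some task calls `take!`.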

Maybe too late for the purpose of this post, but today I’d like to read something like this:

using HTTP: WebSockets, send
using JuliaTrader: startWSStreamer, streamMessage

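function open_socket(url)
    conn = Ref{Any}(nothing)  # filled in once the connection is established
    @async WebSockets.open(url) do ws
        conn[] = ws
        while !WebSockets.isclosed(ws)
            sleep(1) # Keep the connection open
        end
    end
    # Poll until the task has stored the socket (loops forever if connecting fails)
    while !isa(conn[], WebSockets.WebSocket)
        sleep(0.01)
    end
    return conn
end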

# A WS Server that streams messages to all connected clients
startWSStreamer("0.0.0.0", 54321)

conn = open_socket("ws://0.0.0.0:54321")
@async begin
    for msg in conn[]
        println("Client received: $msg")
    end
end

streamMessage("hello from server")
send(conn[], "hello from client")
julia> streamMessage("hello from server")
Client received: hello from server
julia> send(conn[], "hello from client")
Server received: hello from client
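A variation on the same idea, sketched without a network connection: instead of polling with sleep until the socket shows up, the task can hand the connection out through a Channel and park on a second Channel until it is told to let go. `with_connection` is a hypothetical stand-in for a function like WebSockets.open, and `open_connection`, `handoff`, and `release` are made-up names for this illustration.

```julia
# Hypothetical stand-in for `WebSockets.open`: the connection only
# lives while `f` is running.
function with_connection(f)
    conn = IOBuffer()
    try
        return f(conn)
    finally
        close(conn)
    end
end

function open_connection()
    handoff = Channel{IOBuffer}(1)   # carries the connection out of the task
    release = Channel{Nothing}(1)    # tells the task when to let go
    task = @async begin
        with_connection() do conn
            put!(handoff, conn)
            take!(release)           # park here; the connection stays open
        end
    end
    return take!(handoff), release, task
end

conn, release, task = open_connection()
was_open = isopen(conn)              # usable while the task is parked
put!(release, nothing)               # let the do block return
wait(task)                           # connection is closed after this
```

As with the polling version, this sketch does not handle a failed connection: if `with_connection` threw before the `put!`, the `take!(handoff)` call would wait forever, so a real version would need a timeout or error path.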