I have a live streaming connection opened via HTTP.jl’s Websocket with this snippet
using HTTP
using JSON3
HTTP.WebSockets.open("wss://ws.coincap.io/trades/binance") do ws
while !eof(ws)
println(readavailable(ws) |> JSON3.read)
end
end
My goal is to make a function that returns the live streaming connection in ws such that this connection can be used by other processes in my codebase.
Here’s my current non-working attempt to break this operation with a function:
function open_socket()
HTTP.WebSockets.open("wss://ws.coincap.io/trades/binance") do ws
return ws
end
end
con = open_socket()
while !eof(con)
println(readavailable(con) |> JSON3.read)
end
function open_socket()
return HTTP.WebSockets.open("wss://ws.coincap.io/trades/binance")
end
con = open_socket()
julia> con = open_socket()
ERROR: MethodError: no method matching open(::String)
You may have intended to import Base.open
Closest candidates are:
open(::Function, ::Any; binary, verbose, headers, kw...) at /Users/../.julia/packages/HTTP/D0FSE/src/WebSockets.jl:92
Stacktrace:
[1] open_socket()
@ Main ./REPL[14]:2
[2] top-level scope
@ REPL[15]:1
Why not just pass ws to your function directly in the do block? Or put it into a Channel, from which you pull on another task, if you really want to move sockets around like that.
You don’t have to use do either - you can pass your favorite processing function to open directly as well:
using Rocket
using HTTP
using JSON3
struct WebSocketObservable <: ScheduledSubscribable{Any}
# Any field here
api_key::AbstractString
tickers::AbstractString
end
struct WebsocketSubscription <: Teardown
#.. some fields ...
live_socket::Any
tickers::AbstractString
end
struct MyActor <: Rocket.Actor{Any} end
Rocket.getscheduler(::WebSocketObservable) = AsyncScheduler()
function Rocket.on_subscribe!(source::WebSocketObservable, actor, scheduler)
@async begin
try
HTTP.WebSockets.open("wss://socket.polygon.io/crypto") do ws
while isopen(ws)
# Authenticate & Subscribe to ticker stream
write(ws, JSON3.read("""{"action":"auth", "params":$(source.api_key)}"""))
write(ws, JSON3.read("""{"action":"subscribe", "params":$(source.tickers)}"""))
if !eof(ws)
next!(actor, readavailable(ws) |> JSON3.read, scheduler)
else
complete!(actor, scheduler)
end
end
end
catch e
if !e.is_a(EOFError)
error!(actor, e, scheduler)
end
end
#return WebsocketSubscription(ws, source.tickers)
end
return WebsocketSubscription(ws, source.tickers) # can't get access to ws in the above
end
function Rocket.on_unsubscribe!(subscription::WebsocketSubscription)
# stop listening for web socket here
write(subscription.live_socket, JSON3.read("""{"action":"unsubscribe", "params":$(subscription.tickers)}"""))
# usually unsubscription returns nothing
return nothing
end
obsv = WebSocketObservable("API_KEY", "XT.*")
subscription = subscribe!(obsv, logger())
unsubscribe!(subscription)
As you can see, I need access to ws to be able to unsubscribe from the stream.
Since HTTP.open closes the web socket for you when the passed function exits, I’d use a different mechanism for communicating that the socket should be closed rather than closing the socket manually. E.g. pass a channel to the function handling the websocket, put! a message that you want to exit in it and return based on that. The idea is to not pass the socket around and have a bunch of places reading/writing from/to it, but only one instead. That makes your code easier to maintain, since you don’t have to hunt down erronous writes (and safer as well, since you don’t have multiple tasks potentially writing to the socket at the same time…).
You’ll also have to handle the socket closing prematurely, e.g. if the server doesn’t respond anymore and communicate that to Rocket.jl as well.
struct WebSocketObservable <: ScheduledSubscribable{Any}
# Any field here
api_key::AbstractString
tickers::AbstractString
# utility
comm::Channel
end
struct WebsocketSubscription <: Teardown
#.. some fields ...
live_socket::Any
tickers::AbstractString
# utility
comm::Channel
end
function handle_socket(c::Channel)
try
HTTP.WebSockets.open(...) do ws
while isopen(ws) && isopen(c)
# do regular socket processing
end
if !isopen(ws) # something failed
# [...]
end
if !isopen(c) # we closed down regularly, have to check whether the socket is open!
if isready(c) # we have data available
ret = take!(c)
# handle close, e.g. by doing this
write(ws, JSON3.read("""{"action":"unsubscribe", "params":$(subscription.tickers)}"""))
else
# closed channel but no message? => Error?
end
end
end
catch
# do catching stuff
end
end
function Rocket.on_subscribe!(source::WebSocketObservable, actor, scheduler)
chan = source.comm
# untested, may have to interpolate `chan` here via `$chan`. Check `?@async` for more info.
@async handle_socket(chan)
return WebsocketSubscription(chan, source.tickers)
end
function Rocket.on_unsubscribe!(subscription::WebsocketSubscription)
# stop listening for web socket here
put!(subscription.comm, "we're done here")
close(subscription.comm)
# usually unsubscription returns nothing
return nothing
end
You may have to interpolate chan into the @async expression via $chan - check ?@async for more information.
You could also use the Channel directly for passing messages that should be written to the websocket (and close the socket if it’s a closing message). One way would be to have a Channel that holds Tuple{Bool, String}, the first being a boolean indicating whether to terminate the socket (usually false) and the second being the message to be sent. That would move the decision making logic of “what to send” out of the “how to send” logic, increasing seperation between these two concepts.
Thanks for your suggestions. I tried that and was unsuccessful. Here’s the attempt:
using Rocket
using HTTP
using JSON3
struct WebSocketObservable <: ScheduledSubscribable{Any}
# Any field here
api_key::AbstractString
tickers::AbstractString
# utility
comm::AbstractChannel
end
struct WebsocketSubscription <: Teardown
# utility
comm::AbstractChannel
#.. some fields ...
tickers::AbstractString
end
struct MyActor <: Rocket.Actor{Any} end
Rocket.getscheduler(::WebSocketObservable) = AsyncScheduler()
function handle_socket(source::WebSocketObservable, c::Channel, actor, scheduler)
try
HTTP.WebSockets.open("wss://socket.polygon.io/crypto") do ws
while isopen(ws) && isopen(c)
# Authenticate & Subscribe to ticker stream
write(ws, JSON3.read("""{"action":"auth", "params":$(source.api_key)}"""))
write(ws, JSON3.read("""{"action":"subscribe", "params":$(source.tickers)}"""))
if !eof(ws)
next!(actor, readavailable(ws) |> JSON3.read, scheduler)
else
complete!(actor, scheduler)
end
end
# if !isopen(ws) # something failed
# # [...]
# end
if !isopen(c) # we closed down regularly, have to check whether the socket is open!
if isready(c) # we have data available
ret = take!(c)
# handle close, e.g. by doing this
write(ws, JSON3.read("""{"action":"unsubscribe", "params":$(source.tickers)}"""))
else
# closed channel but no message? => Error?
nothing # not sure what to do here
end
end
end
catch e
if !e.is_a(EOFError)
error!(actor, e, scheduler)
end
end
end
function Rocket.on_subscribe!(source::WebSocketObservable, actor, scheduler)
chan = source.comm
# untested, may have to interpolate `chan` here via `$chan`. Check `?@async` for more info.
@async handle_socket(source.tickers, chan, actor, scheduler) # tried both `$chan` and `chan`
return WebsocketSubscription(chan, source.tickers)
end
function Rocket.on_unsubscribe!(subscription::WebsocketSubscription)
# stop listening for web socket here
put!(subscription.comm, "we're done here")
close(subscription.comm)
# usually unsubscription returns nothing
return nothing
end
obsv = WebSocketObservable("API_KEY", "XT.*", Channel())
subscription = subscribe!(obsv, logger()) # logger() actor just logs any available data in the subscription
unsubscribe!(subscription)
What do you mean by that? What did you expect to happen, what did happen? My solution was not meant as drop-in replacement for your code, I just formulated what I thought of in some form that may reasonably run, or failing that, convey what I meant with the approach.
Have you interpolated the other arguments as well?
Ok let’s try this idea on a public web socket connection:
using Rocket
using HTTP
using JSON3
struct WebSocketObservable <: ScheduledSubscribable{Any}
# Any field here
# utility
comm::AbstractChannel
end
struct WebsocketSubscription <: Teardown
# utility
comm::AbstractChannel
end
Rocket.getscheduler(::WebSocketObservable) = AsyncScheduler()
function handle_socket(source::WebSocketObservable, c::Channel, actor, scheduler)
try
HTTP.WebSockets.open("wss://ws.coincap.io/trades/binance") do ws
while isopen(ws) && isopen(c)
if !eof(ws)
next!(actor, readavailable(ws) |> JSON3.read, scheduler)
else
complete!(actor, scheduler)
end
end
# if !isopen(ws) # something failed
# # [...]
# end
if !isopen(c) # we closed down regularly, have to check whether the socket is open!
if isready(c) # we have data available
ret = take!(c)
else
# closed channel but no message? => Error?
println(c)
nothing # not sure what to do here
end
end
end
catch e
println(e)
if !e.is_a(EOFError)
error!(actor, e, scheduler)
end
end
end
function Rocket.on_subscribe!(source::WebSocketObservable, actor, scheduler)
chan = source.comm
# untested, may have to interpolate `chan` here via `$chan`. Check `?@async` for more info.
@async handle_socket(source.tickers, $chan, $actor, $scheduler) # tried both `$chan` and `chan`
return WebsocketSubscription(chan)
end
function Rocket.on_unsubscribe!(subscription::WebsocketSubscription)
# stop listening for web socket here
put!(subscription.comm, "we're done here")
close(subscription.comm)
# usually unsubscription returns nothing
return nothing
end
obsv = WebSocketObservable(Channel())
# logger() actor just logs any available data in the subscription
subscription = subscribe!(obsv, logger())
unsubscribe!(subscription)
Not 100% sure what you mean, as I’m not running your code and you’re not telling me what (if any) error you’re getting, but I suspect you’ll want to interpolate source.tickers as well. Have you checked with the docstring of @async, as my comment suggested?
According to the docs, it needs to be interpolated (which I did). Tried to print the errors but nothing prints. No errors. Subscription is initialised and that’s it nothing happens.
Maybe too late for the purpose of this post, but today I’d like to read something like this:
using HTTP: WebSockets, send
using JuliaTrader: startWSStreamer, streamMessage
function open_socket(url)
conn = false
@async WebSockets.open(url) do ws
conn = Ref(ws)
while !WebSockets.isclosed(conn[])
sleep(1) # Keep the connection open
end
end
while !isa(conn[], WebSockets.WebSocket)
sleep(0.01)
end
return conn
end
# A WS Server that streams messages to all connected clients
startWSStreamer("0.0.0.0", 54321)
conn = open_socket("ws://0.0.0.0:54321")
@async begin
for msg in conn[]
println("Client received: $msg")
end
end
streamMessage("hello from server")
send(conn[], "hello from client")
julia> streamMessage("hello from server")
Client received: hello from server
julia> HTTP.send(conn[], "hello from client")
Server received: hello from client