@async calls to WebSockets read and write methods

I’ve put together a simple coroutine that uses the WebSockets.jl package, but I’m not sure whether WebSockets.jl (or the underlying HTTP.jl) is designed to handle this sort of asynchronous use:

using WebSockets, Test

inbox = Channel{String}(10)
outbox = Channel{String}(10)

ws_task = @async WebSockets.open("wss://echo.websocket.org") do ws
    @sync begin
        # reader task: forward each incoming message to the inbox
        inbox_task = @async while !eof(ws)
            put!(inbox, String(read(ws)))
        end
        # writer task: send queued messages while the socket stays open
        outbox_task = @async while isopen(ws)
            write(ws, take!(outbox))
        end
    end
end

put!(outbox, "Hello")
put!(outbox, "World!")

@test take!(inbox) == "Hello"
@test take!(inbox) == "World!"

The tests above pass, but I’m still worried that simultaneous attempts to read(ws) and write(ws, "...") could interfere with one another. If one of the read or write tasks is blocked, could performing the other operation at the same time leave the WebSocket in an inconsistent state?

Since you only have one writer and one reader, it’s probably fine. Those write and read calls are themselves blocking their respective tasks until completion, which is enough to ensure FIFO in each direction. If you add more writers or readers, then I might worry about out-of-order operations occurring.
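
If you did add more writers, one way to guard against interleaving would be to serialize the writes with a lock. A minimal sketch (WS_WRITE_LOCK and locked_write are hypothetical names, not part of WebSockets.jl):

using WebSockets

const WS_WRITE_LOCK = ReentrantLock()

# Hold the lock for the duration of each write so that whole
# messages from concurrent writer tasks cannot interleave.
locked_write(ws, msg) = lock(WS_WRITE_LOCK) do
    write(ws, msg)
end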

I see. Thanks for the insight!

jpsamaroo has it right.

What you put together there is an elegant and useful example which we should put in the ‘examples’ folder.

If you’re planning to use this in a relay server, I’d suggest these modifications to get fewer restarts and some traceability:

@async WebSockets.open ... -> @async WebSockets.with_logger(WebSocketLogger()) do ...
write -> writeguarded
read -> readguarded
@wslog currentmessage

Hi @hustf, thanks for the suggestions! Here’s an updated example:

using WebSockets, Test

inbox = Channel{String}(10)
outbox = Channel{String}(10)

ws_task = @async WebSockets.with_logger(WebSocketLogger()) do
    WebSockets.open("wss://echo.websocket.org") do ws
        @sync begin
            # reader: guarded reads; stop at EOF or on a failed read
            inbox_task = @async try
                while !eof(ws)
                    in_data, success = readguarded(ws)
                    success || break
                    in_msg = String(in_data)
                    @wslog in_msg
                    put!(inbox, in_msg)
                end
            finally
                close(outbox)  # lets the for loop in outbox_task exit
            end
            # writer: guarded writes; stop when the socket closes or a write fails
            outbox_task = @async try
                for outmsg in outbox
                    isopen(ws) && writeguarded(ws, outmsg) || break
                end
            finally
                close(ws)  # initiate the closing handshake
            end
        end
    end
end

put!(outbox, "Hello")
put!(outbox, "World!")

@test take!(inbox) == "Hello"
@test take!(inbox) == "World!"

close(outbox) # causes outbox_task to exit its loop and call close(ws)
wait(ws_task)

@test istaskdone(ws_task)
@test !Base.istaskfailed(ws_task)

Interesting. We actually do get some workqueue inconsistency here, which I hope to learn more from. I made a pull request with some debug logging.

The flow during closing would be:

  1. close the outbox channel
  2. enter the outbox_task finally block
  3. initiate a closing handshake with the external server; outbox_task enters a state of waiting

When the server responds, either of two things can happen first:

  4a. readguarded exits with success = false, or
  4b. close(ws) finishes, and outbox_task is done

So I think you mean

    finally
        close(inbox)
    end
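
Applied to your example, the inbox_task would then read (same code as above, only the finally clause changed):

inbox_task = @async try
    while !eof(ws)
        in_data, success = readguarded(ws)
        success || break
        in_msg = String(in_data)
        @wslog in_msg
        put!(inbox, in_msg)
    end
finally
    close(inbox)  # suggested: close the channel this task feeds
end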

I believe that small correction is unrelated to the workqueue inconsistency. Here is the output from the code as slightly modified in the pull request:

[ Wslog 11:15:58.894: Hello
take!(inbox) = [ Wslog 11:15:59.002: World!
"Hello"
take!(inbox) = "World!"
┌ Debug: Closing
│   ws = WebSocket{SSLContext}(client, CONNECTED): MbedTLS.SSLContext(Ptr{Nothing} @0x000000002af00750, ReentrantLock(nothing, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(Base.Threads.Atomic{Int64}(0))), 0), ReentrantLock(Task (runnable) @0x00000000109388b0, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(Base.Threads.Atomic{Int64}(0))), 1), MbedTLS.SSLConfig(), true, 0, false, nothing, Sockets.TCPSocket(Base.Libc.WindowsRawSocket(0x000000000000034c) active, 0 bytes waiting))
└ 11:15:59.315 @ Main C:\Users\F\.julia\dev\WebSockets\examples\inbox_outbox.jl:28

WARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :runnable
┌ Debug: WebSocketClosedError("ws|client respond to OPCODE_CLOSE  No reason ")
└ 11:15:59.803 @ WebSockets C:\Users\F\.julia\dev\WebSockets\src\WebSockets.jl:721
┌ Debug: WebSocket{SSLContext}(client, CLOSED)
│    Now closing inbox. = " Now closing inbox."
└ 11:15:59.965 @ Main C:\Users\F\.julia\dev\WebSockets\examples\inbox_outbox.jl:20
istaskdone(ws_task) = true
Base.istaskfailed(ws_task) = false

Yes, I’ve been getting the same Workqueue inconsistency during testing on my machine.

The reason I originally included

finally
   close(outbox)
end

was so that, if the websocket is closed by the websocket.org server, the close(outbox) call on exit from inbox_task would cause the for outmsg in outbox loop to exit. This way outbox_task will not wait indefinitely on an empty outbox channel after the ws has been closed, and the @sync block in ws_task will be able to finish.
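
The mechanism is plain-Channel behavior, nothing WebSocket-specific. A standalone sketch:

ch = Channel{String}(2)
t = @async for msg in ch        # blocks waiting for items
    println("sending: ", msg)
end
put!(ch, "last message")
close(ch)  # the loop drains "last message", then exits
wait(t)    # t completes; without close(ch), this would wait forever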
