We have a server handling websocket connections that can be written to by async tasks. It turns out that a simple Socket.send(ws, msg)
is not protecting against concurrent use.
I came up with a message queue that uses Channels, but I wonder whether there’s anything already available out of the box or any other hint for a leaner setup?
My solution is the following:
const MESSAGE_QUEUE = Dict{WebSocket, Tuple{
Channel{Tuple{String, Channel{Nothing}}},
Task}
}()
function message(ws::Websocket, msg::String)
# setup a reply channel
myfuture = Channel{Nothing}(1)
# retrieve the message queue or set it up if not present
q, _ = get!(MESSAGE_QUEUE, client) do
println("Setting up websocket queue!")
queue = Channel{Tuple{String, Channel{Nothing}}}(10)
handler = @async while true
message, future = take!(queue)
try
Sockets.send(ws, message)
finally
put!(future, nothing)
end
end
queue, handler
end
put!(q, (msg, myfuture))
take!(myfuture) # Wait until the message is processed
end
# cleanup
function delete_queue!(d::Dict, client::UInt)
queue, handler = pop!(MESSAGE_QUEUE, client, (nothing, nothing))
if queue !== nothing
@async Base.throwto(handler, InterruptException())
end
end