Dear All,
I am new to web-stack programming and my naive implementation of websockets server is not working. I took chat server (chat_explore.jl) from WebSockets.jl examples here (chat_explore.html is needed to run the code below): https://github.com/JuliaWeb/WebSockets.jl/tree/master/examples
My aim here - i want to process the input from connected users and perform some simulations based on the text commands i receive from them. So, i have modified the original chat_explore.jl example to put connected users’ messages into a queue and then process it within server_loop(). I guess i did not make proper synchronization between async tasks here or something else, any suggestions from your will be greatly appreciated. Here is the code:
using WebSockets, DataStructures
import WebSockets:Response,
Request
using Dates
using Sockets
@info """
A chat server application. For each browser (tab) that connects,
an 'asyncronous function' aka 'coroutine' aka 'task' is started.
To use:
- include("chat_explore.jl") in REPL
- start a browser on the local ip address, e.g.: http://192.168.0.4:8080
- inspect global variables starting with 'LAST' while the chat is running asyncronously
"""
const CLOSEAFTER = Dates.Second(60)
const HTTPPORT = 8081
const LOCALIP = string(Sockets.getipaddr())
const USERNAMES = Dict{String, WebSocket}()
const HTMLSTRING = read(joinpath(@__DIR__, "chat_explore.html"), String)
# Since we are to access a websocket from outside
# it's own websocket handler coroutine, we need some kind of
# mutable container for storing references:
const WEBSOCKETS = Dict{WebSocket, Int}()
global LASTREQ = 0
global LASTWS = 0
global LASTMSG = 0
global LASTSERVER = 0
mutable struct MsgQ
username::String
msgstr::String
end
global Buffer = Queue{MsgQ}()
"""
Called by 'gatekeeper', this function will be running in a task while the
particular websocket is open. The argument is an open websocket.
Other instances of the function run in other tasks.
"""
function coroutine(thisws)
global lastws = thisws
push!(WEBSOCKETS, thisws => length(WEBSOCKETS) +1 )
t1 = now() + CLOSEAFTER
username = ""
while now() < t1
# This next call waits for a message to
# appear on the socket. If there is none,
# this task yields to other tasks.
data, success = readguarded(thisws)
!success && break
global LASTMSG = msg = String(data)
print("Received: $msg ")
if username == ""
username = approvedusername(msg, thisws)
if username != ""
println("from new user $username ")
!writeguarded(thisws, username) && break
println("Tell everybody about $username")
foreach(keys(WEBSOCKETS)) do ws
writeguarded(ws, username * " enters chat")
end
else
println(", username taken!")
!writeguarded(thisws, "Username taken!") && break
end
else
enqueue!(Buffer,MsgQ(username,msg))
println("from $username ")
distributemsg(username * ": " * msg, thisws)
startswith(msg, "exit") && break
end
end
exitmsg = username == "" ? "unknown" : username * " has left"
distributemsg(exitmsg, thisws)
println(exitmsg)
# No need to close the websocket. Just clean up external references:
removereferences(thisws)
nothing
end
function removereferences(ws)
haskey(WEBSOCKETS, ws) && pop!(WEBSOCKETS, ws)
for (discardname, wsref) in USERNAMES
if wsref === ws
pop!(USERNAMES, discardname)
break
end
end
nothing
end
function approvedusername(msg, ws)
!startswith(msg, "userName:") && return ""
newname = msg[length("userName:") + 1:end]
newname =="" && return ""
haskey(USERNAMES, newname) && return ""
push!(USERNAMES, newname => ws)
newname
end
function distributemsg(msgout, not_to_ws)
foreach(keys(WEBSOCKETS)) do ws
if ws !== not_to_ws
writeguarded(ws, msgout)
end
end
nothing
end
"""
`Server => gatekeeper(Request, WebSocket) => coroutine(WebSocket)`
The gatekeeper makes it a little harder to connect with
malicious code. It inspects the request that was upgraded
to a a websocket.
"""
function gatekeeper(req, ws)
global LASTREQ = req
global LASTWS = ws
orig = WebSockets.origin(req)
if occursin(LOCALIP, orig)
coroutine(ws)
else
@warn("Unauthorized websocket connection, $orig not approved by gatekeeper, expected $LOCALIP")
end
nothing
end
"Request to response. Response is the predefined HTML page with some javascript"
req2resp(req::Request) = HTMLSTRING |> Response
# ServerWS takes two functions; the first a http request handler function for page requests,
# one for opening websockets (which javascript in the HTML page will try to do)
global LASTSERVER = WebSockets.ServerWS(req2resp, gatekeeper)
# The following lines disblle detail messages from spilling into the
# REPL. Remove the it to gain insight.
using Logging
import Logging.shouldlog
function shouldlog(::ConsoleLogger, level, _module, group, id)
if _module == WebSockets.HTTP.Servers
if level == Logging.Warn || level == Logging.Info
return false
else
return true
end
else
return true
end
end
#@async begin
# println("HTTP server listening on $LOCALIP:$HTTPPORT for $CLOSEAFTER")
# sleep(CLOSEAFTER.value)
# println("Time out, closing down $HTTPPORT")
# put!(LASTSERVER.in, "I can send anything, you close")
# nothing
#end
# for inspecting in REPL or Atom / Juno - update after starting some clients.
#LASTWS
#LASTSERVER.out
#WEBSOCKETS
#take!(LASTSERVER.out)|>string
#nothing
@inline function send_to_user(username::String, msg::String)
writeguarded(USERNAMES[username], msg)
end
function server_loop()
not_shutdown = true
while(not_shutdown)
if isempty(Buffer)
continue
end
for i = 1:length(Buffer)
MsgQ_to_process = dequeue!(Buffer)
send_to_user(MsgQ_to_process.username, "your wrote:" *
MsgQ_to_process.msgstr)
end
end
end
function main()
# Start the server asyncronously, and stop it later
@async WebSockets.serve(LASTSERVER, LOCALIP, HTTPPORT)
println("HTTP server listening on $LOCALIP:$HTTPPORT for $CLOSEAFTER")
#start the server loop
server_loop()
#clean up and quit
end
#running the server
main()
If i remove my server_loop() function everything works just fine and i can extract user commands from queue Buffer.
Thank you very much in advance for any suggestions!