Best practices for managing websocket/http server lifecycle

Hello folks!

I’m experimenting with HTTP.jl, and I’m trying to put in place a watchdog task that regularly checks on the server and spins up a new one if it goes down.

The issue I’m encountering is that I get this error: Base.IOError("listen: address already in use (EADDRINUSE)", -48), even though all the servers have been terminated - I’m saving their tasks in an array and can inspect their status.

The only way to make it work is to restart the Julia process (I’m testing this in Pluto btw).

This is my implementation:

Watchdog function:

function start_ws_watchdog()

## on first run
	
	# terminate any running watchdog(s) - by InterruptException
	terminate_active_watchdog!()

	# terminate any active websockets
	terminate_active_ws!()
	
	sleep(1)

	# start new watchdog
	watchdog_task = @spawn try 

			#ws_array |> empty!

			while true
				try
				
					# start websocket server if there's none or they're all done/closed
					if isempty(ws_array) || all(ws -> istaskdone(ws.task), ws_array) || all(ws -> !isopen(ws.listener.server), ws_array)
						start_ws_server()				
					end

				catch e
					
					if isa(e, Base.IOError) && occursin("EADDRINUSE", e.msg)
						# port already in use, re-start was too early
						@warn "WARNING - ws start, port already in use $e"
						sleep(1)	
						terminate_active_ws!()
					else
						rethrow(e)
					end
				end
				
				sleep(5) 
			end
				
		catch e
			if isa(e, InterruptException)
	            @warn "LOG - Websocket watchdog terminated"
	        else
				@error "ERROR - ws watchdog, $e"
				rethrow(e)
	        end
		end

	@info "LOG - New watchdog started: $watchdog_task"
    
    # save watchdog task reference
    push!(ws_watchdog, watchdog_task)

end

WS server:

function start_ws_server()

	# start new server (it spawns its own task)
    ws_server = WebSockets.listen!(ws_ip, ws_port; verbose = true) do ws

		@info "LOG - New Websocket server started: $(ws_server.task)"

		for msg in ws

			@spawn try

				# save received messages as-is
				lock(msg_lock) do
					push!(ws_msg_log_raw, msg)
				end

				parsed_msg = JSON3.read(msg, Dict) |> dict_keys_to_sym
				
				# save parsed message
				parsed_msg[:type] = "received"
				lock(msg_lock) do
					push!(ws_msg_log, parsed_msg)
				end

				# pass msg on
				msg_handler(ws, parsed_msg)
			
			catch e
				@error "ERROR - Message handler error, $e"
			end
		end
		
    end

	# saves server handler & task for reference
    push!(ws_array, ws_server)
	
end

And these are the killer functions:

function terminate_active_ws!()

	for ws in filter(ws -> !istaskdone(ws.task) || !isempty(ws.connections) || isopen(ws.listener.server), ws_array)
		_num_conn = length(ws.connections)
		@warn "WARNING - Terminating ws server: $(ws.task) - $_num_conn connections"
		HTTP.forceclose(ws)
	end
end
function terminate_active_watchdog!()

	for task in ws_watchdog
		if !istaskdone(task)
   			schedule(task, InterruptException(), error=true)
			@warn "WARNING - Terminating ws watchdog: $task"
		end
	end
end

I don’t understand what I’m doing wrong.
It works fine for a while (< 1 hr?), but then becomes unresponsive - even though the server still has a running task and is open - and finally gives this error when I try to trigger a server restart through the watchdog.

Anyone encountered similar issues?


Just out of curiosity, is your main intention to eventually use this code outside of Pluto? …or are you trying to set up a websocket server primarily for use inside Pluto?

I’m mainly asking, because trying to do this inside Pluto can complicate the situation due to how Pluto wants to control order-of-execution. (Playing with long-lived tasks in Pluto is tricky in general.) Depending on how you answer the question above, the solution may change a bit.

I’ve been doing a bit of websocket work myself lately, and I feel like I could come up with a solution in a non-Pluto context, but inside Pluto would be challenging.


Oh yeah, this is meant to live as a script inside Docker - I use Pluto out of convenience for development and quick inspection.

Wasn’t thinking that Pluto could be messing with it, thanks for the heads-up!
I’ll test it more extensively outside of it, then :thinking:


I turned your code into a gist that people can clone and try in the REPL. I also put some instructions at the bottom in a comment.

So far, so good. I’ll let you know if I get any EADDRINUSE issues. If I do, that would imply to me that the websocket server that was supposed to be killed didn’t die. We’ll see if it happens.
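In the meantime, one hedged guess: HTTP.forceclose returns before the server task has fully unwound, so an immediate rebind can race the socket actually being released. The failure mode itself is easy to reproduce with just the Sockets stdlib (port picked by listenany, nothing HTTP.jl-specific):

```julia
using Sockets

# Reproduce the failure mode outside HTTP.jl: binding a port that is
# still held by a live listener throws IOError(EADDRINUSE).
port, srv = listenany(ip"127.0.0.1", 8700)  # grab any free port >= 8700

err = try
    listen(ip"127.0.0.1", port)  # srv still holds the port -> throws
catch e
    e
end

println(err)  # an IOError with "EADDRINUSE" in the message

close(srv)  # only once the listener is really closed can the port be rebound
```

So after forceclose, waiting on the server task (e.g. wait(ws.task)) before calling start_ws_server() again, rather than a fixed sleep(1), should rule that race out.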


Hi, thanks for the gist. I am wondering, what do you think about Visor.jl? Would it be applicable in this case?


Visor.jl looks like something worth looking into. I think it could be used here.


To be honest, I have used it before in a similar scenario; however, for some reason, I lost all my previous code and notes on this topic. I guess the question is: could it be more desirable than the currently proposed solution?


So, I kept investigating the issue, and this is my understanding so far.
Some context first:

  • the code above is part of a personal project
  • the call to the msg_handler function is part of a chain of about a dozen functions, each using setters/getters on a global dictionary
  • I’m using re-entrant locks with the try/catch/finally pattern
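For context, the try/finally lock pattern I mean looks roughly like this (hypothetical names, not the actual project code):

```julia
# Minimal sketch of the re-entrant lock + try/finally pattern.
# state_lock and app_state stand in for the project's globals.
const state_lock = ReentrantLock()
const app_state = Dict{Symbol,Any}()

function set_state!(key::Symbol, value)
    lock(state_lock)
    try
        app_state[key] = value
    finally
        # the lock is always released, even if the body throws
        unlock(state_lock)
    end
end

function get_state(key::Symbol, default=nothing)
    lock(state_lock)
    try
        return get(app_state, key, default)
    finally
        unlock(state_lock)
    end
end
```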

What happened:

  • one of the msg_handler tasks didn’t terminate correctly and kept holding the lock
  • this blocked any following msg_handler tasks, so the whole service could only receive but never respond

Now I’m trying to nail down in which function downstream this happens and how.

As for Visor: thanks for sharing, I didn’t know about it!

(@g-gundam thanks for the gist btw)


You are welcome; all credit goes to @attdona, who wrote the package, and to @g-gundam for facilitating this discussion. By the way, I have to admit that the Visor documentation related to websockets could be a bit more explicit. Don’t you think so?

The Visor package provides tools for designing a task supervision tree. However, it is not tied to or directly related to the WebSocket protocol.

For tasks more relevant to WebSockets, you might want to explore the Rembus.jl package. Rembus builds on Visor to implement middleware for Remote Procedure Calls (RPC) and Publish/Subscribe (Pub/Sub) communications. While I don’t know if Rembus suits your specific case, it can serve as a “reference implementation” of a supervised WebSocket server.

To dive deeper, a good starting point with Rembus would be the serve_ws task, which implements the WebSocket server. Its implementation is available here:

With a basic understanding of the Visor API, I hope this helps illustrate the potential of Visor (and also of Rembus) and its use cases.


Ah, so it was Rembus. I lost all my notes related to this project. Thank you very much for providing this additional information.

I think Stefan’s advice at the very end of his post is good, because it eliminates locks altogether.
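For the record, the lock-free shape is roughly: handler tasks only put! onto a Channel, and a single consumer task owns the state, so nothing ever blocks on a shared lock. A minimal sketch with assumed names (not my actual project code):

```julia
# Single-consumer queue: handlers enqueue, one task owns the state.
const msg_queue = Channel{Any}(100)  # bounded; put! backpressures when full
const msg_log = Any[]                # only ever touched by the consumer task

# the one task allowed to mutate msg_log; exits when the channel is closed
start_consumer() = @async for msg in msg_queue
    push!(msg_log, msg)
    # ... dispatch to msg_handler(msg) here ...
end

# a websocket handler then just does: put!(msg_queue, parsed_msg)
```

If a handler dies mid-message, it can no longer leave shared state locked - the worst case is a dropped message.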

@attdona appreciate you elaborating and suggesting.

@g-gundam yep, I’m leaning towards it!
