Manage asynchronous tasks

multithreading

#1

Hi all:
I have a question about multithreading with asynchronous tasks. There are three tasks running concurrently: the main script, asynchronous1 (a test server), and asynchronous2 (a test client). What is the best way to kill the asynchronous tasks upon an InterruptException? The program currently looks like this:

function t1()
serv = listen(5000)
while true
sock = accept(serv)
@async while isopen(sock)
write(sock,readline(sock))
end
sleep(0.5)
end
end

function t2()
cli = connect(5000)
while true
println(cli, "hi")
sleep(1)
end
end

tserv = @async t1()
tcli = @async t2()
try
while true
sleep(1)
end
catch ex
Base.throwto(tserv,ex)
Base.throwto(tcli,ex)
throw(ex)
end

Edit: Current status
After interrupting the program and restarting it from the REPL with include("test.jl"), I get: ERROR (unhandled task failure): listen: address already in use (EADDRINUSE)

Cheers


#2

Here’s an attempt at something similar:

"""
For the closing handshake, we won't wait indefinitely for non-responsive clients.
Returns a throwaway frame if the socket happens to be empty
"""
function readframe_nonblocking(ws)
    chnl= Channel{WebSocketFragment}(1)
    # Read, output put to Channel for type stability
    function _readinterruptable(c::Channel{WebSocketFragment})
        try
            put!(chnl, read_frame(ws))
        catch
            # Output a dummy frame that is not a control frame.
            put!(chnl, WebSocketFragment(false, false, false, false,
                                UInt8(0), false, UInt64(0),
                                Vector{UInt8}([0x0,0x0,0x0,0x0]),
                                Vector{UInt8}()))
        end
    end
    # Start reading as a task. Will not return if there is nothing to read
    rt = @schedule _readinterruptable(chnl)
    bind(chnl, rt)
    yield()
    # Define a task for throwing interrupt exception to the (possibly blocked) read task.
    # We don't start this task because it would never return
    killta = @task try;Base.throwto(rt, InterruptException());end
    # We start the killing task. When it is scheduled the second time,
    # we pass an InterruptException through the scheduler.
    try;schedule(killta, InterruptException(), error = false);end
    # We now have content on chnl, and no additional tasks.
    take!(chnl)
end
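
The core idea in the function above, delivering an exception to a task that is blocked, can be shown in isolation. This is a minimal sketch, assuming Julia 1.x, with an empty Channel standing in for the blocked socket read. The names `ch` and `blocked` are illustrative only and are not part of the code above.

```julia
# A task that blocks on an empty channel, standing in for a blocked read.
ch = Channel{Int}(1)
blocked = @async try
    take!(ch)                      # never returns: nothing is ever put!
catch err
    err isa InterruptException ? :interrupted : rethrow()
end
yield()                            # let the task start and block in take!

# Ask the scheduler to resume the task by throwing the exception inside it.
schedule(blocked, InterruptException(); error = true)

result = fetch(blocked)            # waits until the task has finished
```

With `error = true`, the second argument is raised inside the target task instead of being returned to it as a value, so the task can clean up in its own `catch` or `finally` block.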

#3

Can you explain this?


#4

Han-so1omon, sorry for a sloppy and unhelpful reply above. From your example, it seems you are experimenting with adapting examples from the docs. I didn’t take the time to adapt the example to your context, which I should have done.

t1() is hard to read because you drop the tabs. It never returns, but creates a new task every 0.5 seconds. I can’t see any reason for doing that. Is this what you want to do?

function t1()
    serv = listen(5000)
    sock = accept(serv)
    while isopen(sock)
        write(sock, readline(sock))
    end
    close(serv)
end

The problem I was replying to was a different one. Your time is probably much better used reading about tasks.
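
For the original question, one possible way to avoid the EADDRINUSE error on restart is to have the main task own the listening socket and close it in a `finally` block, rather than throwing exceptions into the server task. The following is a sketch under assumptions, not a definitive implementation: it assumes Julia 1.x (where networking lives in the `Sockets` standard library), and `echo_server` and `run_demo` are hypothetical names. It echoes with `println` so that the client's `readline` sees a terminating newline.

```julia
using Sockets

# Echo server: one task per accepted connection.
function echo_server(serv)
    try
        while true
            sock = accept(serv)            # throws once `serv` is closed
            @async try
                while isopen(sock) && !eof(sock)
                    println(sock, readline(sock))
                end
            catch
                # The connection was dropped; the socket is closed below.
            finally
                close(sock)
            end
        end
    catch
        # A failing accept() after close(serv) is treated as a normal shutdown.
        isopen(serv) && rethrow()
    end
end

function run_demo(port = 5000)
    serv = listen(port)
    tserv = @async echo_server(serv)
    cli = connect(port)
    try
        println(cli, "hi")
        return readline(cli)               # the echoed line
    finally
        close(cli)
        close(serv)                        # release the port before returning
        wait(tserv)                        # the server task ends once accept() fails
    end
end
```

Because the `finally` block also runs when an InterruptException reaches the main task, the port is released before the program exits, and the server task stops on its own when `accept` fails on the closed listener.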


#5

Thank you for your answers. They went over my head, as I’m not sure of the difference between the @schedule, @task, and @async Julia macros.
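
The difference between these macros can be sketched briefly, assuming Julia 1.x. `@task` only wraps an expression in a Task and does not run it, while `@async` creates the task and schedules it in one step. `@schedule` behaved much like `@async` in older versions and, as far as I know, was deprecated in favor of `@async` in Julia 0.7. The variable names below are illustrative only.

```julia
# @task wraps the expression in a Task; nothing runs yet.
t = @task begin
    sleep(0.1)
    :from_task
end
started_before = istaskstarted(t)   # false: the task has not been scheduled

schedule(t)                         # hand the task to the scheduler
task_value = fetch(t)               # fetch waits for the task and returns its value

# @async creates the task and schedules it immediately.
a = @async 1 + 1
async_value = fetch(a)
```

A task created with `@async` is also registered with any enclosing `@sync` block, which waits for it to finish.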

Yes, I am mostly experimenting, but I am open to all ideas.

My solution is similar to your second post, but for the client side. The server side is written in C.

I occasionally end up with ERROR (unhandled task failure): connect: connection refused (ECONNREFUSED). From this it is unclear if ECONNREFUSED causes (unhandled task failure) or the other way around. Any insight into this?

Edit: Maybe this is related to server not immediately responding to connect()
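
If the server is indeed not yet listening when `connect` is called, the failed `connect` throws inside the client task. Nothing catches it there, so the task fails and the error is reported as an unhandled task failure. In other words, ECONNREFUSED is the cause rather than the effect. One way to tolerate a slow server is to retry, sketched here assuming Julia 1.x. `connect_with_retry` and its keyword arguments are hypothetical names, not part of any library.

```julia
using Sockets

# Hypothetical helper: retry `connect` a few times before giving up.
function connect_with_retry(port; attempts = 5, delay = 0.2)
    for i in 1:attempts
        try
            return connect(port)
        catch err
            # Retry only on I/O errors (such as ECONNREFUSED); on the last
            # attempt, or for any other error, rethrow to the caller.
            (err isa Base.IOError && i < attempts) || rethrow()
            sleep(delay)
        end
    end
end
```

Wrapping the call in `try`/`catch` inside the task also means that a final failure can be handled or logged explicitly instead of surfacing as an unhandled task failure.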