Interrupting wait(Threads.Event())

That’s very similar to what I had in my original post, except that you use a dummy_channel where I used a dummy_event. Consequently, your proposal has the same problems as mine. However, I came to realise that I never explicitly mentioned one of these problems, so I should fix that before we continue the discussion.

Going back to the solution described in the original post, avoiding the resource leak is in principle straightforward: all I have to do is to “close” the original event that I’m waiting for. Unfortunately, events don’t have an API analogous to close(), so for the purpose of this argument I’ll just notify it instead.

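# Actual event I want to wait for
event = Threads.Event()

# Helper event: set by the helper task below or by the timer, whichever comes first
dummy_event = Threads.Event()
@async begin
    wait(event) # the helper task stays blocked here until `event` is notified
    notify(dummy_event)
end
Timer(t -> notify(dummy_event), 1.0) # time out after one second
wait(dummy_event)

# This should be `close(event)`. The main point here
# is to kill off the helper task somehow.
notify(event)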

Of course, the problem with this solution is that it changes the state of the original event in a way that will likely break other code using it. Such other code could be made robust against this kind of meddling, but that would be fragile and wouldn’t scale well, so it’s far from an ideal solution.
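
To make the breakage concrete, here is a minimal sketch of what could go wrong. The `data_ready` flag and the `consumer` task are hypothetical stand-ins for "other code using the event"; they are not from the original post.

```julia
event = Threads.Event()
data_ready = Ref(false) # hypothetical: would be set to true by whoever really fires the event

# Hypothetical unrelated code that trusts `event` to mean "the data is ready".
consumer = @async begin
    wait(event)
    data_ready[] # reports whether the data was actually ready on wake-up
end

# The clean-up `notify` from the snippet above, issued only to release the helper task.
notify(event)

# The consumer wakes up as well, even though nothing was ever made ready.
consumer_saw_data = fetch(consumer)
println("consumer woke up with data_ready = ", consumer_saw_data)
```

The consumer has no way of telling this clean-up notification apart from a genuine one unless it re-checks its own condition after every wake-up, which is exactly the kind of defensive code I’d rather not require everywhere.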

Looking at things from this perspective, perhaps the missing piece in Julia’s async story is a way to close() a listener without closing the object that it is listening on. That would allow us to do things like this:

julia> c = Condition()
       l = listen(c)
       @async close(l) # Close the listener, but leave the original condition untouched.
       wait(l)
ERROR: InvalidStateException: Listener is closed

Fortunately, Julia’s open and simple internals allow us to just go ahead and implement such a primitive ourselves:

mutable struct Listener{T}
    object::T
    task::Union{Nothing, Task}
end

listen(object) = Listener(object, nothing)

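function Base.wait(listener::Listener)
    # Remember which task is waiting so that close() can find and interrupt it.
    listener.task = current_task()
    try
        wait(listener.object)
    finally
        # Runs on normal wake-up and on interruption alike.
        listener.task = nothing
    end
end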

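function Base.close(listener::Listener)
    task = listener.task
    # Only act if a task is currently blocked in wait(listener).
    if !isnothing(task) && !isnothing(task.queue)
        # Take the task off the wait queue of the object it is blocked on.
        # Note: list_deletefirst! is an internal, unexported Base function.
        Base.list_deletefirst!(task.queue, task)
        # Wake the task up by throwing an exception into it.
        schedule(
            task,
            InvalidStateException("Listener is closed", :closed);
            error = true
        )
    end
end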
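
For what it’s worth, here is a small self-contained sketch of how I would expect this to behave. It repeats the definitions from above so that it runs on its own; the task names and the `sleep(0.1)` used to let both tasks reach their `wait` are my own choices for illustration, and the whole thing still relies on the internal `Base.list_deletefirst!` and the `queue` field of `Task`, which may change between Julia versions.

```julia
# Definitions repeated from above so that this snippet is self-contained.
mutable struct Listener{T}
    object::T
    task::Union{Nothing, Task}
end

listen(object) = Listener(object, nothing)

function Base.wait(listener::Listener)
    listener.task = current_task()
    try
        wait(listener.object)
    finally
        listener.task = nothing
    end
end

function Base.close(listener::Listener)
    task = listener.task
    if !isnothing(task) && !isnothing(task.queue)
        Base.list_deletefirst!(task.queue, task) # internal Base function
        schedule(task, InvalidStateException("Listener is closed", :closed); error = true)
    end
end

c = Condition()
l = listen(c)

# Task 1 waits through the listener and reports how its wait ended.
via_listener = @async try
    wait(l)
    :notified
catch err
    err isa InvalidStateException ? :closed : rethrow()
end

# Task 2 waits on the condition directly, like any other user of `c` would.
direct = @async wait(c)

sleep(0.1) # give both tasks time to start waiting
close(l)   # should interrupt task 1 only

listener_result = fetch(via_listener)
still_waiting = !istaskdone(direct) # task 2 should be unaffected by close(l)

notify(c, :hello) # the condition itself should still work as usual
direct_result = fetch(direct)

println((listener_result, still_waiting, direct_result))
```

The point of the second task is to check that closing the listener leaves the condition itself alone: only the task that went through `wait(l)` should be interrupted, while the direct waiter keeps waiting until `c` is actually notified.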

Admittedly, I’m messing around here with things that I only barely understand, so I’d be curious to hear what people more knowledgeable than me think about this!