Interrupting wait(Threads.Event())

I want to wait for a Threads.Event() for at most 1 second, then I want to move on regardless of whether the event was notified or not. The following works, but it leaks a Task and a dummy event every time I do this.

event = Threads.Event() # Actual event I want to wait for
dummy_event = Threads.Event() # Helper
@async begin
    wait(event)
    notify(dummy_event)
end
Timer(t->notify(dummy_event), 1.0)
wait(dummy_event)

Is there a way to do this without leaking any resources?

I don’t think there’s an easy answer (maybe @jameson can correct me). Typically the right thing to do is work at another level, making sure the underlying operation finishes after the desired time. Maybe there’s a resource you can close after 1 second to make the original wait call return? Could you give more context about the actual problem?

See also the related disussion on this issue:

https://github.com/JuliaLang/julia/issues/36217

2 Likes

Thanks for your interest, @sijo! My situation is this: I have a couple of devices connected over a wireless IP network, and I would like to display their locations in an interactive Makie plot. To start this plot, I would like to ask all devices for their locations and then update the axis limits before the plot is displayed. Sounds straightforward, but the catch is that I don’t want a single unreachable device to delay the plot forever. Hence I would like to wait() for the locations with a timeout.


I’m aware of the discussion in #36217. Unfortunately, while that issue contains many interesting pointers, the main advice for problems like mine seems to be “Intentionally not directly possible”. I’m sure there’s a lot of wisdom behind this statement, but on its own it is not very helpful…

You’re welcome! Here’s what I’d do based on the problem description: create one Channel for the plot, and start a task that runs a loop to take the messages as they arrive from the channel (blocking each time until a message is there). Each time I get an answer from a device I push it to the channel. This wakes the task so it can decide if it has enough messages to show the plot. To handle the timeout issue, just after creating the channel I also start a 1 second timer to push a dummy message. This will also wake the task so it can again decide what to do at this point. (Also, if the task decides to show the plot before the timer fires it can close the timer.)

Would something like this make sense in your case?

2 Likes

That’s very similar to what I had in my original post except that you use a dummy_channel instead of my dummy_event. Correspondingly, your proposal leads to the same problems as mine. However, I came to realise that I never explicitly mentioned one of these problems, so before we continue the discussion I should probably first fix that.

Going back to the solution described in the original post, avoiding the resource leak is in principle straightforward: all I have to do is to “close” the original event that I’m waiting for. Unfortunately, events don’t have an API analogous to close(), so for the purpose of this argument I’ll just notify it instead.

# Actual event I want to wait for
event = Threads.Event() 

dummy_event = Threads.Event() # Helper event
@async begin
    wait(event)
    notify(dummy_event)
end
Timer(t->notify(dummy_event), 1.0)
wait(dummy_event)

# This should be `close(event)`. The main point here 
# is to kill off the helper task somehow. 
notify(event) 

Of course, the problem with this solution is that we mess with the original event variable in a way that will likely break other code using it. Such other code could be made robust against this type of meddling, but that would be fragile and doesn’t scale well, so it’s far from an ideal solution.

Looking at things from this perspective, perhaps the missing piece in Julia’s async story is a way to close() a listener without closing the object that it is listening on. That would allow us to do things like this:

julia> c = Condition()
       l = listen(c)
       @async close(l) # Close the listener, but leave the original condition untouched.
       wait(l)
ERROR: InvalidStateException: Listener is closed

Fortunately, Julia’s open and simple internals allow us to just go ahead and implement such a primitive ourselves:

mutable struct Listener{T}
    object::T
    task::Union{Nothing, Task}
end

listen(object) = Listener(object, nothing)

function Base.wait(listener::Listener)
    listener.task = current_task()
    try
        wait(listener.object)
    finally
        listener.task = nothing
    end
end

function Base.close(listener::Listener)
    if (
        !isnothing(listener.task) &&
        !isnothing(listener.task.queue)
    )
        Base.list_deletefirst!(listener.task.queue, listener.task)
        schedule(
            listener.task,
            InvalidStateException("Listener is closed", :closed);
            error = true
        )
    end
end

Admittedly, I’m messing around here with things that I only just barely understand, so I’d be curious to hear what more knowledgeable people than me think about this!

Could you explain the problems in the case of my channel-based solution?

Among the reasons why I think the channel is a good fit here:

  • it can be closed
  • you can interrupt the waiter by sending a dummy value over the channel

So it seems to me that it does solve the problems. I don’t see anything being leaked: there’s just one task and one channel associated with the plot (and the task can even be stopped at any time if desired). In the worst case, you have a closed channel that stays around after the plot is finished, if you still have pending requests. But that’s not a leak, it’s just an object tied to the lifetime of the requests.

Or maybe the problem is that the program is “leaking” requests? Like if you make a ton of plots and for each plot some requests never finish. Then you would “leak” one closed channel per plot. But then I think the real problem is with the requests, the channel would be insignificant.

The problem with channels is that they require very fine-grained cooperation between the sender and receiver. Specifically, channels allow me to only ping one receiver at the time, and if that one receiver can’t keep up with the sender then the sender will eventually get blocked. In my application, this would mean that every time I want to add a new feature that requires listening in on location updates (e.g. if I change my mind and want to update axis limits continuously, or I want to add a logger), then I would have to go and edit the code of my main loop to open up a new channel, and if any of these dependants die then eventually the main loop will get blocked. That’s something I’d like to avoid; ideally, the main loop would just expose a single condition variable that gets notified every time a new location comes in. That way, any number of listeners can tune in, and if any of them can’t keep up with the main loop then they’ll just miss an update or two instead of blocking the main loop. All in all, this sounds like a more modular and robust design to me.

However, there is still one problem with this condition variable approach, namely I can only ever unsubscribe from updates when a new update comes in. In my case, this would mean that if I subscribe to location updates many times before the first update comes in, then by the time that update does come in I will have to work my way through a huge pile of subscribers only to realise that most of them have been abandoned by now. It would be more scalable if I could subscribe for location updates and also unsubscribe if I lose interest. That’s the need that my listen() API proposed above is trying to address.

1 Like

Ah so there are several receivers for the same sender. That looks like a classic publish-subscribe problem which you can solve with channels but:

You’re right… I’m surprised to see that Julia channels don’t support non-blocking send and receive :cry: (I’ve used channels a lot in Go and these features were very useful and supported on day 1).

If possible I’d rather design a solution where the cause of the event plugs into my API (rather than me plugging into the event), maybe something like this:

struct Subscription
    event::Threads.Event
end

const Subscriptions = Set{Subscription}

# An event publisher
struct Publisher
    subscriptions::Subscriptions
    Publisher() = new(Subscriptions())
end

function publish(p::Publisher)
    for s in p.subscriptions
        notify(s.event)
    end
end

function subscribe(c::Publisher)
    s = Subscription(Threads.Event())
    push!(c.subscriptions, s)
    return s
end

# Block until the event is published or until unsubscribe is called
Base.wait(s::Subscription) = wait(s.event)

function unsubscribe(c::Publisher, s::Subscription)
    delete!(c.subscriptions, s)
    notify(s.event)
end

# Test

p = Publisher()

s1 = subscribe(p)
s2 = subscribe(p)
s3 = subscribe(p)

t1 = @async begin wait(s1); println("s1 done"); end
t2 = @async begin wait(s2); println("s2 done"); end
t3 = @async begin wait(s3); println("s3 done"); end

unsubscribe(p, s1)
wait(t1)

println("Publishing...")
publish(p)
wait(t2)
wait(t3)

(Mutex protections would be necessary if the publisher is used from multiple threads.)

If I cannot get the event producer to plug into the publisher, I can do it manually:

external_event = Threads.Event()

# Connect external event to publisher
@async begin
    wait(external_event)
    publish(p)
end

# Fire event after 1 second
Timer(1) do timer
    println("Firing event...")
    notify(external_event)
end

s = subscribe(p)
wait(s)
println("Wait returned.")

This will leave a task running until the event fires, even if all the subscribers have unsubscribed. If that’s unacceptable and if you have to use an event that’s outside your control (so you can’t interfere with it), then I don’t see a better way than what you propose…

Thanks for you input, @sijo, and sorry for taking quite a while to get back to you. It was time for me to buckle down and get things moving. Unfortunately, that meant that I had to pause our conversation for a little while.

You are of course absolutely right that the publish / subscribe pattern is what I want here, and it turns out the Julia ecosystem already provides precisely the tool that I needed in the form of Observables.jl. My initial design was to have a task that continually listens for location updates from my devices and then manually updates all the relevant features of the Makie plot whenever a new update came in. This very quickly got me up and running but also very quickly turned into a huge mess whenever I tried to add more complicated connection monitoring and UI features. So after a lot of trial and error, I eventually settled on a design where I have one component which is responsible exclusively for managing the connections to the devices and converting all the location updates into a single Observable{Point2f}, and then a separate component which defines a couple of derived observables using map() and then passes these observables to Makie for plotting. This design resulted in super clean and flexible code, so I was very proud of myself until my colleague pointed out that this is precisely how all major web UI frameworks work (see e.g. Vue.js or React.js). So basically I’ve just spent a week reinventing the wheel :see_no_evil:. But oh well, at least I’ve learned something along the way.


Returning to the original topic, I still think it would be nice if Julia provided a safe way to close() / notify() individual listeners for one-to-many events like Conditions and Events. AFAICT, this can be done without making any changes to the existing code (see my earlier post), and I don’t think the objections to timeouts or select statements mentioned in `wait()` with timeout · Issue #36217 · JuliaLang/julia · GitHub apply to such a design. But that’s just my $0.02, and I’d be curious to hear other people’s opinions.

Ah right I didn’t realize this was basically implemented in Observable.jl already :slight_smile:

I’m not sure about your proposed addition to the API: there might be a reason why cancelling a single listener is usually not part of the API (see for example the condition variables in the C++ and Rust standard libraries). Maybe the reason is that it would be difficult or inefficient in the general case (various operating systems). Maybe in the special case of the Julia execution model it wouldn’t be a problem, I don’t know. In any case I think it would be nice if Julia at least implemented the usual system APIs, e.g. wait calls that take a timeout and/or a predicate parameter.

Returning to the original topic, I still think it would be nice if Julia provided a safe way to close() / notify() individual listeners for one-to-many events like Condition s and Event s. AFAICT, this can be done without making any changes to the existing code (see my earlier post)

The list_deletefirst in your earlier post has a wide variety of data-races that make it very hard to reason about that code and almost impossible to write correctly or use. It does come up sometimes in Base and/or testing though, as means of recovering from some bad states (e.g. when errors happen that should not be possible), but it is used with there with the expectation that we are likely to crash anyways, so we do not care if it does so with a few extra errors along the way.

As you largely discovered, one reason that Julia doesn’t have wait_with_timeout is that it forces users to discover the cleaner design of pub-sub. It sounds like we should update the documentation to point people to Observables.jl, since your question is pretty common when coming from other languages that don’t have green-threads (and even from go, which does).

Returning to the original topic, I still think it would be nice if Julia provided a safe way to close() / notify() individual listeners for one-to-many events like Condition s and Event s

Do you just mean notify(all=false) or Event(autoreset=true)? Either of those will alert only a single listener at a time. They are notably much harder to use than all=true, but can be powerful tools when used exactly right (but otherwise your program hangs, which is generally unpleasant).

1 Like

I’d still love for Julia to add wait_with_timeout. It would align with the API of system languages, and it’s just nice for the user to have this common use-case covered by the standard library. Thinking in terms of pub-sub is a bit heavy-handed when you just want a particular wait call to have a timeout.

No, I meant single and specific listener. Something like this.

cond = Condition()  # Shared condition variable

task_1 = @async wait(cond)  # Some other task somewhere in the system

listener = listen(cond)  # An annotated wait() that can be canceled
task_2 = @async wait(listener)
close(listener)  # Kills task_2 but doesn't interfere with task_1

I admit that that’s basically pub-sub, and also that Observables.jl fits this use case quite precisely. So yes, maybe a reference to Observables.jl in the docs is all I would have needed to be sent in the right direction.

I believe the argument against wait_with_timeout() goes like this:

  • It’s rare that you want to wait for either of two events (the wait() target or the timeout) and don’t care which one occurred.
  • Once wait_with_timeout() returns, it can be ugly and error-prone to figure out which event occurred and act accordingly.
  • If the language provides wait_with_timeout(), then people will start by putting that statement down and then go on to write the ugly and error-prone code to figure out what to do next, even though a better design would have been right there if only they hadn’t prematurely locked into wait_with_timeout().

Note that the above is my best attempt at summarising https://github.com/JuliaLang/julia/issues/13763. I don’t necessarily claim that these statements are true, and also it is entirely possible that I misunderstood the discussion there. The Select Statement Considered Harmful blog post might also be relevant here.

Personally, I can accept that something like the above might be true, and I also find the following workaround quite acceptable. (But we could probably advertise it more. It is very frustrating to be told “don’t do this” without being offered an alternative.)

cond = Condition()
timer = Timer(1.0) do _
    notify(cond; error = true)
    # Code to execute if timer expires goes here.
end
try
    wait(cond)
catch else
    close(timer)
    # Code to execute if event occurs goes here.
end

The one issue I had is that the above requires me to mess with all listeners to cond, even though maybe what I really want is to wake only precisely one specific listener. Unfortunately, I believe @jameson’s comment implies that such a selective notify() can’t be added to Condition() or Event() in a safe way.

Thanks for summarizing that thread.

Regarding the timeout I don’t understand the conclusion (I understand it’s not your personal position). These are old concerns in the systems programming world, there’s been a lot of thought put in the current APIs you can find in C++ and Rust for example and in my experience they work quite well. Yes it’s easy to write racy code, but the situation is only worse when you offer a more bare-bones API like in Julia. See for example the wait, wait_for and wait_until functions in C++:

  • The latter two provide a timeout and the return value will tell you if the wait returned because of the timeout. If a user has to implement that themselves it’s just increasing the odds that they will get something wrong.
  • All three functions accept a predicate argument, so that the function can check automatically for spurious wakeups while holding the mutex. It’s a subtle thing for beginners, but important when using condition variables, and much easier to get wrong if you have to implement it yourself (assuming you are even aware of the issue).

I’m quite sure having these features in the standard library decreases the odds of users making race conditions. I think Julia should include them.

The usual systems APIs are far from perfect of course, but if anything the problem is that they lack high-level conveniences to help write safe code.

Regarding select, maybe the POSIX version is a good example: a common complaint in Linux programming was that there was no nice way to wait on several events of different types: user events, sockets, file changes, signals, child processes, timers, etc. (while Windows has WaitForMultipleObjects/IOCP and FreeBSD has kqueue). Over the years Linux has added timerfd, eventfd, signalfd, pidfd, etc. which can all be used together in epoll. This has probably simplified a lot of user code, leading to fewer bugs. (As for the blog post, I think it’s a lot of text about a non-issue, and actually the proposed alternative is worse than select since a select in Go is at least an explicit mark that the order of events is undefined).

A lot of the above was about kernel events but I think the same logic applies to user events.

1 Like

I fully agree. In fact, if the resistance to select and wait_with_timeout didn’t come from a source as reputable as the Julia core devs, I probably would have dismissed it near instantly. It feels like the Julia devs have some brilliant grand vision for how async code should be written, but somehow mere mortals like myself seem to never quite get the point. So a key motivation for me to participate in this discussion was to see whether maybe I can tickle a few more pointers out of the relevant people…

2 Likes