Problem combining worker processes and threads

(Running julia 1.6.1 if that matters)

Seeking help in understanding what I am doing wrong in an application which combines multiple worker processes with multiple threads within each one. I’ve been pushing and pulling at this for a while; any advice would be appreciated.

Combining worker processes and threads does not seem to be well supported by Julia (compared, that is, to the excellent support for using either multiple worker processes or multiple threads on their own).

The package I’m writing attempts to provide a convenient API for this combination, the notion being that one runs a coherent application across multiple servers in a compute cluster, each with multiple CPUs.

At any rate, things seem to be working reasonably well, except that I get really strange errors in some tests. Specifically, if I repeatedly run only the dt_foreach test (a test which runs a loop across all threads of all worker processes), I see:

  1. The test passes (most often)
  2. Deadlocks (less common, but not that rare)
  3. An error saying I’m taking from a closed channel (which “can’t happen”). Pretty rare, but happens.
  4. An error converting an object from one type to another (which also “can’t happen”). Also pretty rare, but happens.
  5. GC crashing on “task switch within GC” (which really “can’t happen”). I only saw this once.

The detailed crash traces are available in the gist Errors running dt_foreach test in Snarl.jl commit 8d82051 on GitHub.
The code is in the GitHub repository orenbenkiki/Snarl.jl (an opinionated framework for parallel processing), latest commit 8d82051e7fac06dbf3e153fac64d81b59a68ef2d as of the time of writing.

I’m assuming I’m doing something very wrong, but I can’t figure out what. Seeing the GC crash got me suspecting I might be pushing Julia beyond what it is capable of - perhaps even a bug in the standard libraries/compiler - but of course it is still more likely the fault is mine.

The code that breaks is a small part of the system, actually just a part of the testing code. I’m spawning a service thread that runs “forever” on the main process, listening on a channel that is known everywhere. Every other thread (on either the main process or a worker process) can send on this channel a request for a unique value of a counter, encoded as a tuple of (counter-identifier, response-channel). The service thread increments the designated counter (there’s an array of them), sends the resulting unique value on the response channel, and then closes it (the response channel, not the global channel, of course). That is, each response channel is used to pass only one message, possibly between threads in different processes.

Here is the relevant code fragment:

used_counters = zeros(Int, COUNTERS)

function make_counters_channel()::Channel{Tuple{Int,RemoteChannel{Channel{Int}}}}
    return Channel{Tuple{Int,RemoteChannel{Channel{Int}}}}(nprocs() * nthreads())
end

@send_everywhere counters_channel RemoteChannel(make_counters_channel)

function serve_counters()::Nothing
    while true
        request = take!(counters_channel)

        counter = request[1]
        response_channel = request[2]

        used_counters[counter] += 1

        put!(response_channel, used_counters[counter])
        close(response_channel)
    end

    error("Never happens")  # untested
end

remote_do(serve_counters, 1)

@everywhere function next!(counter::Int)::Int
    response_channel = RemoteChannel(() -> Channel{Int}(1))
    put!(counters_channel, (counter, response_channel))
    sleep(0) # Doesn't solve the problem.
    atomic_fence() # Doesn't solve the problem.
    yield()  # Doesn't solve the problem.
    # Problems manifested here:
    # - Deadlock
    # - GC error
    # - Taking from a closed channel.
    return take!(response_channel)
end

Where @send_everywhere is a macro I provide for setting a global variable @everywhere (that is, on all processes).
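For readers without the package: a plain-Distributed approximation of what such a macro does (a sketch under the assumption that it simply broadcasts one value evaluated on the main process, not the package's actual implementation) is `@everywhere` with `$`-interpolation:

```julia
using Distributed
addprocs(2)

# Hypothetical sketch of @send_everywhere: evaluate the value once on the
# main process, then define the same global on every process.
ch = RemoteChannel(() -> Channel{Int}(4))  # evaluated once, on process 1
@everywhere counters_channel = $ch         # now defined on every process

# The shared RemoteChannel is usable from any process:
put!(counters_channel, 7)
println(remotecall_fetch(() -> take!(counters_channel), 2))  # prints 7
```

The key point is that the `RemoteChannel` is constructed exactly once; every process then holds a reference to the same underlying channel.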

Am I doing something very wrong here, or did I hit on a bug?

Any help would be appreciated.

This isn’t a GC “crash”, this is just a regular error from a finalizer (which looks like a crash because it shows a non-fancy stacktrace). The issue is that WeakKeyDict uses a ReentrantLock internally to ensure consistency, but trying to lock that within a finalizer can sometimes cause a yield(), which is not valid in a finalizer. I believe this should be fixed by fix numerous issues with WeakKeyDict by vtjnash · Pull Request #38180 · JuliaLang/julia · GitHub.

The rest of the errors I’m not sure about since I haven’t looked at your code for very long. Can you post an MWE that exhibits the bugs in question? The above code snippet doesn’t define COUNTERS, and doesn’t qualify nthreads(); once I set COUNTERS=100 and made the definition Threads.nthreads(), it ran fine for me with 6 threads and 6 processes. Am I supposed to call next!() to reproduce the bug?

Thanks for the clarification of the GC issue, it had me worried. Thanks!

I see the errors when running 4 worker processes and 4 threads in each one (a total of 16 threads). For the test I’m running them all on the same machine. The value of COUNTERS seems immaterial.

Yes, one should repeatedly call next!() on each of the threads on each of the processes - currently my way to trigger this is to run the tests in my package (which requires git cloning the package and running the tests - deps/test.sh dm_foreach would do the trick).

Not looking at the code for long, and ignoring the GC issue, it should be “easy” to see that there’s no way take!(response_channel) could be reading from an empty closed channel, as the serving thread always calls put!(response_channel, ...) before it calls close(response_channel). So on its face, the error complaining about taking from a closed channel is “impossible” (line 124 of the 1st file in the errors gist)?
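For a local Channel, at least, that reasoning holds: closing a buffered channel does not discard values already put! into it, so a take! after the close still succeeds. A minimal local-only sketch (whether the RemoteChannel path preserves the same guarantee across processes is exactly what is in question here):

```julia
# A buffered Channel keeps its contents after close(); a take! of a value
# that was put! before the close still succeeds.
ch = Channel{Int}(1)
put!(ch, 42)
close(ch)
@assert take!(ch) == 42   # succeeds: the value was buffered before close
@assert !isopen(ch)

# Only a take! on a closed *and empty* channel throws.
try
    take!(ch)
    @assert false  # unreachable
catch err
    @assert err isa InvalidStateException
end
```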

I guess there’s no helping it and I’ll have to extract just this part of it to a standalone mwe.jl file and hope the problem doesn’t mysteriously disappear when I do that…

UPDATE: I have created a simple file that reproduces the problem (or at least, close to it). Running it as in JULIA_NUM_THREADS=4 julia Bug.jl 4 1000 quiet will either (1) Deadlock (2) Succeed (3) Die with an error about conversion or (4) Die with errors about GC and concurrency violations. The following gist Julia Mixing Worker Processes and Threads Bug · GitHub contains the source code (Bug.jl) and outputs demonstrating (3) and (4).

Am I doing something “illegal” in this code?

So, it seems that this is due to Are Julia Channels/Futures thread safe (with a failing code example)? (which ties in to Concurrency violation on interplay between Distributed and Base.Threads · Issue #37706 · JuliaLang/julia · GitHub). I must say I’m surprised that basic coordination classes such as futures and channels are not thread safe.

Composable multithreading was only added to the language in Julia 1.3, but Distributed, Channels, and Futures have been around much longer (probably since the beginning of the language?). If they were reimplemented today, I’m sure they’d be thread-safe from the start.

Also, note that Channels are thread-safe: they have internal locking to ensure that you can’t corrupt a Channel by putting/taking from multiple threads.
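To illustrate the locking guarantee (a minimal sketch, not related to the original code): many threads can put! into one local Channel concurrently without corrupting it, and draining it recovers exactly what was sent.

```julia
using Base.Threads

# Many tasks put! into one Channel concurrently; the channel's internal
# lock keeps it consistent, so draining it yields every value exactly once.
const N = 100
ch = Channel{Int}(N)
@sync for i in 1:N
    Threads.@spawn put!(ch, i)
end
close(ch)
drained = collect(ch)   # iterating a closed channel yields its buffered values
@assert sort(drained) == 1:N
```

Run with, e.g., `JULIA_NUM_THREADS=4` to get actual parallelism; the guarantee holds regardless of the thread count.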

My original code used only channels, and I still got unpredictable problems - not only GC errors but also deadlocks and crashes. I can definitely crash a Julia program using only channels and threads, with no futures in the picture.

RemoteChannel != Channel. RemoteChannel is a Distributed object, which relies on Distributed itself being thread-safe.


Fair point. That said, I still wish the documentation for Distributed opened with a warning that “Distributed is not thread-safe unless stated otherwise”, followed by a list of the specific operations that are thread-safe (a growing list, given the active work on this). Right now, it is difficult to tell what is and is not thread-safe.