Are Julia Channels/Futures thread safe (with a failing code example)?

I thought the answer was “obviously they are”, but I get an intermittent deadlock in the following code:

using Base.Threads
using Distributed

iterations = parse(Int, ARGS[1])

println("threads: $(nthreads())")
println("iterations: $(iterations)")

channel = Channel{Future}(nthreads() * 2)

counter = 0

function server()
    global counter
    println("T$(threadid()) will take")
    while counter < (nthreads() - 1) * iterations
        future = take!(channel)
        counter += 1
        println("T$(threadid()) did take, will put $(counter)")
        flush(stdout)
        put!(future, counter)
        println("T$(threadid()) did put $(counter), will take")
        flush(stdout)
    end
    println("T$(threadid()) done $(counter)")
    flush(stdout)
end

function client()
    prev = 0
    for iteration in 1:iterations
        future = Future()
        println("T$(threadid()) I$(iteration) will put")
        flush(stdout)
        put!(channel, future)
        println("T$(threadid()) I$(iteration) did put, will fetch")
        flush(stdout)
        next = fetch(future)
        println("T$(threadid()) I$(iteration) did fetch $(next)")
        flush(stdout)
        @assert next > prev
        prev = next
    end
end

@threads for t in 1:nthreads()
    if t == 1
        server()
    else
        client()
    end
end

println("T$(threadid()) done $(counter)")
flush(stdout)

@assert counter == (nthreads() - 1) * iterations

I'm using Julia 1.6.1 and running this as JULIA_NUM_THREADS=2 julia Bug.jl 10000. Here is the output of a run that deadlocked:

threads: 2
iterations: 10000
T1 will take from Channel{Future}(4)
T2 I1 will put Future(1, 1, 1, nothing) into Channel{Future}(4)
T2 I1 did put, will fetch Future(1, 1, 1, nothing)
T1 did take, will put 1 into Future(1, 1, 1, nothing)
T1 did put 1 into Future(1, 1, 1, Some(1)), will take from Channel{Future}(4)
T2 I1 did fetch 1 from Future(1, 1, 1, Some(1))
T2 I2 will put Future(1, 1, 2, nothing) into Channel{Future}(4)
T2 I2 did put, will fetch Future(1, 1, 2, nothing)
T1 did take, will put 2 into Future(1, 1, 2, nothing)
T2 I2 did fetch 2 from Future(1, 1, 2, Some(2))
T1 did put 2 into Future(1, 1, 2, Some(2)), will take from Channel{Future}(4)
T2 I3 will put Future(1, 1, 3, nothing) into Channel{Future}(4)
T2 I3 did put, will fetch Future(1, 1, 3, nothing)
T1 did take, will put 3 into Future(1, 1, 3, nothing)
T2 I3 did fetch 3 from Future(1, 1, 3, Some(3))

... Many lines elided ...

T1 did put 1667 into Future(1, 1, 1667, Some(1667)), will take from Channel{Future}(4)
T2 I1668 will put Future(1, 1, 1668, nothing) into Channel{Future}(4)
T2 I1668 did put, will fetch Future(1, 1, 1668, nothing)
T1 did take, will put 1668 into Future(1, 1, 1668, nothing)
T2 I1668 did fetch 1668 from Future(1, 1, 1668, Some(1668))
T1 did put 1668 into Future(1, 1, 1668, Some(1668)), will take from Channel{Future}(4)
T2 I1669 will put Future(1, 1, 1669, nothing) into Channel{Future}(4)
T2 I1669 did put, will fetch Future(1, 1, 1669, nothing)
T1 did take, will put 1669 into Future(1, 1, 1669, nothing)
T2 I1669 did fetch 1669 from Future(1, 1, 1669, Some(1669))
T1 did put 1669 into Future(1, 1, 1669, Some(1669)), will take from Channel{Future}(4)
T2 I1670 will put Future(1, 1, 1670, nothing) into Channel{Future}(4)
T2 I1670 did put, will fetch Future(1, 1, 1670, nothing)
T1 did take, will put 1670 into Future(1, 1, 1670, nothing)
T2 I1670 did fetch 1670 from Future(1, 1, 1670, Some(1670))
T1 did put 1670 into Future(1, 1, 1670, Some(1670)), will take from Channel{Future}(4)
T2 I1671 will put Future(1, 1, 1671, nothing) into Channel{Future}(4)
T2 I1671 did put, will fetch Future(1, 1, 1671, nothing)
T1 did take, will put 1671 into Future(1, 1, 1671, nothing)
T1 did put 1671 into Future(1, 1, 1671, Some(1671)), will take from Channel{Future}(4)

At this point the program blocks. Sending kill -QUIT gives me these traces:

signal (3): Quit
in expression starting at Bug.jl:46
epoll_wait at /usr/bin/../lib/libc.so.6 (unknown line)
uv__io_poll at /usr/bin/../lib/julia/libjulia-internal.so.1 (unknown line)
uv_run at /usr/bin/../lib/julia/libjulia-internal.so.1 (unknown line)
jl_task_get_next at /usr/bin/../lib/julia/libjulia-internal.so.1 (unknown line)
unknown function (ip: 0x7f5c43cac7fd)
unknown function (ip: 0x7f5c43ca2f44)
fetch_buffered at ./channels.jl:365
fetch at ./channels.jl:359
fetch_ref at /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:539
call_on_owner at /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:492 [inlined]
fetch at /build/julia/src/julia-1.6.1/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:533
client at Bug.jl:38
macro expansion at Bug.jl:50 [inlined]
#3#threadsfor_fun at ./threadingconstructs.jl:81
#3#threadsfor_fun at ./threadingconstructs.jl:48
unknown function (ip: 0x7f5c3cb5f3bc)
unknown function (ip: 0x7f5c55851c1b)
unknown function (ip: (nil))
unknown function (ip: (nil))
__futex_abstimed_wait_common64 at /usr/bin/../lib/libpthread.so.0 (unknown line)
pthread_cond_wait at /usr/bin/../lib/libpthread.so.0 (unknown line)
uv_cond_wait at /usr/bin/../lib/julia/libjulia-internal.so.1 (unknown line)
jl_task_get_next at /usr/bin/../lib/julia/libjulia-internal.so.1 (unknown line)
unknown function (ip: 0x7f5c43cac7fd)
unknown function (ip: 0x7f5c43ca2f44)
take_buffered at ./channels.jl:389
take! at ./channels.jl:383
server at Bug.jl:17
macro expansion at Bug.jl:48 [inlined]
#3#threadsfor_fun at ./threadingconstructs.jl:81
#3#threadsfor_fun at ./threadingconstructs.jl:48
unknown function (ip: 0x7f5c3cb5f3bc)
unknown function (ip: 0x7f5c55851c1b)
unknown function (ip: (nil))
unknown function (ip: 0x7f5c42c1fdaf)
unknown function (ip: (nil))
Allocations: 949350 (Pool: 943523; Big: 5827); GC: 1

So… why does this deadlock?

On its face, T1 did put a value into the future, but T2 is blocked fetching it. This happens intermittently: if I run the program with a low number of iterations, it has a fair chance of succeeding. This seems to imply some sort of concurrency bug. Am I “abusing” the channel/futures mechanism here? If so, what is the correct way to set up this sort of client/server thread arrangement (one that would also work if the server thread were on a different worker process)?

(This is the most simplified form I could come up with of the issue I asked about in the thread “Problem combining worker processes and threads - #2 by jpsamaroo”. Here there’s only one process and I’m using futures to send the response from the server to the client(s), so the code is much simpler and the issue might be easier to figure out.)

Help?


It seems the answer is “no”, as described in the GitHub issue “Concurrency violation on interplay between Distributed and Base.Threads” (JuliaLang/julia issue #37706). I must say I’m surprised that basic coordination classes such as futures and channels are not thread safe.
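
For the single-process case, one possible workaround is to avoid Distributed's Future altogether and send the reply over a small Base Channel instead. The sketch below is only an illustration, not a fix for the issue above: it assumes Base Channels are safe to share between threads, and the names Reply, serve, request! and run_demo are made up for this example. It will not work as-is if the server lives on a different worker process.

```julia
using Base.Threads

# Hypothetical reply type: each request carries its own one-slot reply channel.
const Reply = Channel{Int}

# Server: answer every request with the next counter value.
function serve(requests::Channel{Reply})
    counter = 0
    # Iterating a Channel takes items until it is closed and drained.
    for reply in requests
        counter += 1
        put!(reply, counter)
    end
    return counter
end

# Client: send `iterations` requests and check the replies keep increasing.
function request!(requests::Channel{Reply}, iterations::Int)
    prev = 0
    for _ in 1:iterations
        reply = Reply(1)        # buffered, so the server's put! does not block
        put!(requests, reply)
        next = take!(reply)     # blocks until the server has answered
        @assert next > prev
        prev = next
    end
    return prev
end

# Start one server task and `nclients` client tasks, then shut down cleanly.
function run_demo(nclients::Int, iterations::Int)
    requests = Channel{Reply}(2 * nclients)
    server = Threads.@spawn serve(requests)
    clients = [Threads.@spawn request!(requests, iterations) for _ in 1:nclients]
    foreach(wait, clients)      # rethrows if any client task failed
    close(requests)             # lets the server's loop finish
    return fetch(server)
end

total = run_demo(3, 1000)
println("served $(total) requests")
```

Closing the request channel once the clients are done also removes the need for the server to poll a shared global counter to decide when to stop.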


I tried your code; it seems to have been fixed as of 1.7.2.
As I gained confidence in the Channel being thread safe, I found a tricky race condition in my own reduction code.
I hope there’s a good test suite to maintain the multithreading facilities. They’re very well designed and a pleasure to code with.

Yes, many of the races in Distributed were fixed in the 1.7 release branch. Multithreading support for Distributed should only improve, since that stdlib is mostly only seeing bugfixes these days (it’s basically feature-complete).
