Concurrency violation on interplay between Distributed and Base.Threads

I’m working on a distributed pipeline algorithm that uses several stages per worker process. IIRC tasks cannot hop between threads once they’ve been scheduled. Since I want my stages to potentially run in parallel, I tried to create non-sticky tasks by chaining each D.@spawnat with a T.@spawn. However, this setup keeps failing/crashing and I don’t understand why.

I boiled it down to a minimal example:

using Distributed, Base.Threads

const D = Distributed
const T = Threads

pids = addprocs(10)
wids = repeat(pids, inner=2)

conns = map(RemoteChannel, wids)
fst = first(conns)
lst = RemoteChannel()
push!(conns, lst)

@everywhere begin
    function stillepost(i, prev, next)
        message = take!(prev)
        put!(next, message)
        @info "Player $i done"
    end
end

# Select how to orchestrate the players:
v = :v2 # Change me!
players = []
for i in 1:length(wids)
    w = wids[i]
    c1 = conns[i]
    c2 = conns[i+1]
    v == :v1 && (p = D.@spawnat w stillepost(i, c1, c2))
    v == :v2 && (p = D.@spawnat w fetch(T.@spawn stillepost(i, c1, c2)))
    push!(players, p)
end

# Start the game:
game = @async begin
    m1 = "gibberish"
    put!(fst, m1)
    m2 = take!(lst)
    @info "'$m1' turned into '$m2'; well done!"
end

# Demonstate success/failure:
if v == :v1
    # This one "works", but it is not my intended/preferred orchestration layout:
    wait(game)
elseif v == :v2
    # Player 2 fails, mostly due to concurrency violations:
    fetch(players[2])
end

How do I fix that? Am I holding it wrong?

Due to the double indirection even otherwise obvious things like “unknown call to stillepost” (which is fixed by know :see_no_evil:) are not that easy anymore. I find the debugger very hard to use in this distributed/parallel setting, any help or suggestions with this regard would also be greatly appreciated. :slight_smile:

Julia Version 1.5.1
Commit 697e782ab8 (2020-08-25 20:08 UTC)
Platform Info:
  OS: macOS (x86_64-apple-darwin19.5.0)
  CPU: Intel(R) Core(TM) i5-8259U CPU @ 2.30GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-9.0.1 (ORCJIT, skylake)
Environment:
  JULIA_NUM_THREADS = 4
  JULIA_PROJECT = @.

Interestingly, if I use wids = fill(1, 10) both variants work just fine. On my exact use case, I get a deadlock. So there is at least one bug more I need to understand and fix.

In a fresh REPL, this is the output for version 2 selected:

julia> include("stillepost.jl")
[ Info: Player 1 done
ERROR: LoadError: On worker 2:
TaskFailedException:
concurrency violation detected
error at ./error.jl:33
concurrency_violation at ./condition.jl:8
assert_havelock at ./condition.jl:25 [inlined]
assert_havelock at ./condition.jl:48 [inlined]
assert_havelock at ./condition.jl:72 [inlined]
wait at ./condition.jl:102
wait_for_conn at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:193
check_worker_state at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:168
send_msg_ at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/messages.jl:176
send_msg at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/messages.jl:134 [inlined]
#remotecall_fetch#143 at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:389
remotecall_fetch at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:386
#remotecall_fetch#146 at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:421
remotecall_fetch at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:421
call_on_owner at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:494
put! at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:595 [inlined]
stillepost at /Users/jonas/.../stillepost.jl:18
#3 at ./threadingconstructs.jl:169
wait at ./task.jl:267 [inlined]
fetch at ./task.jl:282 [inlined]
#2 at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/macros.jl:87
#103 at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:290
run_work_thunk at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:79
run_work_thunk at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:88
#96 at ./task.jl:356
Stacktrace:
 [1] #remotecall_fetch#143 at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:394 [inlined]
 [2] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:386
 [3] #remotecall_fetch#146 at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:421 [inlined]
 [4] remotecall_fetch at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:421 [inlined]
 [5] call_on_owner at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:494 [inlined]
 [6] fetch(::Future) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:533
 [7] top-level scope at /Users/jonas/.../stillepost.jl:48
 [8] include(::String) at ./client.jl:457
 [9] top-level scope at REPL[1]:1
in expression starting at /Users/jonas/.../stillepost.jl:44

Is this a mistake on my side or a might this be a bug in Julia?

Does look like a bug in Distributed. Can you open an issue?

I don’t understand how that can cause data corruption, but wait_for_conn waiting on the non-threadsafe Worker.c_state is definitely very smelly.

Thanks for the quick response! I opened an issue on GitHub.
https://github.com/JuliaLang/julia/issues/37706

1 Like