Mixing blocking MPI communication and Julia concurrency

Hi Julia users, I have the following problem.

I have an MPI code with a controller/worker architecture: one process distributes and coordinates work across a bunch of worker processes. This is implemented using blocking MPI.Recv and MPI.Send and works fine as is, but now I want the controller process to do some work in its free time as well. I thought concurrent programming would be a great way to achieve that.

It is simple: just start two tasks (all on the same thread!), one controller and one worker, on a single process. Let them talk to each other via MPI (this is completely supported by the standard!).

Now the devil is in the details. I faced two challenges. The first one I have already solved. The second one has me stumped.

Problem 1 (solved)

The standard MPI.Send and MPI.Recv commands block and do not yield to other tasks. So I wrote wrappers like this:

function send(data, comm; dest, tag)
    req = MPI.Isend(data, comm; dest, tag)
    while !MPI.Test(req)
        yield()
    end
    return nothing
end

This works like a charm!
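To illustrate why the polling loop helps, here is a minimal MPI-free sketch. The `done` flag is a stand-in for `MPI.Test(req)`, and the names `done`, `events`, `poller`, and `worker` are made up for this example. Both tasks are started with `@async`, so they share a thread, and the poller can only finish because it yields on every iteration.

```julia
# MPI-free stand-in: `done[]` plays the role of MPI.Test(req).
done = Ref(false)
events = String[]

# Polls like the send wrapper: check, then yield so other tasks can run.
poller = @async begin
    while !done[]
        yield()
    end
    push!(events, "poller finished")
end

# Stands in for the other side completing the communication.
worker = @async begin
    push!(events, "worker ran")
    done[] = true
end

wait(poller)
wait(worker)
println(events)
```

Without the `yield()` in the loop, the poller would spin forever and the second task would never get a chance to set the flag.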

Problem 2: Error handling

What happens if either the controller or the worker errors? It depends on how you schedule the tasks. If you do

@sync begin
    @Threads.spawn start_controller()
    @Threads.spawn start_worker()
end

This will hang because of the behavior of @sync: it throws a CompositeException only after all tasks have completed. But with the interlocked MPI communication, as soon as one task fails, the other one will hang forever!
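The hang can be reproduced without MPI. In the sketch below, an unbuffered `Channel` stands in for a receive that is never matched, and `inbox`, `outer`, and `status` are names made up for the example. The `@sync` block is run inside its own task so that the script can observe that it does not return.

```julia
# Stand-in for an unmatched receive: nobody ever puts into this channel.
inbox = Channel{Int}(0)

outer = @async begin
    @sync begin
        @async error("worker failed")   # fails immediately
        @async take!(inbox)              # blocks forever
    end
end

# Give the @sync block one second to return; it does not.
status = timedwait(() -> istaskdone(outer), 1.0)
println(status)

# Unblock the stuck task so the demo can exit cleanly.
close(inbox)
```

Even though the first task has already failed, `@sync` keeps waiting for the second one, which is the same situation as the blocked MPI partner.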

What I want is some sync_or_error that aborts completely as soon as even one of the tasks throws an error (sort of like MPI would behave). So I wrote a cheeky version of that:

function sync_or_error(tasks::AbstractArray{Task})
    c = Channel(Inf)
    for t in tasks
        @Threads.spawn begin
            Base._wait(t)
            put!(c, t)
        end
    end
    for _ in eachindex(tasks)
        t = take!(c)
        if istaskfailed(t)
            throw(TaskFailedException(t))
        end
    end
    close(c)
end

This will indeed exit immediately with a nice exception once anything goes wrong. But it is cheeky in that the other tasks, the ones that did not throw an exception, just keep running/hanging in the background forever. This can cause trouble if you are in the REPL and try to start the controller again, since they will try to talk to it out of nowhere.

My question: Is there an easy way to abort these hanging tasks?

I tried to read up on this a little and found that I don't quite understand the caveats around using schedule(t, v; error=true) on running tasks. It seems almost like what I want. One challenge is that I cannot control where all of the yields are in the worker function, since it is user code. Maybe that is related?
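For tasks that hang inside the polling wrappers from Problem 1, one direction that might work is cooperative cancellation: the loop checks a shared flag on each iteration and throws once it is set. The sketch below is MPI-free and rests on assumptions. `wait_or_cancel`, `cancelled`, and `never_done` are hypothetical names, and `never_done` stands in for an `MPI.Test(req)` that never returns true. It does not help with user code that blocks somewhere else, and in real MPI code the pending request would still need to be cleaned up.

```julia
# Hypothetical helper: poll `isdone()` like the send wrapper polls MPI.Test(req),
# but throw once cancellation has been requested.
function wait_or_cancel(isdone, cancelled::Threads.Atomic{Bool})
    while !isdone()
        cancelled[] && error("cancelled")
        yield()
    end
    return nothing
end

cancelled = Threads.Atomic{Bool}(false)
never_done = () -> false            # stand-in for a request that is never matched

stuck = @async wait_or_cancel(never_done, cancelled)
yield()                             # let the task start polling

cancelled[] = true                  # e.g. set this before rethrowing in sync_or_error

wait_result = try
    wait(stuck)
    :finished
catch err
    err isa TaskFailedException ? :aborted : :other
end
println(wait_result)
```

The flag is only observed at the points where the wrapper polls, so a worker that never reaches such a point would still hang.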