Catching errors thrown by worker processes

Hi there,

I want to be informed of errors arising in Julia worker processes. Right now, the following code does not report the thrown error, and the second println is never executed. The output I get, e.g. when running “julia -p 2 script-name.jl”, is

From worker 2: This is a test print from worker 2
From worker 3: This is a test print from worker 3

The code is

using Distributed

@everywhere begin
    function worker()
        worker_id = myid()
        println("This is a test print from worker ", string(worker_id))

        error("This is a thrown, uncatched error from worker"*string(worker_id))

        println("This is another test print from worker ", string(worker_id))
    end

end

function main()

    current_workers = workers()
    for i in current_workers
        @spawnat i worker()
    end

end

main()

What is the best way to get notified of errors? In the future, I want to run a lot of worker processes on different machines in parallel.
Thanks in advance for answers and comments,
jamble

When you fetch the result of @spawnat, the error will be either received as the result, or thrown at that call (I don’t recall which).
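To make the "received or thrown" question concrete, here is a minimal sketch. It assumes a recent Julia 1.x release and starts one local worker if the session has none; the error message "boom" is only an illustration. As far as I know, fetching a Future whose remote call failed throws a RemoteException on the calling process, and that exception wraps the original error:

```julia
using Distributed

# Assumption: start one local worker if the session has none yet.
nprocs() == 1 && addprocs(1)

# The remote call fails, but nothing is reported until the Future is fetched.
failing = @spawnat first(workers()) error("boom")

caught = try
    fetch(failing)
catch ex
    ex
end

# The error arrives wrapped: RemoteException -> CapturedException -> original error.
println(typeof(caught))
println("failed on process ", caught.pid)
println("original error: ", caught.captured.ex)
```

If the Future is never fetched (or waited on), the failure stays silent, which matches the behaviour described in the first post.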

Thanks for the quick answer! I was not aware that fetching handles errors!

What I normally do is let worker processes communicate with the master process via RemoteChannels, so there is no fetching involved. How would I get notified of errors in worker processes in that case?

I ran into a similar issue with callbacks from a C library: if my code errored during the callback, I didn’t get any notification. I ended up wrapping the callback handlers in:

function log_exception(ex, stack)
    local buffer = IOBuffer()
    showerror(buffer, ex)
    write(buffer, "\n\n")
    for bt in stacktrace(stack)
        showerror(buffer, bt)
        write(buffer, '\n')
    end
    seekstart(buffer)
    @warn "$(read(buffer, String))"
end

try
   callback()
catch ex
    log_exception(ex, catch_backtrace())
end

You could probably use the same approach. The log_exception() function currently prints the error locally, but you could instead send the contents of the IOBuffer back to the control instance for display.
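One way this could look, as a rough sketch rather than a tested recipe: the worker catches the exception, renders it to a String, and puts it on a dedicated error channel owned by process 1. The names `guarded`, `demo`, and `errors` are hypothetical, and the sketch assumes at least one worker process is available.

```julia
using Distributed

# Assumption: start one local worker if the session has none yet.
nprocs() == 1 && addprocs(1)

# Hypothetical wrapper: run `f` and report any exception as text on `errors`.
@everywhere function guarded(f, errors)
    try
        f()
    catch ex
        # Render the exception and its backtrace to a String on the worker,
        # then ship the text (with the worker id) to the control process.
        msg = sprint(showerror, ex, catch_backtrace())
        put!(errors, (myid(), msg))
        nothing
    end
end

function demo()
    # Error channel living on process 1, carrying (worker id, message) pairs.
    errors = RemoteChannel(() -> Channel{Tuple{Int,String}}(32), 1)
    w = first(workers())
    wait(@spawnat w guarded(() -> error("job failed"), errors))
    return take!(errors)
end

pid, msg = demo()
@warn "Worker $pid reported an error" msg
```

The control process still has to read the error channel, for example in the same loop that collects results.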

Same sort of answer: take! on a RemoteChannel propagates the error.

My setup normally looks like the following:

using Distributed

@everywhere begin
    function worker(jobs, results)
        worker_id = myid()
        println("Worker started ", string(worker_id))

        for i in 1:2
            if worker_id == 2
                job = take!(jobs)
                # error occurs while doing the job
                error("This is a thrown, uncatched error from worker"*string(worker_id))
                put!(results, "Worker "*string(worker_id)*" did "*job) # this line will not be executed
            else
                job = take!(jobs)
                # no error occurs
                put!(results, "Worker "*string(worker_id)*" did "*job)
            end
        end
        println("Worker finished ", string(worker_id))
    end

end

function main()

    jobs = RemoteChannel(()->Channel{String}(2),1)
    results = RemoteChannel(()->Channel{String}(2),1)

    current_workers = workers()
    for i in 1:length(current_workers)
        @spawnat current_workers[i] worker(jobs, results)
    end
    @sync begin
        @async for i in 1:3
            put!(jobs, "Job "*string(i))
        end

        timeout = 2.0
        @async begin
            last_result = time()
            while time()-last_result < timeout
                if isready(results)
                    result = take!(results)
                    println(result)
                    last_result = time()
                else
                    sleep(0.01)
                end
            end
        end
    end


end

main()

When I run this, I do not get informed about the error in worker 2. The output is the following with 3 workers:

From worker 2: Worker started 2
From worker 3: Worker started 3
From worker 4: Worker started 4
Worker 3 did Job 2
Worker 4 did Job 3

I do not see how fetching or taking would propagate errors here, as they “do not see them”: the worker process just dies and that’s it.

A general question: why are print statements from worker processes forwarded to the master process, while errors aren’t? Is this a design choice?
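For the RemoteChannel setup, one possible approach is to keep the Futures that @spawnat returns and fetch them once the jobs have been handed out. The following is a simplified sketch of the setup above, not the original program: the `fail` argument is a hypothetical switch that makes the first worker fail on purpose, and each worker handles a single job.

```julia
using Distributed

# Assumption: start two local workers if the session has none yet.
nprocs() == 1 && addprocs(2)

@everywhere function worker(jobs, results, fail)
    job = take!(jobs)
    # Hypothetical failure switch, used here only to trigger an error.
    fail && error("worker $(myid()) failed on $job")
    put!(results, "Worker $(myid()) did $job")
    return nothing
end

function main()
    jobs = RemoteChannel(() -> Channel{String}(8), 1)
    results = RemoteChannel(() -> Channel{String}(8), 1)

    ws = workers()
    futures = Future[]
    for w in ws
        fail = (w == first(ws))
        # Keep the Future instead of discarding it.
        push!(futures, @spawnat w worker(jobs, results, fail))
    end

    for i in eachindex(ws)
        put!(jobs, "Job $i")
    end

    failures = RemoteException[]
    for f in futures
        try
            fetch(f)  # a worker's error is rethrown here as a RemoteException
        catch ex
            ex isa RemoteException || rethrow()
            push!(failures, ex)
        end
    end

    # All workers have finished, so drain whatever results were produced.
    done = String[]
    while isready(results)
        push!(done, take!(results))
    end
    return failures, done
end

failures, done = main()
foreach(println, done)
foreach(ex -> println("Error on worker ", ex.pid, ": ", ex.captured.ex), failures)
```

The channels carry only the values that were put on them, so in this sketch the Futures are the path by which a worker's error reaches the master process.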

This looks promising. I will have a look and see if I can implement this :)

EDIT: So you suggest catching the error in the worker process. I would like to handle all exceptions in the master process. Nevertheless, I like your exception handling function!

I am stupid… @jpsamaroo, your first answer was exactly what I wanted! I just didn’t see it! I changed the initial code I posted, so it handles the errors:

using Distributed

@everywhere begin
   function worker()
       worker_id = myid()
       println("This is a test print from worker ", string(worker_id))

       error("This is a thrown, uncatched error from worker"*string(worker_id))

       println("This is another test print from worker ", string(worker_id))
   end

end

function main()

   current_workers = workers()
   futures = Array{Future}(undef, length(current_workers))
   for i in 1:length(current_workers)
       futures[i] = @spawnat current_workers[i] worker()
   end

   sleep(0.01)
   
   for f in futures
       try
           fetch(f)
       catch ex
           println(ex)
       end
   end

end

main()

The output of 3 workers is

From worker 2: This is a test print from worker 2
From worker 3: This is a test print from worker 3
From worker 4: This is a test print from worker 4
RemoteException(2, CapturedException(ErrorException("This is a thrown, uncatched error from worker2"), Any[(error at error.jl:33, 1), (worker at parallel_stdio.jl:19, 1), (#59 at macros.jl:73, 1), (#109 at process_messages.jl:265, 1), (run_work_thunk at process_messages.jl:56, 1), (run_work_thunk at process_messages.jl:65, 1), (#102 at task.jl:259, 1)]))
RemoteException(3, CapturedException(ErrorException("This is a thrown, uncatched error from worker3"), Any[(error at error.jl:33, 1), (worker at parallel_stdio.jl:19, 1), (#59 at macros.jl:73, 1), (#109 at process_messages.jl:265, 1), (run_work_thunk at process_messages.jl:56, 1), (run_work_thunk at process_messages.jl:65, 1), (#102 at task.jl:259, 1)]))
RemoteException(4, CapturedException(ErrorException("This is a thrown, uncatched error from worker4"), Any[(error at error.jl:33, 1), (worker at parallel_stdio.jl:19, 1), (#59 at macros.jl:73, 1), (#109 at process_messages.jl:265, 1), (run_work_thunk at process_messages.jl:56, 1), (run_work_thunk at process_messages.jl:65, 1), (#102 at task.jl:259, 1)]))

Just one question remaining: When I run this with no additional workers, it does not print the error message. I get this result:

This is a test print from worker 1

What is the reason for this?

Try removing the try ... catch block and just let it throw the error. It might somehow be getting swallowed by the exception machinery.

@jpsamaroo This works somehow. I do not understand why - but that is not a big deal. Thanks for your help. I will accept your first answer!