Why am I getting a TaskFailedException with the ParallelUtilities package?

I did not have much success using Threads.@threads to accelerate code execution. The code would sometimes run marginally faster and sometimes even slower.

So now I am trying packages like ParallelUtilities, which builds on the Distributed package. But I have not even managed to get the code to run. This example, extracted from the ParallelUtilities documentation, throws a TaskFailedException. Any hints on what is going wrong?

using Distributed
using ParallelUtilities

addprocs(2)

@everywhere f(x) = ones(10_000, 1_000);

A = @time @distributed (+) for i=1:nworkers()
    f(i)
end;
1.931315 seconds (1.51 M allocations: 310.286 MiB, 3.94% gc time, 42.46% compilation time)

B = @time pmapreduce(f, +, 1:nworkers());   

ERROR: TaskFailedException

    nested task error: On worker 2:
    KeyError: key ParallelUtilities [fad6cfc8-4f83-11e9-06cc-151124046ad0] not found
    Stacktrace:
      [1] getindex
        @ .\dict.jl:481 [inlined]
      [2] root_module
        @ .\loading.jl:1056 [inlined]
      [3] deserialize_module
        @ C:\Users\fsald\AppData\Local\Programs\Julia-1.7.0\share\julia\stdlib\v1.7\Serialization\src\Serialization.jl:981
      [4] handle_deserialize
        @ C:\Users\fsald\AppData\Local\Programs\Julia-1.7.0\share\julia\stdlib\v1.7\Serialization\src\Serialization.jl:883
      [5] deserialize
        @ C:\Users\fsald\AppData\Local\Programs\Julia-1.7.0\share\julia\stdlib\v1.7\Serialization\src\Serialization.jl:801
      [6] deserialize_datatype
        @ C:\Users\fsald\AppData\Local\Programs\Julia-1.7.0\share\julia\stdlib\v1.7\Serialization\src\Serialization.jl:1331
      [7] handle_deserialize
        @ C:\Users\fsald\AppData\Local\Programs\Julia-1.7.0\share\julia\stdlib\v1.7\Serialization\src\Serialization.jl:854
      [8] deserialize
        @ C:\Users\fsald\AppData\Local\Programs\Julia-1.7.0\share\julia\stdlib\v1.7\Serialization\src\Serialization.jl:801
      [9] handle_deserialize
        @ C:\Users\fsald\AppData\Local\Programs\Julia-1.7.0\share\julia\stdlib\v1.7\Serialization\src\Serialization.jl:861
     [10] deserialize
        @ C:\Users\fsald\AppData\Local\Programs\Julia-1.7.0\share\julia\stdlib\v1.7\Serialization\src\Serialization.jl:801 [inlined]
     [11] deserialize_msg
        @ C:\Users\fsald\AppData\Local\Programs\Julia-1.7.0\share\julia\stdlib\v1.7\Distributed\src\messages.jl:87
     [12] #invokelatest#2
        @ .\essentials.jl:716 [inlined]
     [13] invokelatest
        @ .\essentials.jl:714 [inlined]
     [14] message_handler_loop
        @ C:\Users\fsald\AppData\Local\Programs\Julia-1.7.0\share\julia\stdlib\v1.7\Distributed\src\process_messages.jl:169
     [15] process_tcp_streams
        @ C:\Users\fsald\AppData\Local\Programs\Julia-1.7.0\share\julia\stdlib\v1.7\Distributed\src\process_messages.jl:126
     [16] #99
        @ .\task.jl:423
    Stacktrace:
     [1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
       @ Distributed C:\Users\fsald\AppData\Local\Programs\Julia-1.7.0\share\julia\stdlib\v1.7\Distributed\src\remotecall.jl:452
     [2] remotecall_fetch
       @ C:\Users\fsald\AppData\Local\Programs\Julia-1.7.0\share\julia\stdlib\v1.7\Distributed\src\remotecall.jl:444 [inlined]
     [3] #remotecall_fetch#158
       @ C:\Users\fsald\AppData\Local\Programs\Julia-1.7.0\share\julia\stdlib\v1.7\Distributed\src\remotecall.jl:479 [inlined]
     [4] remotecall_fetch
       @ C:\Users\fsald\AppData\Local\Programs\Julia-1.7.0\share\julia\stdlib\v1.7\Distributed\src\remotecall.jl:479 [inlined]
     [5] (::ParallelUtilities.ClusterQueryUtils.var"#1#3"{Vector{String}, Int64, Int64})()
       @ ParallelUtilities.ClusterQueryUtils .\task.jl:423

...and 1 more exception.

Stacktrace:
  [1] sync_end(c::Channel{Any})
    @ Base .\task.jl:381
  [2] macro expansion
    @ .\task.jl:400 [inlined]
  [3] hostnames(procs::Vector{Int64})
    @ ParallelUtilities.ClusterQueryUtils C:\Users\fsald\.julia\packages\ParallelUtilities\1gSmb\src\clusterquery.jl:25
  [4] procs_node
    @ C:\Users\fsald\.julia\packages\ParallelUtilities\1gSmb\src\clusterquery.jl:51 [inlined]
  [5] chooseworkers
    @ C:\Users\fsald\.julia\packages\ParallelUtilities\1gSmb\src\clusterquery.jl:91 [inlined]
  [6] maybetrimmedworkerpool
    @ C:\Users\fsald\.julia\packages\ParallelUtilities\1gSmb\src\clusterquery.jl:115 [inlined]
  [7] pmapreduce(f::Function, op::Function, iterators::UnitRange{Int64}; reducekw::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ ParallelUtilities C:\Users\fsald\.julia\packages\ParallelUtilities\1gSmb\src\mapreduce.jl:285
  [8] pmapreduce
    @ C:\Users\fsald\.julia\packages\ParallelUtilities\1gSmb\src\mapreduce.jl:284 [inlined]
  [9] top-level scope
    @ .\timing.jl:220 [inlined]
 [10] top-level scope
    @ .\REPL[9]:0

Hi, it seems you need to load the package on all workers:

julia> using Distributed

julia> addprocs(2)
2-element Vector{Int64}:
 2
 3

julia> @everywhere using ParallelUtilities

julia> @everywhere f(x) = ones(10, 10);

julia> @time pmapreduce(f, +, 1:nworkers());
  8.675272 seconds (2.02 M allocations: 109.401 MiB, 0.60% gc time, 36.94% compilation time)

julia> @time pmapreduce(f, +, 1:nworkers()); # second run should be faster
  0.124944 seconds (10.09 k allocations: 580.779 KiB, 13.87% compilation time)

You might also need to activate the current project on all workers, e.g. something like:

@everywhere begin
  using Pkg; Pkg.activate(".")  # may be required if you're using a custom project
  using ParallelUtilities
  ...
end
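
Alternatively, you can point the workers at your project when you create them; a minimal sketch, assuming a local setup where the master's active project is the one you want:

using Distributed

# start workers with the same project as the master process,
# so that packages resolve identically on all of them
addprocs(2; exeflags = "--project=$(Base.active_project())")

@everywhere using ParallelUtilities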

Btw I’m super excited to see that people might find this package useful 🙂

Terrific answer. It clarified quite a few things for me. I am now trying your suggestions.

I have been trying different things related to parallelization with the Distributed package, and I realized that I have some basic questions to understand before I dig deeper into ParallelUtilities.

Basic Questions:

  1. I have seen recommendations to call Pkg.instantiate in this context. Is it necessary or helpful to do this?

@everywhere Pkg.instantiate()

  2. If I have a block of code like

const a = 1
myfunc(x) = x + 1

in my main program, do I have to put @everywhere before every command in the block? That is, is there a way to apply a single @everywhere to all commands in the block? I think this can be done by putting the block in a file, say myblock.jl, and then doing

@everywhere include("myblock.jl")

This is OK but somewhat inconvenient. I suppose this could work:

@everywhere begin
    const a = 1
    myfunc(x) = x + 1
end

Is this equivalent to adding @everywhere before every command in the block?

  3. I may want to run the code sequentially; for example, running sequentially may simplify debugging. What is the easiest way of toggling parallelization on and off? Can I just have an if statement that skips the addprocs(2) command depending on a boolean?

if someboolean
    addprocs(2)
end

  4. What do I do if I have a variable definition inside an if statement? For example, if my original code was

if mybool
    x = 1
end

which of these two ways should I change it to?

a)

@everywhere if mybool
    x = 1
end

b)

if mybool
    @everywhere x = 1
end

  5. This code:

myvar0 = slowfunction(x)
@everywhere myvar = myvar0

does not seem to work, since myvar0 is not visible to the workers.

If a variable is defined by a slow calculation that does not need to be parallelized, can I just calculate it once and then use @everywhere to make copies? In other words, is there a way to access a variable that has been calculated on the main process? Example (hypothetical syntax):

myvar0 = slowfunction(x)
@everywhere myvar = worker1.myvar0

  6. Other languages/packages have an export function, so that one could do

y = slowfunction(x)
export_to_cluster(y)

and the value (but not the calculation) of y would be exported to all workers.

Can something like this be done with Distributed?

I have seen recommendations to call Pkg.instantiate in this context. Is it necessary or helpful to do this?

This is to download dependencies. Depending on your use case it may not be necessary. In my case I run code on a cluster, where I do a one-time installation of dependencies and never put this in my code.
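
If you do need it (say, on a fresh environment), a minimal sketch might be the following, assuming workers have already been added; the Pkg.activate(".") line is an assumption and only applies if you use a custom project:

@everywhere begin
    using Pkg
    Pkg.activate(".")   # assumption: only needed with a custom project
    Pkg.instantiate()   # downloads any missing dependencies
end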

Is this equivalent to adding @everywhere before every command in the block?

Yes, @everywhere begin ... end is equivalent to adding @everywhere before each line.
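
As a quick check (a sketch with placeholder names), both definitions should be visible on a worker afterwards:

using Distributed
addprocs(2)

@everywhere begin
    const a = 1            # defined on the master and on every worker
    myfunc(x) = x + 1
end

@fetchfrom 2 myfunc(a)     # evaluated on worker 2, returns 2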

Can I just have an if statement that skips the addprocs(2) command depending on a boolean?

Yes, this is possible. Even if you have added processes, it does not mean that the code is necessarily parallel. In Julia, unlike MPI, one needs to explicitly schedule parallel tasks, e.g. through pmap or pmapreduce as above. The default mapreduce is executed serially even if workers have been added. I often execute

@fetchfrom workers()[1] mapreduce(f, op, iter)

to execute something serially on a worker.
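
A minimal toggling sketch (the parallel flag is a made-up name) could look like this:

using Distributed

const parallel = true          # hypothetical flag to switch modes

if parallel && nworkers() < 2
    addprocs(2)                # only spin up workers when requested
end

@everywhere using ParallelUtilities
@everywhere f(x) = ones(10, 10)

# with workers, pmapreduce distributes the work; without them, the
# serial mapreduce is often easier to debug
result = parallel ? pmapreduce(f, +, 1:4) : mapreduce(f, +, 1:4)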

which of these two ways should I change it to?

@everywhere if mybool
    x = 1
end

This requires the variable mybool to be defined on the remote workers; the entire if block is executed on each worker. In the other case,

if mybool
    @everywhere x = 1
end

the if block is executed on the master process, whereas the x = 1 step is evaluated on each worker. The variable mybool only needs to be available on the master.
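
If you prefer variant (a), one sketched option is to first ship the master's value of mybool to the workers by interpolation:

mybool = true                  # defined on the master

@everywhere mybool = $mybool   # copy the master's value to every worker

@everywhere if mybool          # the condition now resolves on each worker
    x = 1
end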

If a variable is defined by a slow calculation that does not need to be parallelized, can I just calculate it once and then use @everywhere to make copies?

You may interpolate the variable, as

julia> @everywhere myvar = $myvar0
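
The $ splices the master's value into the expression before it is shipped, so only the result travels, not the computation. A small check (getfield reads worker 2's own global rather than shipping the master's copy):

julia> myvar0 = 42;    # stand-in for slowfunction(x)

julia> @everywhere myvar = $myvar0

julia> @fetchfrom 2 getfield(Main, :myvar)
42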

the value (but not the calculation) of y would be exported to all workers. Can something like this be done with Distributed?

You may interpolate values if that’s convenient. You may also use something like ParallelDataTransfer.jl (https://github.com/ChrisRackauckas/ParallelDataTransfer.jl), a set of helper functions for transferring data between worker processes.
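
With that package the pattern is close to the export_to_cluster you describe; a sketch based on its sendto helper (check the package README for the exact API):

using Distributed
addprocs(2)
@everywhere using ParallelDataTransfer

y = 42                     # stand-in for slowfunction(x)
sendto(workers(), y = y)   # create the global y on every worker

@fetchfrom 2 getfield(Main, :y)   # returns 42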

Hope this helps!