Use pmap(f,x) when f takes multiple arguments?

How can I call pmap(f,x) where f is a function that takes additional parameters?

I have a function f(x,y) where y is always known and I want to run pmap on elements of x.

Here is an example that works (without distributed):

infil> map(mean,collect(eachrow(SharedArray(rand(1:10,10,10)))))
10-element Vector{Float64}:
 4.2
 4.9
 6.9
 6.5
 5.0
 5.3
 5.9
 6.1
 5.4
 4.4

But the distributed version has errors:

infil> pmap(x->x.>1,collect(eachrow(SharedArray(rand(1:10,10,10)))))
ERROR: On worker 34:
UndefVarError: #340#341 not defined
Stacktrace:
  [1] deserialize_datatype
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:1280
  [2] handle_deserialize
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:827
  [3] deserialize
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:774
  [4] handle_deserialize
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:834
  [5] deserialize
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:774 [inlined]
  [6] deserialize_msg
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/messages.jl:87
  [7] #invokelatest#2
    @ ./essentials.jl:708 [inlined]
  [8] invokelatest
    @ ./essentials.jl:706 [inlined]
  [9] message_handler_loop
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:169
 [10] process_tcp_streams
    @ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:126
 [11] #99
    @ ./task.jl:406
Stacktrace:
  [1] (::Base.var"#837#839")(x::Task)
    @ Base ./asyncmap.jl:177
  [2] foreach(f::Base.var"#837#839", itr::Vector{Any})
    @ Base ./abstractarray.jl:2141
  [3] maptwice(wrapped_f::Function, chnl::Channel{Any}, worker_tasks::Vector{Any}, c::Vector{SubArray{Int64, 1, SharedArrays.SharedMatrix{Int64}, Tuple{Int64, Base.Slice{Base.OneTo{Int64}}}, true}})
    @ Base ./asyncmap.jl:177
  [4] wrap_n_exec_twice
    @ ./asyncmap.jl:153 [inlined]
  [5] #async_usemap#822
    @ ./asyncmap.jl:103 [inlined]
  [6] #asyncmap#821
    @ ./asyncmap.jl:81 [inlined]
  [7] pmap(f::Function, p::WorkerPool, c::Vector{SubArray{Int64, 1, SharedArrays.SharedMatrix{Int64}, Tuple{Int64, Base.Slice{Base.OneTo{Int64}}}, true}}; distributed::Bool, batch_size::Int64, on_error::Nothing, retry_delays::Vector{Any}, retry_check::Nothing)
    @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/pmap.jl:126
  [8] pmap(f::Function, p::WorkerPool, c::Vector{SubArray{Int64, 1, SharedArrays.SharedMatrix{Int64}, Tuple{Int64, Base.Slice{Base.OneTo{Int64}}}, true}})
    @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/pmap.jl:101
  [9] pmap(f::Function, c::Vector{SubArray{Int64, 1, SharedArrays.SharedMatrix{Int64}, Tuple{Int64, Base.Slice{Base.OneTo{Int64}}}, true}}; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/pmap.jl:156
 [10] pmap(f::Function, c::Vector{SubArray{Int64, 1, SharedArrays.SharedMatrix{Int64}, Tuple{Int64, Base.Slice{Base.OneTo{Int64}}}, true}})
    @ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/pmap.jl:156
 [11] top-level scope
    @ none:1

Note that when f takes a single argument, this problem does not arise:

infil> pmap(mean,collect(eachrow(SharedArray(rand(1:10,10,10)))))
10-element Vector{Float64}:
 5.7
 5.2
 5.1
 4.7
 6.2
 4.4
 6.8
 5.7
 4.3
 5.3

Try declaring the anonymous function as a named function instead, and make sure its definition is available on all worker processes. See the "Multi-processing and Distributed Computing" chapter of the Julia manual.
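
As a rough sketch of that suggestion (the name `gt_one`, the small fixed matrix, and the worker count of 2 are made up for illustration), defining the function with `@everywhere` makes it available on every process before `pmap` is called:

```julia
using Distributed

# Start two local workers if none exist yet (the count is an arbitrary assumption).
nprocs() == 1 && addprocs(2)

# Hypothetical named replacement for the anonymous x -> x .> 1,
# defined on the master and on every worker.
@everywhere gt_one(x) = x .> 1

# Small fixed matrix so the result is easy to check by hand.
M = [0 2 3; 1 1 5]
rows = collect(eachrow(M))

# Each row is sent to a worker, which applies gt_one to it.
result = pmap(gt_one, rows)
```

With this input, `result` should hold one Boolean vector per row of `M`.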

Hi @sidpatil, the issue with putting a named function inside a module and then loading that module on the workers (as described in the docs you linked) is that something like:

module foo
export f
f(x) = x .> 1 # f(x,y) where y=1
end

would need to be loaded outside of any existing functions in my code. But the value of y in my application is local to one particular function.

Thanks in advance for any workaround that can accommodate this.

You need a currying function to build a partial application of f(x, y):

julia> @everywhere curry(f, y) = x->f(x, y) 
julia> @everywhere addxy(x, y) = x + y         
julia> pmap(curry(addxy, 5), 1:5)
5-element Vector{Int64}:
  6
  7
  8
  9
 10
julia> pmap(curry(addxy, 15), 1:5)
5-element Vector{Int64}:
  16
  17
  18
  19
  20
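
If it helps, here is a minimal sketch of the same idea for the case where y is local to a function. It uses `Base.Fix2`, which fixes the second argument of a two-argument function and plays the role of `curry` here; because it is a type from Base, it does not need to be defined on the workers separately. The driver name `run_with_local_y` and the worker count are made up for illustration.

```julia
using Distributed

# Start two local workers if none exist yet (the count is an arbitrary assumption).
nprocs() == 1 && addprocs(2)

# The two-argument function still has to be defined on every process.
@everywhere addxy(x, y) = x + y

# Hypothetical driver: y only exists as a local variable in this function.
function run_with_local_y(xs, y)
    g = Base.Fix2(addxy, y)   # g(x) == addxy(x, y)
    return pmap(g, xs)
end

result = run_with_local_y(1:5, 5)
```

Only `addxy` needs an `@everywhere` definition at top level; the value of y is carried along inside the `Base.Fix2` object when it is sent to the workers.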
