How can I call `pmap(f, x)` where `f` is a function that takes additional parameters? I have a function `f(x, y)` where `y` is always known, and I want to run `pmap` over the elements of `x`.
Here is an example that works without Distributed (using plain `map`):
infil> map(mean,collect(eachrow(SharedArray(rand(1:10,10,10)))))
10-element Vector{Float64}:
4.2
4.9
6.9
6.5
5.0
5.3
5.9
6.1
5.4
4.4
But the distributed version, using `pmap` with an anonymous function, fails with the following error:
infil> pmap(x->x.>1,collect(eachrow(SharedArray(rand(1:10,10,10)))))
ERROR: On worker 34:
UndefVarError: #340#341 not defined
Stacktrace:
[1] deserialize_datatype
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:1280
[2] handle_deserialize
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:827
[3] deserialize
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:774
[4] handle_deserialize
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:834
[5] deserialize
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Serialization/src/Serialization.jl:774 [inlined]
[6] deserialize_msg
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/messages.jl:87
[7] #invokelatest#2
@ ./essentials.jl:708 [inlined]
[8] invokelatest
@ ./essentials.jl:706 [inlined]
[9] message_handler_loop
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:169
[10] process_tcp_streams
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:126
[11] #99
@ ./task.jl:406
Stacktrace:
[1] (::Base.var"#837#839")(x::Task)
@ Base ./asyncmap.jl:177
[2] foreach(f::Base.var"#837#839", itr::Vector{Any})
@ Base ./abstractarray.jl:2141
[3] maptwice(wrapped_f::Function, chnl::Channel{Any}, worker_tasks::Vector{Any}, c::Vector{SubArray{Int64, 1, SharedArrays.SharedMatrix{Int64}, Tuple{Int64, Base.Slice{Base.OneTo{Int64}}}, true}})
@ Base ./asyncmap.jl:177
[4] wrap_n_exec_twice
@ ./asyncmap.jl:153 [inlined]
[5] #async_usemap#822
@ ./asyncmap.jl:103 [inlined]
[6] #asyncmap#821
@ ./asyncmap.jl:81 [inlined]
[7] pmap(f::Function, p::WorkerPool, c::Vector{SubArray{Int64, 1, SharedArrays.SharedMatrix{Int64}, Tuple{Int64, Base.Slice{Base.OneTo{Int64}}}, true}}; distributed::Bool, batch_size::Int64, on_error::Nothing, retry_delays::Vector{Any}, retry_check::Nothing)
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/pmap.jl:126
[8] pmap(f::Function, p::WorkerPool, c::Vector{SubArray{Int64, 1, SharedArrays.SharedMatrix{Int64}, Tuple{Int64, Base.Slice{Base.OneTo{Int64}}}, true}})
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/pmap.jl:101
[9] pmap(f::Function, c::Vector{SubArray{Int64, 1, SharedArrays.SharedMatrix{Int64}, Tuple{Int64, Base.Slice{Base.OneTo{Int64}}}, true}}; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/pmap.jl:156
[10] pmap(f::Function, c::Vector{SubArray{Int64, 1, SharedArrays.SharedMatrix{Int64}, Tuple{Int64, Base.Slice{Base.OneTo{Int64}}}, true}})
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/pmap.jl:156
[11] top-level scope
@ none:1
Note that when `f` is a named single-argument function such as `mean` (rather than an anonymous function), this problem does not arise:
infil> pmap(mean,collect(eachrow(SharedArray(rand(1:10,10,10)))))
10-element Vector{Float64}:
5.7
5.2
5.1
4.7
6.2
4.4
6.8
5.7
4.3
5.3