Hi All,
I’m trying to set up a script to run in parallel on my machine and I’m having some trouble. I’m still new to distributed jobs in Julia, so any advice is appreciated.
What I want is to make a handful of scripts, a single data file, and a few variables available globally to all workers. I then want to loop over the data file, where each iteration takes one element (i.e. one row), runs a set of functions (the ones defined globally by the included scripts) on it, and then writes a file.
In a toy example, I imagine it working like this: each worker takes a row from the same data, runs the same function on that row, and writes out a separate file.
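Roughly, in pseudocode (process_row here is just a stand-in for my real per-row functions, and data.csv for my real data file):

using Distributed
addprocs(4)                            # however many workers I end up wanting

@everywhere begin
    using DelimitedFiles
    include("def_functions.jl")        # defines process_row (stand-in name)
    data = readdlm("data.csv", ',')    # same data loaded on every worker
end

@sync @distributed for i in 1:size(data, 1)
    process_row(data[i, :], i)         # each iteration writes its own output file
end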
I have created a small reprex of this. Imagine I have a file, def_functions.jl, that has the function I want all workers to be able to use. That file looks like this:
using DelimitedFiles   # writedlm comes from the DelimitedFiles stdlib

function add_2!(a::Int8, b::Int8, c::Int8, i::Int8)
    # add the two shared constants to this row's value and write it out as a one-element file
    c = c + a + b
    c = [c]
    writedlm("./src/reprexs/data_$i.csv", c, ",")
end
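For reference, I’d expect to be able to call it directly on the main process like this (hand-converting the arguments to Int8 so they match the signature):

# direct call on the main process; should write ./src/reprexs/data_1.csv containing 8
add_2!(Int8(4), Int8(3), Int8(1), Int8(1))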
Then I have a separate file, run_in_parallel.jl, that looks like this:
# set up cores
using Distributed
addprocs(1)

@everywhere begin
    # READ IN CSV - I'll approximate it with a vector.
    # What I want here is for vec_to_pass to be read in as data on the local
    # machine and then be available to all workers.
    vec_to_pass = [1, 2, 3, 4, 5]

    # janky way to get the path to the file for the include()
    path = pwd()
    path = path * "/src/reprexs/def_functions.jl"
    include(path)

    # variables I want to be the same across the different cores
    a = 4
    b = 3
end

@distributed (+) for i in 1:10
    add_2!(a, b, vec_to_pass[i], i)
end
But I get the following error:
ERROR: TaskFailedException
Stacktrace:
[1] wait
@ ./task.jl:322 [inlined]
[2] fetch
@ ./task.jl:337 [inlined]
[3] preduce(reducer::Function, f::Function, R::UnitRange{Int64})
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/macros.jl:278
[4] top-level scope
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/macros.jl:289
nested task error: On worker 2:
MethodError: no method matching add_2!(::Int64, ::Int64, ::Int64, ::Int64)
Stacktrace:
[1] macro expansion
@ ./REPL[4]:2 [inlined]
[2] #5
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/macros.jl:291
[3] #106
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:278
[4] run_work_thunk
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:63
[5] macro expansion
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:278 [inlined]
[6] #105
@ ./task.jl:411
Stacktrace:
[1] remotecall_fetch(::Function, ::Distributed.Worker, ::Function, ::Vararg{Any, N} where N; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:394
[2] remotecall_fetch(::Function, ::Distributed.Worker, ::Function, ::Vararg{Any, N} where N)
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:386
[3] remotecall_fetch(::Function, ::Int64, ::Function, ::Vararg{Any, N} where N; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421
[4] remotecall_fetch
@ /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421 [inlined]
[5] (::Distributed.var"#157#158"{typeof(+), var"#5#6", UnitRange{Int64}, Vector{UnitRange{Int64}}, Int64, Int64})()
@ Distributed /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/Distributed/src/macros.jl:274
I’m not totally clear on how to parse this error, particularly the fact that the MethodError is raised on worker 2. I think there’s some mismatch between how I expect @everywhere to define things globally and how it’s actually doing so.
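If it helps with diagnosing, I assume I can poke at what worker 2 actually sees with something like this (my guess at a sanity check, not something I found in the docs):

# check whether worker 2 has the function and the variables I defined with @everywhere
remotecall_fetch(() -> isdefined(Main, :add_2!), 2)
remotecall_fetch(() -> (a, b, vec_to_pass), 2)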
I’ve looked through the docs but I’m still not sure how to go about fixing this. Any suggestions are much appreciated!!