Function pmap multi-argument

Hi, I would like to use pmap with this function:

function simulate(
        sim_type::SIS,
        df::DataFrame,
        intervals::Dict{Int, Pair{DateTime, DateTime}},
        user2vertex::Dict{String, Int},
        loc2he::Dict{String, Int},
        δ::Dates.Millisecond;
        Δ::Union{Int,TimePeriod,Nothing} = nothing,
        vstatus::Union{Array{Int, 1}, Nothing} = nothing,
        per_infected::Float64 = 0.2,
        c::Union{Int, Nothing} = 5,
        βd::Union{Int,Float64} = 0.2,
        βₑ::Union{Int,Float64} = 0.06,
        βᵢ::Union{Int,Float64} = 0.1,
        γₑ::Union{Int,Float64} = 0.06,
        γₐ::Union{Int,Float64} = 0.1,
        niter::Int = 10,
        output_path::Union{AbstractString, Nothing} = nothing,
        print_me::Bool = true,
        store_me::Bool = true,
        kwargs...
)

But so far I can only use pmap with functions that take a single integer argument, like this:

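        @everywhere begin
            using LinearAlgebra  # det is defined in LinearAlgebra

            function rand_det(n)
                d = det(rand(n,n))
                println("Ciao Ciao")
                return d  # return the determinant, not the result of println
            end
        end

        determinants = pmap(rand_det, 1)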

To start I tried to do it with two arguments:


        @everywhere begin
            function hello(x,y)
                println(x," I'm ",myid())
                println(y)
            end
        end

        X=pmap((x,y)->hello(x,y),{"Hola","Hello","Ciao"},{"1","2","3"})

But I get this error:

ERROR: LoadError: syntax: { } vector syntax is discontinued around d:\Tesi\Julia\Terza_Parte\multi_pmap.jl:27
Stacktrace:
[1] top-level scope
@ d:\Tesi\Julia\Terza_Parte\multi_pmap.jl:27

Can someone help me?

The problem is that { } is no longer valid syntax for a collection; it was removed from the language. Use a tuple () or an array []:

X=pmap((x,y)->hello(x,y), ["Hola","Hello","Ciao"],["1","2","3"])
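For reference, a minimal local sketch of the multi-collection form of pmap. It assumes two local worker processes (addprocs(2)) in place of the remote machine, and the helper name greet is made up for illustration. pmap walks the collections in lockstep, like map, so the function receives one element from each collection per call.

```julia
using Distributed

# Assumption: two local workers stand in for the remote machine.
addprocs(2)

# `greet` is a hypothetical helper; it must be defined on every worker.
@everywhere greet(word, n) = string(word, " #", n, " from worker ", myid())

# pmap pairs the collections element by element, like map,
# and returns the results in input order.
words = ["Hola", "Hello", "Ciao"]
numbers = ["1", "2", "3"]
greetings = pmap(greet, words, numbers)

foreach(println, greetings)

rmprocs(workers())
```

Returning a string instead of printing on the worker makes the result available on the main process, which is usually what you want from pmap.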


I switched to [ ] brackets and that solved it, but now I get this other error:

undefvar

Make sure you use @everywhere for any packages (e.g. @everywhere using MyPkg) and for any other variables or functions the workers need. Could you paste your MWE?
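A small sketch of what that looks like in practice, assuming local workers and using the LinearAlgebra standard library as the example package. The names scale_factor and scaled_det are made up for illustration.

```julia
using Distributed

addprocs(2)  # assumption: local workers, for illustration only

# Load the package on every process, not just the main one.
@everywhere using LinearAlgebra

# Globals read inside worker functions must also exist on every process.
@everywhere scale_factor = 2.0   # hypothetical value

# The function itself has to be defined on every process as well.
@everywhere scaled_det(n) = scale_factor * det(Matrix(1.0I, n, n))

# Identity matrices are used so the result does not depend on random input.
results = pmap(scaled_det, 1:3)

rmprocs(workers())
```

If any of the three @everywhere lines is dropped, the workers hit an UndefVarError for the missing name.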


What do you mean by MWE?
I tried to put everything under “everywhere” and this is the result:

ERROR: LoadError: On worker 2:
UndefVarError: #196#197 not defined

My code:

using Distributed



ip_m2 ="192.168.5.128"
nprocess_m2 = 1

machine2 = [(ip_m2,nprocess_m2)]
addprocs(machine2; topology=:master_worker, 
            exename="/usr/local/bin/julia",
            exeflags=`--threads 4`, 
            enable_threaded_blas=true, 
            dir="/home/luigi/")

@everywhere using Distributed

@everywhere begin
    function hello(x,y)
        println(x," I'm ",myid())
        println(y)
    end

    hello_a=("Hola","Hello","Ciao")
    number=("1","2","3")
    X=pmap((x,y)->hello(x,y),hello_a,number)
end

# rmprocs(workers())


Try this.

using Distributed
ip_m2 ="192.168.5.128"
nprocess_m2 = 1
machine2 = [(ip_m2,nprocess_m2)]
addprocs(machine2; topology=:master_worker, 
            exename="/usr/local/bin/julia",
            exeflags=`--threads 4`, 
            enable_threaded_blas=true, 
            dir="/home/luigi/")

# no need to do @everywhere using Distributed, since we don't make use of this library in the workers

# define a function `hello` on all of the workers
@everywhere function hello(x,y)
    println(x," I'm ",myid())
    println(y)
end

# define these variables on the main process since they will be passed in as arguments 
hello_a=("Hola","Hello","Ciao")
number=("1","2","3")

# run pmap from main process... this will launch the defined function (here an anonymous function that further calls hello)
# X is stored on the main process (so keep in mind memory management)
X=pmap((x,y)->hello(x,y),hello_a,number)

Results:

      From worker 2:	Hola I'm 2
      From worker 2:	1
      From worker 5:	Hello I'm 5
      From worker 5:	2
      From worker 4:	Ciao I'm 4
      From worker 4:	3
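The same pattern can be carried over to a function with many positional and keyword arguments, such as simulate from the first post. The sketch below is only an illustration under stated assumptions: toy_simulate is a made-up stand-in, not the real simulate, and sweep is a hypothetical helper. The idea is to fix most arguments inside a closure and let pmap iterate over the one that varies.

```julia
using Distributed

addprocs(2)  # assumption: local workers instead of the remote VM

# Hypothetical stand-in for `simulate`: positional data plus keyword options.
@everywhere function toy_simulate(data::Vector{Float64}, scale::Float64;
                                  βd::Float64 = 0.2, niter::Int = 10)
    return scale * sum(data) * βd * niter
end

# `sweep` is a made-up helper: the closure captures the fixed arguments,
# and pmap sends it to the workers together with each value of β.
function sweep(data, scale, betas)
    return pmap(β -> toy_simulate(data, scale; βd = β, niter = 5), betas)
end

results = sweep([1.0, 2.0, 3.0], 0.5, [0.1, 0.2, 0.4])

rmprocs(workers())
```

Large captured inputs, such as a DataFrame, are shipped to the workers along with the closure, so it may be worth loading them on each worker with @everywhere instead.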

Same error:

luigi@192.168.5.128's password:
ERROR: LoadError: On worker 2:
UndefVarError: #217#218 not defined
Stacktrace:
  [1] deserialize_datatype
    @ /buildworker\worker\package_linux64\build\usr\share\julia\stdlib\v1.7\Serialization\src\Serialization.jl:1332
  [2] handle_deserialize
    @ /buildworker\worker\package_linux64\build\usr\share\julia\stdlib\v1.7\Serialization\src\Serialization.jl:854
  [3] deserialize
    @ /buildworker\worker\package_linux64\build\usr\share\julia\stdlib\v1.7\Serialization\src\Serialization.jl:801
  [4] handle_deserialize
    @ /buildworker\worker\package_linux64\build\usr\share\julia\stdlib\v1.7\Serialization\src\Serialization.jl:861
  [5] deserialize
    @ /buildworker\worker\package_linux64\build\usr\share\julia\stdlib\v1.7\Serialization\src\Serialization.jl:801 [inlined]
  [6] deserialize_msg
    @ /buildworker\worker\package_linux64\build\usr\share\julia\stdlib\v1.7\Distributed\src\messages.jl:87
  [7] #invokelatest#2
    @ .\essentials.jl:716 [inlined]
  [8] invokelatest
    @ .\essentials.jl:714 [inlined]
  [9] message_handler_loop
    @ /buildworker\worker\package_linux64\build\usr\share\julia\stdlib\v1.7\Distributed\src\process_messages.jl:169
 [10] process_tcp_streams
    @ /buildworker\worker\package_linux64\build\usr\share\julia\stdlib\v1.7\Distributed\src\process_messages.jl:126
 [11] #99
    @ .\task.jl:423
Stacktrace:
  [1] (::Base.var"#837#839")(x::Task)
    @ Base .\asyncmap.jl:177
  [2] foreach(f::Base.var"#837#839", itr::Vector{Any})
    @ Base .\abstractarray.jl:2141
  [3] maptwice(wrapped_f::Function, chnl::Channel{Any}, worker_tasks::Vector{Any}, c::Base.Iterators.Zip{Tuple{Tuple{String, String, String}, Tuple{String, String, String}}})
    @ Base .\asyncmap.jl:177
  [4] wrap_n_exec_twice
    @ .\asyncmap.jl:153 [inlined]
  [5] #async_usemap#822
    @ .\asyncmap.jl:103 [inlined]
  [6] #asyncmap#821
    @ .\asyncmap.jl:81 [inlined]
  [7] pmap(f::Function, p::WorkerPool, c::Base.Iterators.Zip{Tuple{Tuple{String, String, String}, Tuple{String, String, String}}}; distributed::Bool, batch_size::Int64, on_error::Nothing, retry_delays::Vector{Any}, retry_check::Nothing)
    @ Distributed C:\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.6\Distributed\src\pmap.jl:126
  [8] pmap(f::Function, p::WorkerPool, c::Base.Iterators.Zip{Tuple{Tuple{String, String, String}, Tuple{String, String, String}}})
    @ Distributed C:\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.6\Distributed\src\pmap.jl:101
  [9] pmap(f::Function, c::Base.Iterators.Zip{Tuple{Tuple{String, String, String}, Tuple{String, String, String}}}; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed C:\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.6\Distributed\src\pmap.jl:156
 [10] pmap
    @ C:\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.6\Distributed\src\pmap.jl:156 [inlined]
 [11] pmap(f::Function, c1::Tuple{String, String, String}, c::Tuple{String, String, String}; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed C:\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.6\Distributed\src\pmap.jl:157
 [12] pmap(f::Function, c1::Tuple{String, String, String}, c::Tuple{String, String, String})
    @ Distributed C:\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.6\Distributed\src\pmap.jl:157
 [13] top-level scope
    @ d:\Tesi\Julia\Terza_Parte\multi_pmap_c.jl:25
in expression starting at d:\Tesi\Julia\Terza_Parte\multi_pmap_c.jl:25

Do you have to enter a password when you SSH into your machine? In general, the library is designed to work with SSH connections that do not require a password (e.g. in cluster environments). Maybe this part of the documentation is useful for you:

  • If you specify multiplex=true as an option to addprocs, SSH multiplexing is used to create a tunnel between the master and workers. If you have configured SSH multiplexing on your own and the connection has already been established, SSH multiplexing is used regardless of multiplex option. If multiplexing is enabled, forwarding is set by using the existing connection (-O forward option in ssh). This is beneficial if your servers require password authentication; you can avoid authentication in Julia by logging in to the server ahead of addprocs. The control socket will be located at ~/.ssh/julia-%r@%h:%p during the session unless the existing multiplexing connection is used. Note that bandwidth may be limited if you create multiple processes on a node and enable multiplexing, because in that case processes share a single multiplexing TCP connection.

Mainly the part here:

This is beneficial if your servers require password authentication; you can avoid authentication in Julia by logging in to the server ahead of addprocs.

Basically, try logging in over SSH from another terminal instance with the option -O forward, and then launch your script; perhaps that works.

All of this assuming you need a password to your ssh server.
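On the Julia side, the option from the quoted documentation would be passed roughly as in the sketch below. The address, exename and dir are the ones from the script earlier in the thread; add_vm_workers is a made-up wrapper, defined but not called here because it needs the VM to be reachable. Whether it helps also depends on your SSH client supporting multiplexing, which I have not verified for your setup.

```julia
using Distributed

# Machine spec taken from the script earlier in the thread.
machine = [("192.168.5.128", 1)]

# Hypothetical wrapper; call it only when the VM is reachable over SSH.
function add_vm_workers(machines)
    return addprocs(machines;
                    multiplex = true,                  # option named in the quoted docs
                    exename = "/usr/local/bin/julia",
                    dir = "/home/luigi/")
end

# add_vm_workers(machine)   # uncomment to actually connect
```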


I am using a Windows machine that connects with addprocs to a Linux VM on my computer.
So should I connect to my virtual machine over SSH from Windows Terminal, and then run my script with addprocs separately?
How do you use the “-O forward” option in the ssh command?

Do you require a password when you do this? Set it up so that you don’t need a password when you login to the VM.

If you can’t turn it into a password-less SSH connection, then try the multiplexing technique. Basically, open a new tab in Windows Terminal and log in to your VM with

ssh luigi@192.168.5.128 -O forward

(and enter your password when prompted).

I think the best way to do this is to remove the password requirement, since it’s only a local VM on your own machine. Some other alternatives are

  • Running your Julia script on the Linux VM itself, so you don’t need to deal with SSH and machine files; you can simply do addprocs(4), which will add 4 worker processes. If you are using VS Code, you can use the Remote - SSH extension to make your workflow easier.

I switched to the “remotecall_fetch” function instead, and that works fine in my case.
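For anyone landing here later, a minimal sketch of that approach. It assumes a single local worker rather than the remote VM, and this hello returns a string instead of printing, so the result can be inspected on the main process.

```julia
using Distributed

addprocs(1)  # assumption: one local worker instead of the remote VM

# Variant of `hello` that returns its message instead of printing it.
@everywhere hello(x, y) = string(x, " I'm ", myid(), " / ", y)

w = first(workers())

# remotecall_fetch(f, worker_id, args...) runs f(args...) on that worker
# and blocks until the result is returned to the caller.
reply = remotecall_fetch(hello, w, "Hola", "1")
println(reply)

rmprocs(workers())
```

Unlike pmap, this targets one specific worker per call, so distributing several calls across workers is up to you.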