Run external commands in parallel

I have a computation whose time-consuming part boils down to calling an external program, specifically via read(cmd, String), in parallel for 1–5 values of cmd.

Should I just construct the commands, and use pmap(c -> read(c, String), cmds)?

(Sorry if this is an obvious question; I am not familiar with the parallel language mechanisms and got confused between threads, asyncmap, etc.)

I am not an expert, but I usually use asyncmap for this and it works fine.
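
For example, a minimal sketch (the echo commands here are just stand-ins for the real external program):

cmds = [`echo $i` for i in 1:5]                 # stand-in for the real commands
outputs = asyncmap(c -> read(c, String), cmds)  # one coroutine per command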

A simple loop would also work:

@sync begin                                      # wait for all enclosed tasks
    for i in 1:10
        @async println(read(`echo $i`, String))  # each read runs concurrently
    end
end

The docs of ?asyncmap say

Note

Currently, all tasks in Julia are executed in a single OS thread co-operatively. Consequently, asyncmap is beneficial only when the mapping function involves any I/O - disk, network, remote worker invocation, etc.

and I am not sure if it applies here, since I wait for the output. I will experiment.

It depends on what the task cmd does: CPU-intensive → pmap, I/O-intensive → asyncmap. If the task uses a resource intensively, i.e. saturates disk or network throughput, it may be worthwhile to run the commands serially.
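
For the pmap route, a hedged sketch (pmap needs worker processes from Distributed; the commands and worker count below are placeholders):

using Distributed
addprocs(4)                                 # assumption: four workers suffice

cmds = [`echo $i` for i in 1:5]             # placeholder commands
outputs = pmap(c -> read(c, String), cmds)  # each read runs on a worker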


Thanks. The task is CPU-intensive, but running multiple instances on separate cores (from the command line) yields large speed gains.

AFAICT that doc note refers to native Julia code only. Each cmd will run in an isolated process, so as long as reading the sheer volume of output from each cmd is not the bottleneck, you should get a good speedup.
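
A quick sanity check of that claim, assuming a Unix-like system (sleep stands in for the real command): five one-second commands should take about one second in total, not five, if the subprocesses really run concurrently.

cmds = [`sleep 1` for _ in 1:5]
@time asyncmap(run, cmds)  # expect roughly 1 s, not 5 s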


The usual picture is that threads are for CPU-bound work and async coroutines are for I/O-bound work. Here, I believe I/O means anything that goes through a syscall, so spawning subprocesses is included, even though the work inside the subprocess is CPU-bound. So asyncmap (or @async etc.) seems like the right choice to me. I expect Julia to handle multiple concurrent reads, since it uses libuv (which talks to the kernel to automatically switch to whichever I/O is ready).

As for pmap, I think it uses async coroutines underneath to communicate with Julia sub-/remote processes. So if you spawn subprocesses inside pmap, I think you are adding extra syscalls and it is likely to slow down your program (though that may be negligible).

Side note: I think I/O-bound usually means the number of concurrent tasks is too large for the number of threads the kernel can handle, as in web services. Since a process is more expensive than a thread, if you are launching processes it may be a bit misleading to call the workload I/O-bound or I/O-intensive. But since async is a more stable API than threads in Julia, why not use async?


Do you need to exchange data between the running processes? If not, this would work:

using Distributed

@everywhere job(p) = begin
    run(`echo Process $p started`)
    sleep(4 * rand())
    run(`echo Process $p finished`)
    return p
end

function runem(howmanyprocesses)
    if nworkers() < howmanyprocesses
        addprocs(howmanyprocesses - nworkers())
    end
    results = [] # collect the remote references to be able to extract the results
    for p = workers()
        rref = @spawnat p job(p)
        push!(results, rref)
    end
    for rref = results
        if fetch(rref) <= 0
            @warn "Failure:"
            @show rref
        end
    end
    return true
end

runem(4)

I am also looking at running external commands in parallel. In my case there is no exchange of data between processes. I tried the helpful suggestion provided by PetrKryslUCSD. Running it without specifying the number of processes (i.e. without the -p option), I get the following error:
ERROR: LoadError: On worker 2:
UndefVarError: #job not defined

If I start julia with -p greater than the target number of processes (the argument to runem), it works fine. Trying to understand this, I moved the body of the job function into runem, and then it works fine irrespective of the -p option:

using Distributed

function runem(howmanyprocesses)
    if nworkers() < howmanyprocesses
        addprocs(howmanyprocesses - nworkers())
    end
    results = [] # collect the remote references to be able to extract the results
    for p = workers()
        rref = @spawnat p begin
            run(`echo Process $p started`)
            my_time = rand()
            sleep(4 * my_time)
            run(`echo Process $p finished after $my_time`)
            return p
        end
        push!(results, rref)
    end
    for rref = results
        if fetch(rref) <= 0
            @warn "Failure:"
            @show rref
        end
    end
    return true
end

I’m new to Julia and am trying to get a handle on how scoping works. From my naive perspective, it seems that the processes created by addprocs() in the runem function are local to the function.


Right, I forgot about loading the function job on all the workers created with addprocs(). Sorry.
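
A minimal sketch of the fix, reusing the earlier example: add the workers first, then define job with @everywhere, so the definition is loaded on every process:

using Distributed

addprocs(4)                 # create the workers first ...

@everywhere job(p) = begin  # ... so @everywhere reaches all of them
    run(`echo Process $p started`)
    sleep(4 * rand())
    run(`echo Process $p finished`)
    return p
end

results = [@spawnat p job(p) for p in workers()]
fetch.(results)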