I understand that remotecall_fetch
will return the error. However, the function that is going to be executed on the remote process will never return and will always be waiting for tasks from the master process:
mutable struct ParallelFunction
jobs::RemoteChannel
results::RemoteChannel
nreceived::Int64
ntaken::Int64
end
function do_work(f, jobs, results, args...)
while true
x = take!(jobs)
result = f(x, args...)
put!(results, (x, result))
end
end
function ParallelFunction(f, args...; len=128)
jobs = RemoteChannel(()->Channel{Any}(len))
results = RemoteChannel(()->Channel{Any}(len))
for p in workers()
remote_do(do_work, p, f, jobs, results, args...)
end
return ParallelFunction(jobs, results, 0, 0)
end
function give!(pf::ParallelFunction, x)
put!(pf.jobs, x)
pf.nreceived += 1
end
function get!(pf::ParallelFunction)
result = take!(pf.results)
if result[2] isa ErrorException
throw(result[2])
end
pf.ntaken += 1
return result
end
Suppose a function f
has four arguments x
, a
, b
and c
. The purpose is to evaluate the function f
at several different x
s with fixed argument a
, b
and c
.
A ParallelFunction
can be constructed as
pf = ParallelFunction(f, a, b, c; len=128)
To evaluate the function f
at x1
, x2
and x3
, the following statement can be used
give!(pf, x1)
give!(pf, x2)
give!(pf, x3)
The function will be evaluated in parallel on any possible processes. To obtain the result:
for i in 1:3
println(get!(pf))
end
get!(pf)
returns a tuple, the first element of the tuple is the argument x
and the second argument is the corresponding result.
However, it is possible that the user forget to define the function f
on the remote process. In this case, the get!(pf)
will hang indefinitely, which is not a desirable result. I want to throw an error if this is the case.