The use case is that on LSF (and probably many other cluster systems) the jobs that fire up workers are queued and started individually at the whim of the cluster scheduler. This means the time between when the first worker starts and when the last one does can be long, especially when one requests many workers.
From looking at the source code of Dagger, it seems like the Context holds all the processes the scheduler can make use of. Assuming the scheduler runs locally on the 'master' (does it?), it seems like it should be possible to just push workers to the context as they become available, something like this:
- Create a Dagger Context
- Launch addprocs_lsf in an @async block
- Poll procs() and collect any new workers, then:
  a. call @everywhere newworkers expression
  b. push! the new workers to the context from step 1
  c. the first time new workers show up, start the job with the context from step 1 (e.g. collect(ctx, thunks)) in another @async block
- ???
- Profit
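The polling part of the plan can be sketched with plain Distributed only (no Dagger, no LSF): a local addprocs stands in for addprocs_lsf, and the Dagger-specific steps are left as comments, since this only illustrates the pattern, not my actual code.

```julia
using Distributed

# Skeleton of the plan: launch workers asynchronously, then poll procs()
# and handle each newcomer exactly once. addprocs(n) stands in for
# addprocs_lsf; the Dagger-specific steps are indicated as comments.
function poll_new_workers(n)
    proctask = @async addprocs(n)
    known = Set([myid()])
    while !istaskdone(proctask) || any(p -> p ∉ known, procs())
        for p in filter(p -> p ∉ known, procs())
            # @everywhere [p] using Dagger   # (a) run setup on the new worker
            # push!(ctx.procs, OSProc(p))    # (b) hand it to the Context
            push!(known, p)
        end
        sleep(0.1)
    end
    return fetch(proctask)   # pids that were added
end
```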
My attempts to do this so far have unfortunately failed with the following exception (wrapped in a number of RemoteException(xx, CapturedException(RemoteException(yy, etc…))) layers):
julia> exception.ex.captured.ex.captured.ex.captured.ex
KeyError(Dagger [d58978e5-989f-55fb-8d15-ea34adc7bf54])
julia> exception.ex.captured.ex.captured.ex.captured.processed_bt
16-element Array{Any,1}:
(getindex at dict.jl:467 [inlined], 1)
(root_module at loading.jl:968 [inlined], 1)
(deserialize_module at Serialization.jl:953, 1)
(handle_deserialize at Serialization.jl:855, 1)
(deserialize at Serialization.jl:773, 1)
(deserialize_datatype at Serialization.jl:1251, 1)
(handle_deserialize at Serialization.jl:826, 1)
(deserialize at Serialization.jl:773, 1)
(handle_deserialize at Serialization.jl:833, 1)
(deserialize at Serialization.jl:773 [inlined], 1)
(deserialize_msg at messages.jl:99, 1)
(#invokelatest#1 at essentials.jl:710 [inlined], 1)
(invokelatest at essentials.jl:709 [inlined], 1)
(message_handler_loop at process_messages.jl:185, 1)
(process_tcp_streams at process_messages.jl:142, 1)
(#99 at task.jl:356, 1)
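For digging through those layers, a small helper (my own, not part of Dagger or Distributed) can unwrap the nested RemoteException/CapturedException chain down to the root cause:

```julia
using Distributed  # RemoteException and CapturedException are defined here

# Hypothetical helper: peel off nested RemoteException/CapturedException
# layers until the root cause (here, the KeyError) is reached.
function unwrap_exception(e)
    e isa RemoteException   && return unwrap_exception(e.captured)
    e isa CapturedException && return unwrap_exception(e.ex)
    return e
end
```

With it, `unwrap_exception(exception)` returns the KeyError directly instead of requiring the `.ex.captured.ex.captured…` chain above.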
The @everywhere expression contains using Dagger, but I have verified that the expression completes successfully on each worker. I have also verified that the same code runs to completion when 1) waiting for all workers to be available, or 2) not attempting to push new workers to the context (which obviously only makes the first set of started workers, uhm, work).
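For reference, the per-worker verification amounts to something like the following (my own sketch: it asks each worker whether the package binding exists in its Main module):

```julia
using Distributed

# Quick sanity check: ask every worker whether the given module
# (Dagger by default) is actually bound in its Main module.
check_loaded(mod::Symbol = :Dagger) =
    Dict(p => remotecall_fetch(m -> isdefined(Main, m), p, mod) for p in workers())
```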
Code for experiments
julia> using Dagger, Distributed
function lsfasync(f, n, expr)
    proctask = @async addprocs_lsf(n)
    restask = nothing
    try
        launched = filter(!=(myid()), procs())
        oldlaunched = [myid()]
        ctx = Context(vcat(launched, myid()))
        @show launched
        while true
            if isempty(launched)
                @show istaskdone(proctask)
                (istaskdone(proctask) && !isnothing(restask)) && break
                sleep(1)
            end
            launched = filter(p -> p ∉ oldlaunched, procs())
            @show launched
            if !isempty(launched)
                @everywhere launched $expr
                append!(ctx.procs, map(OSProc, launched))
                if isnothing(restask)
                    # ctx = Context() # swap this line for the append! line above to make the code not crash
                    @info "Start job"
                    restask = @async f(ctx)
                end
                append!(oldlaunched, launched) # record new workers before clearing, or they get re-added forever
                launched = []
            end
        end
        return fetch(restask)
    catch e
        return e.task.exception
    finally
        @async rmprocs(fetch(proctask))
    end
end;
julia> ts = delayed(vcat)([delayed(() -> (sleep(0.5); myid()))() for i in 1:10]...);
julia> ids_or_ex = lsfasync(10, quote
@info "setup $(myid())"
using Dagger, Distributed
@info "setup done"
end) do ctx
collect(ctx, ts)
end;