Adding (remote) workers to an ongoing Dagger job

The use case: on LSF (and probably many other cluster systems), the jobs that start workers are queued and started individually at the whim of the cluster scheduler. This means the time between the first and the last worker starting can be long, especially when many workers are requested.

From looking at the source code of Dagger, it seems that the Context holds all the processes the scheduler can use. Assuming the scheduler runs locally on the ‘master’ (does it?), it should be possible to push workers to the context as they become available, something like this:

  1. Create a Dagger Context
  2. Launch addprocs_lsf in an @async block
  3. Poll procs() and collect any new workers, then
    a. run the @everywhere expression on the new workers
    b. push! the new workers to the context from 1
    c. the first time new workers show up, start the job with the context from 1 (e.g. collect(ctx, thunks)) in another @async block
  4. ???
  5. Profit
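Steps 1 to 3 amount to a polling loop over procs(). Below is a minimal sketch of just that loop, assuming the Dagger-specific work (the @everywhere setup, pushing to the context, starting the job on the first batch) happens in a callback. `poll_new_workers`, `on_new` and `done` are hypothetical names. The process list is passed in as `getprocs` so the loop can be exercised without a cluster:

```julia
using Distributed

# Hypothetical helper: poll `getprocs()` until `done()` returns true, calling
# `on_new(ids)` once for each batch of process ids not seen before.
function poll_new_workers(on_new, getprocs, done; interval = 0.1)
    seen = Set{Int}([myid()])           # the master needs no setup
    while true
        finished = done()               # sample first so the last batch is not missed
        fresh = [p for p in getprocs() if p ∉ seen]
        if !isempty(fresh)
            union!(seen, fresh)         # record before handing off
            on_new(fresh)               # e.g. @everywhere setup + push! to the context
        end
        finished && break
        sleep(interval)                 # yields, so @async tasks like addprocs_lsf can run
    end
    return seen
end
```

On the cluster this would presumably be called as `poll_new_workers(on_new, procs, () -> istaskdone(proctask))` with `proctask = @async addprocs_lsf(n)`. Because `done()` is sampled before polling, workers that were added just before addprocs_lsf returned are still passed to `on_new`.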

My attempts so far have unfortunately failed with the following exception (wrapped in several layers of RemoteException(xx, CapturedException(RemoteException(yy, …)))):

```julia
julia> exception.ex.captured.ex.captured.ex.captured.ex
KeyError(Dagger [d58978e5-989f-55fb-8d15-ea34adc7bf54])

julia> exception.ex.captured.ex.captured.ex.captured.processed_bt
16-element Array{Any,1}:
 (getindex at dict.jl:467 [inlined], 1)
 (root_module at loading.jl:968 [inlined], 1)
 (deserialize_module at Serialization.jl:953, 1)
 (handle_deserialize at Serialization.jl:855, 1)
 (deserialize at Serialization.jl:773, 1)
 (deserialize_datatype at Serialization.jl:1251, 1)
 (handle_deserialize at Serialization.jl:826, 1)
 (deserialize at Serialization.jl:773, 1)
 (handle_deserialize at Serialization.jl:833, 1)
 (deserialize at Serialization.jl:773 [inlined], 1)
 (deserialize_msg at messages.jl:99, 1)
 (#invokelatest#1 at essentials.jl:710 [inlined], 1)
 (invokelatest at essentials.jl:709 [inlined], 1)
 (message_handler_loop at process_messages.jl:185, 1)
 (process_tcp_streams at process_messages.jl:142, 1)
 (#99 at task.jl:356, 1)
```

The @everywhere expression contains `using Dagger`, but I have verified that it completes successfully on each worker. I have also verified that the same code runs to completion when 1) waiting for all workers to be available, or 2) not pushing new workers to the context (which obviously means only the first batch of started workers, uhm, works).
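The KeyError in the trace comes from the `root_module` lookup during deserialization. That suggests the receiving process had not loaded Dagger when it got a message referring to a Dagger type. One quick way to see which processes have the package at a given moment is to ask each one directly. Below is a minimal sketch using only Distributed. `module_loaded_on` is a hypothetical helper, and checking for a binding in `Main` is only a proxy for "`using Dagger` has run there":

```julia
using Distributed

# Hypothetical helper: for each process id, report whether `name` is bound in
# that process's Main module (e.g. after `@everywhere using Dagger`).
function module_loaded_on(name::Symbol, ids = procs())
    return Dict(p => remotecall_fetch(n -> isdefined(Main, n), p, name) for p in ids)
end
```

Calling `module_loaded_on(:Dagger)` right before pushing workers to the context would show whether any process reports `false`. Such a process would likely hit the same KeyError if the scheduler sent it Dagger objects.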

Code for experiments:

```julia
julia> using Dagger, Distributed, ClusterManagers

# Lives in module LsfTest; addprocs_lsf comes from ClusterManagers.jl
function lsfasync(f, n, expr)
    proctask = @async addprocs_lsf(n)  # LSF starts the workers one by one
    restask = nothing
    try
        launched = filter(!=(myid()), procs())
        oldlaunched = [myid()]
        ctx = Context(vcat(launched, myid()))
        @show launched
        while true
            if isempty(launched)
                @show istaskdone(proctask)
                # Done once all workers are up and the job has been started
                (istaskdone(proctask) && !isnothing(restask)) && break
            end
            launched = filter(p -> p ∉ oldlaunched, procs())
            @show launched
            if !isempty(launched)
                @everywhere launched $expr  # set up the new workers
                append!(ctx.procs, map(OSProc, launched))
                if isnothing(restask)
                    #ctx = Context() #swap this line for the append! line above to make the code not crash
                    @info "Start job"
                    restask = @async f(ctx)
                end
                append!(oldlaunched, launched)  # remember them before clearing
                launched = []
            end
            sleep(1)  # poll interval; also lets the @async tasks run
        end
        return fetch(restask)
    catch e
        return e.task.exception
    finally
        @async rmprocs(fetch(proctask))
    end
end
```
```julia
julia> ts = delayed(vcat)([delayed(() -> (sleep(0.5); myid()))() for i in 1:10]...);

julia> ids_or_ex = LsfTest.lsfasync(10, quote
                                    @info "setup $(myid())"
                                    using Dagger, Distributed
                                    @info "setup done"
                                    end) do ctx
           collect(ctx, ts)
       end
```

Hmmm… in my experience all the compute servers allocated to a job will be allocated at the start of a job, and this number remains fixed.
I am more experienced with the LSF and Slurm schedulers.

Ok, so maybe there is some non-typical policy at my place then. The job is submitted as a job array, but the individual jobs in the array (not sure if this is the right terminology) start at different points in time.

Aha! I just posted about Slurm job arrays in the other thread. But of course: asynchronous startups.

Ok, I had some time to play with this tonight and I managed to get the example above working with the following modifications:

  1. The crash is avoided by passing the context to collect_remote(chunk::Chunk) instead of creating a new context for the move. A new context by default includes all procs, including ones that have not yet run the @everywhere expression that loads Dagger. From what I can tell, the context is not actually used for anything there, so one could perhaps just create it from myid(). The exception was actually useful for pinpointing this once I understood how to parse it (for some reason it did not print correctly in VS Code, perhaps because I am running with the Remote extension).

  2. Add functionality in Sch.compute_dag to fire new tasks when new procs appear in ps. The existing condition for firing new tasks is never triggered in this example. Here is my jury-rigged start of the while loop, which looks for new procs in ps (they appear there when pushed to the context by the code in the OP):

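    ```julia
    lastps = copy(ps)
    # Loop while we still have thunks to execute
    while !isempty(state.ready) || !isempty(state.running)
        # Procs pushed to the context since the last iteration
        newps = filter(!in(lastps), ps)
        append!(lastps, newps)
        if !isempty(newps)
            # Give each new proc a ready task, if there is one
            for p in newps
                isempty(state.ready) && break
                task = Dagger.Sch.pop_with_affinity!(ctx, state.ready, p, false)
                if task !== nothing
                    Dagger.Sch.fire_task!(ctx, task, p, state, chan, node_order)
                end
            end
        end
        # ... rest of the existing loop body in compute_dag ...
    end
    ```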
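The bookkeeping in this loop (find procs not seen before, record them, hand each one at most one ready task) can be checked on its own. Below is a sketch in plain Julia. `take_new_procs!` and `assign_to_new_procs!` are hypothetical names, and `popfirst!` stands in for Dagger.Sch.pop_with_affinity! and fire_task!, which need a live scheduler state:

```julia
# Hypothetical helper mirroring the loop above: return the entries of `ps`
# not yet in `lastps`, and record them so they are reported only once.
function take_new_procs!(lastps::Vector, ps::Vector)
    newps = filter(!in(lastps), ps)
    append!(lastps, newps)
    return newps
end

# Hypothetical stand-in for pop_with_affinity!/fire_task!: give each newly
# seen proc at most one ready task, stopping when the ready list runs out.
function assign_to_new_procs!(ready::Vector, lastps::Vector, ps::Vector)
    assigned = Pair[]
    for p in take_new_procs!(lastps, ps)
        isempty(ready) && break
        push!(assigned, p => popfirst!(ready))
    end
    return assigned
end
```

A proc that shows up after `ready` is already empty is still recorded in `lastps`, just as in the patch. It is therefore not treated as new again, and getting work to it later is left to the rest of the loop.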

I will run some more thorough experiments when I find more time, perhaps tonight.