Blend pmap with dynamic addprocs()/rmprocs() on a shared high-performance computing cluster

I have a “trivially parallel” computation to run. The function is loosely like this:

function process_one_iteration(i::Int64, input_directory::String)
    x = read_input_from_directory(i, input_directory)
        # E.g. reads input_directory/Input-1, Input-2, etc.
    y = do_something_intensive_to(x) # Takes about 10 minutes per core
    return y
end

Right now I handle this by doing

using Distributed

addprocs(100) # say 100 or so
all_inputs  = collect(1:1000) # or so
all_results = pmap(i -> process_one_iteration(i, input_directory), all_inputs)

This works fine; in the above example (1000 inputs, 100 cores, 10 minutes per input), it takes approximately (1000 / 100) * 10 = 100 minutes. Fine.

But this limits me to exactly the n cores given to addprocs(n), even when the HPC may have additional resources available. What I would like is to change pmap into a function like this:

function pmap_dynamically(...)
    # Start with the workers already allocated.
    # Kick off all available jobs to those workers.
    # Once a worker finishes, *don't* send it a new job right away. Rather:
    #     Check the resources on the cluster.
    #     If there are sufficient unused resources:
    #         addprocs(a reasonable number of workers)
    #     If resources are running very tight:
    #         rmprocs(the recently finished worker, and/or a reasonable
    #                 number of other idle workers)
    # *Then* resume allocating jobs to workers as usual
    # (until another worker finishes and this scaling step repeats).
end

I have already written the functions that check resources, so this part is fine.

What I can’t quite figure out is how to make pmap play nicely with this workflow. I looked at the source code for pmap and asyncmap but am stymied by my ignorance of what they are really doing. It seems, though, that they assume a constant resource pool…?

Since the per-iteration function takes about 10 minutes, the resource pool can be adjusted relatively safely (the code is not going to try to add or remove procs multiple times per minute, let alone per second).

Is this doable?

This does not seem to be the intended way to work with Distributed, but I suppose that doesn’t stop a man from trying.

Now, it has been a while since I tried this, so it might no longer work, but creating your own WorkerPool and pushing new workers into it as they become available was, as far as I know, enough for them to be picked up by the scheduler during an ongoing pmap using that worker pool. Don’t forget to set the new workers up first.
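In code, the idea is roughly this (an untested sketch; setup.jl and the 10-worker growth step are placeholders for your own worker setup and resource logic):

using Distributed

pool = WorkerPool(workers())

# Run the map against our own pool, asynchronously, so the pool can be
# modified while the map is still in flight.
results_task = @async pmap(i -> process_one_iteration(i, input_directory),
                           pool, all_inputs)

# Later, e.g. when your resource check says there is room to grow:
new_pids = addprocs(10)
@everywhere new_pids include("setup.jl") # set the new workers up first
for pid in new_pids
    push!(pool, pid) # the ongoing pmap can now schedule work onto them
end

all_results = fetch(results_task)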

I haven’t tried removing them, and that might be a bit trickier, but I would try to 1) remove them from the pool and 2) have some kind of callback for when their current task is completed (and you probably need something to tell you when they are started, too). If jobs take some 10 minutes, I guess the overhead from the callback would be insignificant.


Interesting; thanks for the suggestion, which I will try.

To the first point (not intended use)–I agree… the interesting thing here is that this particular cluster has certain broadly (though not precisely) predictable fluctuations in resource availability: e.g., on weekdays from 9-5 there are almost no cores available, while on weekends there are over a hundred. So it’s greedy (and infeasible) to allocate 100+ cores and just wait for the jobs to finish, because something kicked off Friday night might not be done by Monday morning. Meanwhile, it would take much too long to allocate 10 cores and run them for weeks, skipping the benefits of the 2-ish days when 100+ cores are available.

(I suppose the thought would be that I should rewrite the driver script to defer to the cluster’s existing resource managers; the challenge here (predictably) is that the script does some processing on the result of a first pmap call before deciding what parameters to fire off in the next call, etc.)

Can you please explain what you mean regarding the callback/telling when they started? I.e., when the job finishes, it calls the “maybe remove myself from the global WorkerPool” function–why does this need to know when the process was started?

“the interesting thing here is that this particular cluster has certain broadly (though not precisely) predictable fluctuations”

Similar situation for me. If I ask for 100 workers, I might get them all at once, or perhaps the scheduler says 37 is my lucky number and I will not get anything more (until maybe the weekend or next month when the load decreases).

Fwiw, the LsfClusterManager I happen to be using makes a request for 100 slots which are scheduled individually. This allows for asking for the full 100 and then just waiting in an async task for new workers to pop up, as in the sketch below. If your cluster behaves similarly (i.e. workers come online one by one), you could probably skip the resource-availability logic and just make a large request.
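Roughly what I have in mind (an untested sketch; addprocs_lsf is from ClusterManagers.jl, and setup.jl and the 30-second poll stand in for your own worker setup and timing):

using Distributed, ClusterManagers

pool  = WorkerPool(workers())
known = Set(workers())

# Ask for the full allocation up front; the scheduler may then grant the
# slots one by one as they free up.
@async addprocs_lsf(100)

# Watcher task: as workers come online, set each one up and hand it to the
# pool an ongoing pmap is drawing from. Stop the task once the map is done.
watcher = @async while true
    for pid in setdiff(workers(), known)
        @everywhere [pid] include("setup.jl") # set the worker up first
        push!(pool, pid)
        push!(known, pid)
    end
    sleep(30) # no need to poll faster than ~10-minute jobs finish
end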

Anyways, for the callback: note that pmap doesn’t return anything until it has gone through all the all_inputs jobs, so returning “I’m ready” from process_one_iteration is not likely to help you. Even the “maybe remove myself from the global WorkerPool” function most likely needs to be implemented as a callback.

It might work to try to remove a worker from the worker pool in the way you describe, but I’m not sure you have full control over the sequence of events at that point. Perhaps the scheduler gets the notification that the worker is done first and schedules something new on it, and then when you shut down the process (through rmprocs) your program will crash.

A safer option might be to remove it from the pool while it is running, thereby ensuring that it will not be rescheduled.
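One way to get that ordering, reusing the custom pool from the earlier sketch (untested; should_reduce_workers is the resource check you said you already have, and min_workers is a hypothetical lower bound):

min_workers = 10 # hypothetical lower bound on the pool size

# take! blocks until some worker is checked back in as idle, and while we
# hold the pid the scheduler cannot hand that worker any new work.
pid = take!(pool)
if should_reduce_workers() && nworkers() > min_workers
    rmprocs(pid) # nothing is in flight on this worker, so this is safe
else
    put!(pool, pid) # resources are fine: check it back into the pool
end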

The reason I think (upon not so careful analysis mind you :slight_smile: ) that you need also a callback for when the worker is started is simply that otherwise if you happen to remove it from the pool before anything is scheduled on it you will never get the callback that it is done.


Hi, you might find that the epmap method in the Schedulers.jl package fits your description. Its goal is to allow for a dynamic/elastic cluster while doing the parallel map.


This definitely looks interesting. I’ll give it a whirl.


So I edited Schedulers.jl around line 363 (https://github.com/ChevronETC/Schedulers.jl/blob/master/src/Schedulers.jl) to add a condition so that it doesn’t downsize workers whenever possible, but instead calls out to a function I define that tells it whether it should try to downsize. Tested on my local machine, and it seems to be working, which, judging by history, means it will probably still fail on the actual cluster, but documenting here for posterity.

# In my driver script
@everywhere function should_reduce_workers()
    ...
end

# In Schedulers.jl, inside the loop that decides whether to drop a worker
if Main.should_reduce_workers()
    if _nworkers > _epmap_minworkers
        rmprocs(pid)
    end
end

Edit: I think it suffices to write a function target_num_workers and pass that same function as both the epmap_minworkers and epmap_maxworkers kwargs. That way the loop downsizes to the target number, or upsizes, as appropriate, and the target number can be set dynamically. I still need to test this on the cluster.
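I.e., something like this (untested; query_free_cores stands in for my existing resource-check function, and the bounds are arbitrary):

# Hypothetical resource check: how many workers should we be running now?
target_num_workers() = clamp(query_free_cores(), 10, 100)

# With the same function as both bounds, epmap continually resizes the
# worker pool toward whatever target_num_workers() currently reports.
r = @async epmap(i -> process_one_iteration(i, input_directory), 1:1000;
    epmap_minworkers = target_num_workers,
    epmap_maxworkers = target_num_workers)
fetch(r)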

Hi Philip,

Thanks so much for giving it a try. Yes, you should be able to use epmap_minworkers and epmap_maxworkers to accomplish what you want. In particular, you could do something like:

nworkers = 10

# epmap polls these closures, so re-binding `nworkers` below grows or
# shrinks the cluster while the map is still running.
r = @async epmap(myfunction, 1:100; epmap_maxworkers=()->nworkers, epmap_minworkers=()->nworkers)

sleep(10)
nworkers = 5  # shrink the cluster mid-run

sleep(10)
nworkers = 20 # grow it again

fetch(r)

In fact, you can see an example in the unit tests here. However, that test uses a growing cluster rather than a shrinking one, so please let me know if a shrinking cluster causes any trouble.

Thanks!