EnsembleDistributed Progress bar

I’m running several thousand ODE simulations that take ~30 s each, and I’d like to track their progress. I haven’t found a way to show a progress bar with EnsembleDistributed() short of a manual approach that bypasses EnsembleProblem altogether. See the MWE below:

using ProgressMeter
using Distributed

addprocs(6)
@everywhere using DifferentialEquations

n_trajectories = 100
progress = Progress(n_trajectories)
const channel = RemoteChannel(()->Channel{Int}(n_trajectories))

@everywhere function prob_func(prob,i,repeat)
    remake(prob,u0=rand()*prob.u0)
end

@everywhere function f(u,p,t)
    sleep(0.01)
    1.01u
end

prob = ODEProblem(f,0.5,(0.0,1.0))

@sync begin
    @async begin
        tasksdone = 0
        while tasksdone < n_trajectories
            tasksdone += take!(channel)
            update!(progress, tasksdone)
        end
    end
    @async begin
        @distributed for i = 1:n_trajectories
            sol = solve(prob_func(prob, i, false), Tsit5())
            put!(channel, 1)
        end
    end
end

I’d rather use the EnsembleProblem interface, if possible. Is there a method I’ve missed to incorporate progress bars with EnsembleDistributed()?
I attempted to call put!(channel, 1) inside output_func with EnsembleProblem, starting two async tasks as in the MWE above, but the problem was telling the workers which RemoteChannel to use:

using ProgressMeter
using Distributed

addprocs(6)
@everywhere using DifferentialEquations

n_trajectories = 100
progress = Progress(n_trajectories)
const channel = RemoteChannel(()->Channel{Int}(n_trajectories))

@everywhere function prob_func(prob,i,repeat)
    remake(prob,u0=rand()*prob.u0)
end

@everywhere function output_func(sol, i)
    put!(channel, 1)
    sol, false
end

@everywhere function f(u,p,t)
    sleep(0.01)
    1.01u
end

prob = ODEProblem(f,0.5,(0.0,1.0))
ensembleproblem = EnsembleProblem(prob, prob_func = prob_func, output_func = output_func)

@sync begin
    @async begin
        tasksdone = 0
        while tasksdone < n_trajectories
            tasksdone += take!(channel)
            update!(progress, tasksdone)
        end
    end
    @async begin
        sim = solve(ensembleproblem, Tsit5(), EnsembleDistributed(), trajectories=n_trajectories)
    end
end

but this fails because channel is defined only on the main process: the workers don’t know what the RemoteChannel is, i.e. channel is not defined there.
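
A quick way to see this, as a minimal sketch using only Distributed (the worker count of 2 and the channel size are arbitrary choices for the demo): ask every process whether its own Main module has a binding named channel. The main process should report that it does, while the workers should not.

```julia
using Distributed
addprocs(2)  # arbitrary worker count for the demo

# Defined on the main process only, as in the failing example.
const channel = RemoteChannel(() -> Channel{Int}(8))

# Ask each process whether `channel` exists in its own Main module.
defined = Dict(p => remotecall_fetch(isdefined, p, Main, :channel) for p in procs())

for p in sort(collect(keys(defined)))
    println("process ", p, ": channel defined = ", defined[p])
end
```

Functions created with @everywhere are compiled on each worker and look up channel in that worker's Main, which is why output_func cannot find it.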

Can you open an issue? It would be good to integrate this into the library itself.


I’ve opened an issue.


It turns out my assumption that I couldn’t just use @everywhere const channel = $channel was wrong. I assumed this would create a separate channel on each worker, but the interpolated RemoteChannel is only a reference, so every worker ends up writing to the same underlying channel on the main process.

This still requires some of the boilerplate around it, but at least it lets me show the progress while solving an EnsembleProblem.
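
To check the shared-channel behaviour in isolation, here is a minimal sketch that leaves out DifferentialEquations and ProgressMeter entirely. The worker count and channel capacity are arbitrary assumptions for the demo. Each worker puts its own process id into its local channel binding, and the main process takes them all out of the one channel it owns.

```julia
using Distributed
addprocs(2)  # arbitrary worker count for the demo

# The Channel itself lives on the main process; `channel` is a handle to it.
const channel = RemoteChannel(() -> Channel{Int}(32))

# Interpolation ships the handle (not a copy of the Channel) to every process.
@everywhere const channel = $channel

# Each worker puts its own process id through its local `channel` binding.
@everywhere workers() put!(channel, myid())

# All values arrive in the single channel held by the main process.
received = sort([take!(channel) for _ in workers()])
println(received)
```

If each worker had its own separate channel, the take! calls on the main process would block forever; here they return one id per worker.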

Full solution:

using ProgressMeter
using Distributed

addprocs(6)
@everywhere using DifferentialEquations

n_trajectories = 100
progress = Progress(n_trajectories)
const channel = RemoteChannel(()->Channel{Int}(n_trajectories))
@everywhere const channel = $channel

@everywhere function prob_func(prob,i,repeat)
    remake(prob,u0=rand()*prob.u0)
end

@everywhere function output_func(sol, i)
    put!(channel, 1)
    sol, false
end

@everywhere function f(u,p,t)
    sleep(0.01)
    1.01u
end

prob = ODEProblem(f,0.5,(0.0,1.0))
ensembleproblem = EnsembleProblem(prob, prob_func = prob_func, output_func = output_func)

@sync begin
    @async begin
        tasksdone = 0
        while tasksdone < n_trajectories
            tasksdone += take!(channel)
            update!(progress, tasksdone)
        end
    end
    @async begin
        sim = solve(ensembleproblem, Tsit5(), EnsembleDistributed(), trajectories=n_trajectories)
    end
end

The sleep is only there to slow the solve down; without it everything finishes almost immediately and the progress bar has nothing to show.


Nice! Yeah, we can probably take the time to bake this into the library itself.
