Progress bar for multiple workers -- connecting workers to RemoteChannel

I am trying to expand the example given in the ProgressMeter.jl documentation to get a progress bar for a few slow processes that run in parallel. Since there are only a few processes, but each of them takes a long time, I want to get updates not only when a process finishes, but for every iteration within the process. Here is my attempt so far:

using ProgressMeter
using Distributed

# addprocs()
const channel = RemoteChannel(()->Channel{Bool}(55))


@everywhere function myslowfunc(n)
    # println("myslowfunc called with $(n) on $(myid())")
    for _ in 1:n
        sleep(0.1)
        put!(channel, true)
    end
    # println("myslowfunc finished on $(myid())")
    return 2n
end

p = Progress(55)
@sync begin
    # this task prints the progress bar
    @async while take!(channel)
        next!(p)
    end

    # this task does the computation
    @async begin
        data = @distributed (vcat) for i in 1:10
            [(myid(), i, myslowfunc(i))]
        end
        put!(channel, false) # this tells the printing task to finish
        println(data)
    end
end

The code above works as expected. But if I uncomment addprocs() and the two debug messages within myslowfunc, I get the output

      From worker 8:    myslowfunc called with 9 on 8
      From worker 6:    myslowfunc called with 7 on 6
      From worker 2:    myslowfunc called with 1 on 2
      From worker 9:    myslowfunc called with 10 on 9
      From worker 5:    myslowfunc called with 6 on 5
      From worker 3:    myslowfunc called with 3 on 3
      From worker 7:    myslowfunc called with 8 on 7
      From worker 4:    myslowfunc called with 5 on 4

and then the program hangs. I assume the problem is that the other workers (all except worker 1) don’t know about channel, and hence the put!(channel, true) call fails.

How do I connect the other workers to channel or how do I make it available for all workers in myslowfunc?

The package ProgressMeter.jl can be used to get a progress bar for parallel computation with pmap. I currently run long-running parallel code like this:

    p = Progress(nsims, barglyphs=BarGlyphs("[=> ]"))
    bl = progress_map(1:nsims, progress = p, mapfun = pmap) do x
        run_long_function()
    end

which updates the progress bar every time run_long_function finishes. However, if I understand you correctly, you are looking for more granular progress per simulation. For example, if you have 10 simulations with 10 time steps each, you want a progress bar with 100 units and have each simulation update the progress bar. I was not able to figure out how to do this using the ProgressMeter package and pmap, but for a number of years I used the older package PmapProgressMeter.jl, which allowed this directly. That package is no longer updated, and a lot of its functionality was moved to ProgressMeter.

It seems you are on the right path using remote channels to update the progress bar. Maybe the source code of that package can help you solve your other problem.
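
For what it's worth, the remote-channel idea should also combine with pmap. Below is a minimal sketch, not a tested recipe from either package: `simulate` and `run_sims` are hypothetical names, `sleep` stands in for the real work of one time step, and the channel is buffered with one slot per tick so that `put!` never blocks. It runs on a single process as written; calling addprocs() right after `using Distributed` spreads the simulations over workers.

```julia
using Distributed
using ProgressMeter

# Hypothetical stand-in for one simulation with `nsteps` time steps.
# The channel is passed in as an argument so every process can reach it.
@everywhere function simulate(id, nsteps, ch)
    for _ in 1:nsteps
        sleep(0.01)       # placeholder for the real work of one time step
        put!(ch, true)    # one tick per time step
    end
    return id
end

function run_sims(nsims, nsteps)
    ch = RemoteChannel(() -> Channel{Bool}(nsims * nsteps))
    p = Progress(nsims * nsteps)

    # Consumer task on the master: advance the bar until a `false` arrives.
    printer = @async while take!(ch)
        next!(p)
    end

    results = pmap(id -> simulate(id, nsteps, ch), 1:nsims)
    put!(ch, false)       # tell the consumer task to stop
    wait(printer)
    return results
end

results = run_sims(10, 10)
```

The bar then has nsims * nsteps units, matching the "10 simulations with 10 time steps each" example above.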

Yes, the more granular progress bar is exactly what I am looking for, since I have only O(10) long-running functions, but each takes a couple thousand iterations.

Looking into the sources of PmapProgressMeter.jl sounds like a good suggestion!

Your remote workers don’t know about your variable channel, because const channel = ... is only evaluated on the master process. Just add it as an argument to the function definition:

@everywhere function myslowfunc(n, ch)
....
        put!(ch, true)
....
end

and call your function with it:

....
            [(myid(), i, myslowfunc(i, channel))]
....
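
To make the reason visible, here is a small sketch that checks whether the global exists on a worker and then passes the channel explicitly. The names `ch`, `has_global_ch` and `report` are made up for this illustration, and it assumes a single extra local worker is enough to show the effect. When run as a script, the first check should report that the global exists on the master and the second that it does not exist on the worker.

```julia
using Distributed

# Assumption: one extra local worker is enough for the demonstration.
addprocs(1)

# Evaluated on the master only, exactly like `const channel = ...` above.
const ch = RemoteChannel(() -> Channel{Bool}(1))

# Hypothetical helper: does a global named `ch` exist in Main on this process?
@everywhere has_global_ch() = isdefined(Main, :ch)

w = first(workers())
println("master has global ch: ", remotecall_fetch(has_global_ch, 1))
println("worker has global ch: ", remotecall_fetch(has_global_ch, w))

# Passing the RemoteChannel as an argument serializes the reference to the
# worker, so the worker can put! into the channel owned by the master.
@everywhere report(c) = put!(c, true)
remotecall_wait(report, w, ch)
got = take!(ch)
println("received from worker: ", got)
```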

Yes, that solved it. Thanks!

Hi, could you please post a MWE of your solution? I would love to have this feature for my own simulations.

Sure, here you go:

using ProgressMeter
using Distributed

addprocs()
const channel = RemoteChannel(()->Channel{Int}(55))


@everywhere function myslowfunc(n, channel)
    for _ in 1:n
        sleep(0.1)
        put!(channel, 1)
    end
    return 2n
end

totaltasks = 55
p = Progress(totaltasks)
@sync begin
    # this task prints the progress bar
    @async begin
        tasksdone = 0
        while tasksdone < totaltasks
            tasksdone += take!(channel)
            update!(p, tasksdone)
        end
    end

    # this task does the computation
    @async begin
        data = @distributed (vcat) for i in 1:10
            [(myid(), i, myslowfunc(i, channel))]
        end
        println(data)
    end
end

Note that I also replaced the while take!(channel) with a while tasksdone < totaltasks and used a Channel{Int} instead of a Channel{Bool} inside the RemoteChannel. This is because in my real use case each iteration of the inner loop in myslowfunc completes more than one task, so it needs to report a count rather than a single tick. My original code with myslowfunc(n, channel) and [(myid(), i, myslowfunc(i, channel))] also works.
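
To illustrate the counting variant, here is a minimal sketch in which every iteration reports several finished tasks at once. `chunked_work`, `run_with_progress` and the `chunk` parameter are hypothetical names for this example, not part of my real code, and `sleep` again stands in for the actual work. It runs on a single process as written; add addprocs() after `using Distributed` to distribute it.

```julia
using Distributed
using ProgressMeter

# Hypothetical worker function: every iteration completes `chunk` tasks
# and reports that count through the channel in a single put!.
@everywhere function chunked_work(nsteps, chunk, ch)
    for _ in 1:nsteps
        sleep(0.01)        # placeholder for the real work
        put!(ch, chunk)
    end
    return nsteps * chunk
end

function run_with_progress(jobs, chunk)
    total = sum(jobs) * chunk
    ch = RemoteChannel(() -> Channel{Int}(32))
    p = Progress(total)
    done = 0
    results = nothing
    @sync begin
        # consumer: add up the reported counts until everything is accounted for
        @async while done < total
            done += take!(ch)
            update!(p, done)
        end
        # producer: run the jobs and collect the return values
        @async begin
            results = @distributed (vcat) for n in jobs
                [chunked_work(n, chunk, ch)]
            end
        end
    end
    return done, results
end

done, results = run_with_progress(1:3, 2)
```

Because the consumer stops on a count instead of a sentinel value, nothing has to send a final "finished" message. The catch is that the total must be known in advance and every reported count must add up to it exactly, otherwise the consumer task either waits forever or stops early.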