ProgressMeter and @threads


#1

Trying to use ProgressMeter.jl for simple multi-threaded loops, I came up with the following. I am surprised that this works, since I usually get error messages when I try to print anything from inside a @threads for. So I ask (1) is what I am doing actually “ok” (i.e., correct, safe, …); and (2) why can ProgressMeter print from within the loop?

module MtProgress
   using ProgressMeter, Base.Threads

   function mysleep(t)
      t0 = time_ns()
      while (time_ns() - t0) * 1e-9 < t
         nothing
      end
   end

   function workerfun(n)
      mysleep(1.0)
      return rand()
   end

   function mt_progress(N)
      A = zeros(N)
      P = Progress(N)
      idx = 0
      P_lock = SpinLock()
      @threads for n = 1:N
         A[n] = workerfun(n)
         lock(P_lock)
         idx += 1
         ProgressMeter.update!(P, idx)
         unlock(P_lock)
      end
   end
end

t1 = time_ns()
MtProgress.mt_progress(30)
println("Elapsed = $((time_ns()-t1)*1e-9)s")

#3

So I ask (1) is what I am doing actually “ok” (i.e., correct, safe, …);

This seems completely fine to me, but the way you implemented it seems suboptimal, since you introduce a costly lock in each iteration just to update the ProgressMeter.
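One possible way to reduce the locking cost, shown here only as a rough sketch: count finished iterations with an atomic counter and report progress only when the lock happens to be free (trylock), so no thread ever waits on it. The names counted_loop, work and report are hypothetical, and the sketch assumes a recent Julia 1.x, where ReentrantLock is safe to use across threads.

```julia
using Base.Threads

# Hypothetical sketch: `work` computes one entry, `report` receives the
# number of finished iterations (e.g. to update a progress meter).
function counted_loop(work, report, N)
    A = zeros(N)
    done = Atomic{Int}(0)       # number of finished iterations
    lk = ReentrantLock()        # guards calls to `report`
    @threads for n = 1:N
        A[n] = work(n)
        atomic_add!(done, 1)
        if trylock(lk)          # skip the report if another thread is reporting
            try
                report(done[])
            finally
                unlock(lk)
            end
        end
    end
    report(done[])              # final report once every iteration has finished
    return A
end
```

In the example from the first post, one would presumably pass workerfun as work and k -> ProgressMeter.update!(P, k) as report.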

and (2) why can ProgressMeter print from within the loop?

There is no particular reason you cannot print inside an @threads loop; the problem is rather that in Julia 0.6 printing is not thread-safe. That is why you usually get the error messages.

I had a similar problem and solved it by disassembling the @threads macro into something like this:

function tforeach(f::F, src; blocksize=20) where {F<:Function}
    i = Threads.Atomic{Int}(1)
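    # Cap the chunk size so that each thread gets at least one chunk
    # (assumed intent), but never let it drop below 1.
    blocksize = max(1, min(blocksize, div(length(src), Threads.nthreads())))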
    function threads_fun()
        tid = Threads.threadid()
        n = length(src)
        loop!(i, n, blocksize) do k
            f(tid, src[k])
        end
    end

    # Julia 0.6 internal entry point used by @threads: runs threads_fun on every thread
    ccall(:jl_threading_run, Ref{Void}, (Any,), threads_fun)

    nothing
end

function loop!(f::F, i, n, blocksize) where {F<:Function}
    while true
        k = Threads.atomic_add!(i, blocksize)
        if k > n
            break
        end
        upper = min(k + blocksize - 1, n)  # last index of this chunk (inclusive)
        while k ≤ upper
            f(k)
            k += 1
        end
    end
end

Two things differ from the @threads macro. First, I have a poor man's version of load balancing: the workload is not split up a priori (as in @threads); instead, packages are distributed during the run in chunks of a certain size. Second, and this is what makes the progress meter easier to update, I pass the current thread id tid to the callback as well.
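To illustrate the chunk hand-out, here is a minimal single-threaded sketch of the index arithmetic. The helper name chunk_ranges is hypothetical; it only records which range each successive atomic_add! call would claim, with ranges chosen so that chunks do not overlap.

```julia
using Base.Threads

# Hypothetical helper: list the index ranges claimed by successive
# atomic_add! calls for n items and a given blocksize.
function chunk_ranges(n, blocksize)
    ranges = UnitRange{Int}[]
    i = Atomic{Int}(1)                     # next unclaimed index
    while true
        k = atomic_add!(i, blocksize)      # returns the value before the add
        k > n && break
        push!(ranges, k:min(k + blocksize - 1, n))
    end
    return ranges
end

chunk_ranges(10, 4)   # returns [1:4, 5:8, 9:10]
```

In the threaded version, each thread runs this loop against the same shared counter, so a thread that finishes its chunk early simply claims the next one.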

With tforeach you could do:

function mt_progress(N)
    A = zeros(N)
    P = Progress(N)
    tforeach(1:N) do tid, n
        A[n] = workerfun(n)
        if tid == 1 # check that we are on the main thread!
            ProgressMeter.update!(P, n)
        end
    end
    # if another thread processed the last chunk we have to tell P that we are done
    ProgressMeter.update!(P, N)
end

#4

Thank you for the very useful reply.

This seems completely fine to me, but the way you implemented it seems suboptimal, since you introduce a costly lock in each iteration just to update the ProgressMeter.

The cost of each iteration is so high in my case that I actually don’t care. But point taken nevertheless!

There is no particular reason you cannot print inside an @threads loop; the problem is rather that in Julia 0.6 printing is not thread-safe. That is why you usually get the error messages.

Then why does the printing that ProgressMeter does not cause problems?

I had a similar problem and solved it by disassembling the @threads macro into something like this

Thank you for this! It is very useful for me since it will solve another problem I had! I love it.


#5

You mean in your example? You synchronize the printing through the lock statements, and thus the ProgressMeter acts as if it were single-threaded.
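The same idea applies to ordinary printing. A minimal sketch, assuming a recent Julia 1.x (where ReentrantLock can be shared across threads) and using a hypothetical function locked_lines that writes into an IOBuffer as a stand-in for the terminal:

```julia
using Base.Threads

# Hypothetical sketch: every println happens while holding the same lock,
# so only one thread writes to the shared stream at a time.
function locked_lines(N)
    buf = IOBuffer()            # shared output stream, stand-in for stdout
    lk = ReentrantLock()
    @threads for n = 1:N
        lock(lk)
        try
            println(buf, "iteration ", n)
        finally
            unlock(lk)          # released even if println throws
        end
    end
    return String(take!(buf))
end
```

The lines may appear in any order, but each one is written whole because no two threads are inside println at once.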


#6

I see - so the errors only come up if I try to print or @show from multiple threads at once. Thank you for clarifying.