ProgressMeter and @threads

Trying to use ProgressMeter.jl for simple multi-threaded loops, I came up with the following. I am surprised that this works, since I usually get error messages when I try to print anything from inside a @threads for. So I ask (1) is what I am doing actually “ok” (i.e., correct, safe, …); and (2) why can ProgressMeter print from within the loop?

module MtProgress
   using ProgressMeter, Base.Threads

   function mysleep(t)
      t0 = time_ns()
      while (time_ns() - t0) * 1e-9 < t
         nothing
      end
   end

   function workerfun(n)
      mysleep(1.0)
      return rand()
   end

   function mt_progress(N)
      A = zeros(N)
      P = Progress(N)
      idx = 0
      P_lock = SpinLock()
      @threads for n = 1:N
         A[n] = workerfun(n)
         lock(P_lock)
         idx += 1
         ProgressMeter.update!(P, idx)
         unlock(P_lock)
      end
   end
end

t1 = time_ns()
MtProgress.mt_progress(30)
println("Elapsed = $((time_ns()-t1)*1e-9)s")

So I ask (1) is what I am doing actually “ok” (i.e., correct, safe, …);

This seems completely fine to me, but the way you implemented it seems quite suboptimal, since you take a costly lock in every iteration just to update the ProgressMeter.

and (2) why can ProgressMeter print from within the loop?

There is no particular reason you cannot print inside an @threads loop; the problem is rather that in 0.6 printing is not thread-safe. That is why you usually get the error messages.

I had a similar problem and solved it by disassembling the @threads macro into something like this

function tforeach(f::F, src; blocksize=20) where {F<:Function}
    i = Threads.Atomic{Int}(1)
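    # The original line referred to an undefined `solvers`; the intent is assumed
    # to be capping the chunk size so that every thread gets some work.
    blocksize = max(1, min(blocksize, div(length(src), Threads.nthreads())))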
    function threads_fun()
        tid = Threads.threadid()
        n = length(src)
        loop!(i, n, blocksize) do k
            f(tid, src[k])
        end
    end

    ccall(:jl_threading_run, Ref{Void}, (Any,), threads_fun)

    nothing
end

function loop!(f::F, i, n, blocksize) where {F<:Function}
    while true
        k = Threads.atomic_add!(i, blocksize)
        if k > n
            break
        end
        # atomic_add! returns the old value, so this chunk covers k : k+blocksize-1
        upper = min(k + blocksize - 1, n)
        while k ≤ upper
            f(k)
            k += 1
        end
    end
end

Two things differ here from the @threads macro. First, I have a poor man's version of load balancing, i.e., the workload is not split up a priori (as in @threads); instead, work is handed out during the run in chunks of a certain size. The other thing, which makes the progress meter easier to update, is that I pass the current thread id tid to the callback as well.

With this you could do

function mt_progress(N)
      A = zeros(N)
      P = Progress(N)
      tforeach(1:N) do tid, n
         A[n] = workerfun(n)
         if tid == 1 # check that we are on the main thread!
             ProgressMeter.update!(P, n)
         end
      end
      # if another thread processed the last chunk we have to tell P that we are done
      ProgressMeter.update!(P, N)
end

Thank you for the very useful reply.

This seems completely fine to me, but the way you implemented it seems quite suboptimal, since you take a costly lock in every iteration just to update the ProgressMeter.

The cost of each iteration is so high in my case that I actually don’t care. But point taken nevertheless!

There is no particular reason you cannot print inside an @threads loop; the problem is rather that in 0.6 printing is not thread-safe. That is why you usually get the error messages.

Then why does the printing that ProgressMeter does not cause problems?

I had a similar problem and solved it by disassembling the @threads macro into something like this

Thank you for this! It is very useful for me since it will solve another problem I had. I love it.

You mean in your example? You synchronize the printing through the lock statements, so the ProgressMeter behaves as if it were single-threaded.
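To illustrate the point without ProgressMeter, here is a minimal sketch of the same pattern using only Base. It assumes a recent Julia (1.x) rather than 0.6, and `locked_progress`, `work`, and `io` are names made up for this example. Every write to the output stream happens while holding the lock, so only one thread prints at a time.

```julia
using Base.Threads

# Hypothetical helper: run `work(n)` for n = 1:N on all threads and
# write one progress line per finished item to `io`.
function locked_progress(work, N; io = stdout)
    results = zeros(N)
    done = Ref(0)            # shared counter, only touched while holding `lk`
    lk = ReentrantLock()
    @threads for n in 1:N
        results[n] = work(n) # the expensive part runs in parallel, outside the lock
        lock(lk) do
            # Counter update and printing are serialized by the lock.
            done[] += 1
            println(io, "finished ", done[], " of ", N)
        end
    end
    return results
end

locked_progress(n -> n^2, 8)
```

Printing the same lines without the lock is what would let several threads write at once, which is the situation that produced the errors.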

I see - so the errors only come up if I try to print or @show from multiple threads at once. Thank you for clarifying.