ProgressMeter and @threads

So I ask (1) is what I am doing actually “ok” (i.e., correct, safe, …);

This seems completely fine to me, but the way you implemented it seems quite suboptimal, since you take a costly lock in every iteration just to update the ProgressMeter.
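One way to avoid a lock per iteration is to count finished iterations with an atomic integer. The following is only a minimal sketch of that idea, not your code: the name count_with_atomic is made up, sqrt(n) is a stand-in for the real work, and it uses nothing beyond Base.Threads.

```julia
using Base.Threads

# Hypothetical helper: counts finished iterations without taking a lock.
function count_with_atomic(N)
    done = Atomic{Int}(0)      # progress counter shared by all threads
    A = zeros(N)
    @threads for n in 1:N
        A[n] = sqrt(n)         # stand-in for the real work
        atomic_add!(done, 1)   # lock-free increment
    end
    return A, done[]           # done[] reads the current counter value
end

A, finished = count_with_atomic(1000)
```

A single thread could then read the counter from time to time and do the actual printing.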

and (2) why can ProgressMeter print from within the loop?

There is no particular reason you cannot print inside an @threads loop; the problem is rather that in 0.6 printing is not thread-safe, which is why you usually get the error messages.

I had a similar problem and solved it by disassembling the @threads macro into something like this:

function tforeach(f::F, src; blocksize=20) where {F<:Function}
    i = Threads.Atomic{Int}(1)
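    # Cap the chunk size so that every thread gets some work, but never let it drop below 1.
    blocksize = max(1, min(blocksize, div(length(src), Threads.nthreads())))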
    function threads_fun()
        tid = Threads.threadid()
        n = length(src)
        loop!(i, n, blocksize) do k
            f(tid, src[k])
        end
    end

    # Run threads_fun once on every thread (the internal runtime call that @threads uses on 0.6).
    ccall(:jl_threading_run, Ref{Void}, (Any,), threads_fun)

    nothing
end

function loop!(f::F, i, n, blocksize) where {F<:Function}
    while true
        k = Threads.atomic_add!(i, blocksize)
        if k > n
            break
        end
        # atomic_add! returns the old value, so this chunk covers k:(k + blocksize - 1).
        upper = min(k + blocksize - 1, n)
        while k ≤ upper
            f(k)
            k += 1
        end
    end
end
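To check the chunk arithmetic, here is a small serial sketch that only records which index range each call to atomic_add! hands out. It assumes each chunk covers blocksize consecutive indices, k through k + blocksize - 1; chunk_ranges is a made-up helper and not part of the code above.

```julia
# Hypothetical helper: lists the chunks a single worker would claim one after another.
function chunk_ranges(n, blocksize)
    i = Threads.Atomic{Int}(1)
    ranges = UnitRange{Int}[]
    while true
        k = Threads.atomic_add!(i, blocksize)   # returns the value before the add
        k > n && break
        push!(ranges, k:min(k + blocksize - 1, n))
    end
    return ranges
end

chunk_ranges(10, 4)   # the chunks 1:4, 5:8 and 9:10
```

Every index from 1 to n shows up in exactly one chunk, which is what you want when several threads pull chunks concurrently.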

Two things are different here from the @threads macro. First, I have a poor man's version of load balancing: the workload is not split up a priori (as in @threads) but is handed out during the run in chunks of a certain size. The other thing, which makes the progress meter easier to update, is that I pass the current thread id tid to the callback as well.

With this you could do

function mt_progress(N)
    A = zeros(N)
    P = Progress(N)
    tforeach(1:N) do tid, n
        A[n] = workerfun(n)
        if tid == 1 # only thread 1 (the main thread) touches the progress meter
            # n is the index just processed, so the displayed progress is approximate
            ProgressMeter.update!(P, n)
        end
    end
    # If another thread processed the last chunk, we still have to tell P that we are done.
    ProgressMeter.update!(P, N)
end
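The ccall above is tied to 0.6 internals. For anyone on a newer Julia, the same chunked scheme can be sketched with tasks. This assumes Julia 1.3 or later (for Threads.@spawn); tforeach_tasks is a made-up name, and the callback gets a worker index w rather than a thread id, since a task is not guaranteed to stay on one thread.

```julia
using Base.Threads

# Hypothetical task-based counterpart of tforeach (assumes Julia >= 1.3).
function tforeach_tasks(f, src; blocksize=20)
    n = length(src)
    next = Atomic{Int}(1)                  # start index of the next unclaimed chunk
    @sync for w in 1:nthreads()
        Threads.@spawn begin
            while true
                k = atomic_add!(next, blocksize)   # claim a chunk; returns the old value
                k > n && break
                for j in k:min(k + blocksize - 1, n)
                    f(w, src[j])           # w is a worker index, not a thread id
                end
            end
        end
    end
    return nothing
end

hits = zeros(Int, 100)
tforeach_tasks(1:100; blocksize=7) do w, x
    hits[x] += 1                           # each index is handled by exactly one worker
end
```

With this, the progress update could be restricted to w == 1 in the same way as tid == 1 above.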