Question on multithreading/vectorizing loops

Consider a classic loop parallelization scheme: the outer loop multithreads, and the inner loop vectorizes:

    tmp = [Vector{T}(undef, length(B)) for _ in 1:Threads.nthreads()]

    @inbounds Threads.@threads for ii in eachindex(A)
        @inbounds @simd for jj in eachindex(B)
            tmp[Threads.threadid()][jj] = fun(A[ii], B[jj])
        end
        C[ii] = sum(tmp[Threads.threadid()])
    end

I have the following questions:

  1. What is the scope of @inbounds and similar macros? I.e., is the 2nd @inbounds here redundant? Does it extend into the called functions fun and sum?
  2. Is there a way to conditionally use (or not use) multithreading at runtime (essentially the IF directive in OpenMP)? When A is small, the overhead may not be worth it.
  3. According to the manual, Threads.threadid() may change even within a single iteration, so the code above is actually not thread-safe. What is the proper way to create private scratch spaces like tmp?
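For context, what I have in mind for question 2 is the hand-written branch sketched below (threshold is a made-up tuning constant and fun a placeholder kernel); I am hoping there is something less repetitive than duplicating the loop body:

```julia
# Sketch of a runtime switch; `threshold` and `fun` are made up for illustration.
function compute!(C, A, B; threshold = 10_000)
    fun(x, y) = x + y  # placeholder for the real kernel
    if length(A) * length(B) > threshold
        # large problem: worth paying the threading overhead
        Threads.@threads for ii in eachindex(A)
            C[ii] = sum(fun(A[ii], B[jj]) for jj in eachindex(B))
        end
    else
        # small problem: plain serial loop
        for ii in eachindex(A)
            C[ii] = sum(fun(A[ii], B[jj]) for jj in eachindex(B))
        end
    end
    return C
end
```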

Thanks

One way to solve 2 and 3 is to split your work manually in chunks, and run with 1 chunk if the parallelization is not effective.

With this small package, GitHub - m3g/ChunkSplitters.jl (simple chunk splitters for parallel loop executions), doing that is as simple as rewriting the loop as:

    nchunks = Threads.nthreads() # or any other number

    tmp = [Vector{T}(undef, length(B)) for _ in 1:nchunks]

    Threads.@threads for (iirange, ichunk) in chunks(eachindex(A), nchunks)
        for ii in iirange
            @inbounds @simd for jj in eachindex(B)
                tmp[ichunk][jj] = fun(A[ii], B[jj])
            end
            C[ii] = sum(tmp[ichunk])
        end
    end

Note that tmp has length nchunks and that it is indexed by ichunk, which does not depend on threadid(). The indices ii to be run are automatically split into ranges of roughly equal size, contained in the iirange ranges.

Or you can go higher level and use FLoops.jl, for example.
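A hedged sketch of how that might look (assuming FLoops.jl's executor API; fun and threshold are made up for illustration). Picking the executor at runtime would also address question 2:

```julia
using FLoops  # assumes FLoops.jl is installed

function compute!(C, A, B; threshold = 10_000)
    fun(x, y) = x + y  # placeholder kernel
    # choose the executor at runtime: serial for small problems
    ex = length(A) * length(B) > threshold ? ThreadedEx() : SequentialEx()
    @floop ex for ii in eachindex(A)
        C[ii] = sum(fun(A[ii], B[jj]) for jj in eachindex(B))
    end
    return C
end
```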

Thanks!

I have tried doing something similar manually, but that resulted in many allocations and slow speeds.

It turns out that you cannot do for ii = ((chunkidx-1)*chunksz+firstindex(A)):min(chunkidx * chunksz + firstindex(A) - 1, lastindex(A)); the range has to be calculated beforehand.

I’m not sure exactly what you mean there. Can you show a more complete example? Are you nesting @threads loops?

Chunking with variable ranges should work, in principle:

julia> nchunks = 4;

julia> x = rand(10);

julia> k = 2
2

julia> Threads.@threads for (irange, ichunk) in chunks(firstindex(x)+k:lastindex(x)-1, nchunks)
           for i in irange
               @show ichunk, i
           end
       end
(ichunk, i) = (1, 1)
(ichunk, i) = (3, 5)
(ichunk, i) = (4, 7)
(ichunk, i) = (3, 6)
(ichunk, i) = (1, 2)
(ichunk, i) = (2, 3)
(ichunk, i) = (2, 4)

edit: I see what you probably mean. The issue is that when one uses a range, for example:

r = 2:2:5

the indices of the range are not its values:

julia> r = 2:2:5
2:2:4

julia> collect(eachindex(r))
2-element Vector{Int64}:
 1
 2

and that can definitely cause some confusion when trying to pass a slice of an original array to be chunked, because one could expect the indices of the original array to be retained. An alternative is to wrap the range in an OffsetArray, but that is arguably cumbersome as well:

julia> Threads.@threads for (irange, ichunk) in 
               chunks(OffsetArray(firstindex(x)+k:lastindex(x)-1,firstindex(x)+k-1), nchunks)
           for i in irange
               @show ichunk, i
           end
       end
(ichunk, i) = (2, 5)
(ichunk, i) = (3, 7)
(ichunk, i) = (1, 3)
(ichunk, i) = (4, 9)
(ichunk, i) = (2, 6)
(ichunk, i) = (3, 8)
(ichunk, i) = (1, 4)

and afaik it won’t work if the step length is not 1.
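One workaround sketch (plain Julia, not ChunkSplitters-specific) is to chunk a collected vector of the original indices, so the values you iterate over are the indices themselves, whatever the offset or step:

```julia
x = rand(10); k = 2
idxs = collect(firstindex(x)+k:lastindex(x)-1)  # the actual indices, 3:9
nchunks = 4
# boundaries of nchunks nearly equal parts of idxs
splits = [round(Int, i * length(idxs) / nchunks) for i in 0:nchunks]
parts = [idxs[splits[i]+1:splits[i+1]] for i in 1:nchunks]
# iterating each part yields original indices, e.g. parts[1] == [3, 4]
```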

An alternative to this is to use a Channel of temporary vectors, which ensures that there’s no race condition, as the threadid is not accessed at all.

julia> A = [1,2,3,4]; B = copy(A); C = copy(A);

julia> buf = [similar(A) for i in 1:Threads.nthreads()];

julia> ch = Channel{eltype(buf)}(length(buf));

julia> for x in buf
           put!(ch, x)
       end

julia> fun(x,y) = x + y;

julia> Threads.@threads for ii in eachindex(A)
           tmp = take!(ch)
           try
               for jj in eachindex(tmp, B)
                   tmp[jj] = fun(A[ii], B[jj])
               end
               C[ii] = sum(tmp)
           finally
               put!(ch, tmp)
           end
       end

julia> C
4-element Vector{Int64}:
 14
 18
 22
 26

Sorry I wasn’t being very clear. The first version I tried is:

n1 = length(A)
n2 = length(B)
nchunk = ifelse(n1 * n2 > multithread_threshold, Threads.nthreads(), 1)
chunksz = cld(n1, nchunk)

tmp = [Vector{T}(undef, n2) for _ in 1:nchunk]

Threads.@threads for chunkidx = 1:nchunk
    @inbounds for ii = ((chunkidx-1)*chunksz+firstindex(A)):min(chunkidx * chunksz + firstindex(A) - 1, lastindex(A))
        @inbounds @simd for jj in eachindex(B)
            tmp[chunkidx][jj] = fun(A[ii], B[jj])
        end
        C[ii] = sum(tmp[chunkidx])
    end
end

This has lots of allocations and is slow.

what I currently have is

n1 = length(A)
n2 = length(B)
nchunk = ifelse(n1 * n2 > multithread_threshold, Threads.nthreads(), 1)
chunksz = cld(n1, nchunk)
chunkstart = (0:(nchunk-1)) .* chunksz .+ firstindex(A)
chunkrange = map(x -> x[1]:(x[1]+chunksz-1), chunkstart)
chunkrange[end] = chunkrange[end][1]:lastindex(A)

tmp = [Vector{T}(undef, n2) for _ in 1:nchunk]

@inbounds Threads.@threads for current_chunkidx in eachindex(chunkrange)
    @inbounds for ii in chunkrange[current_chunkidx]
        @inbounds @simd for jj in eachindex(B)
            tmp[current_chunkidx][jj] = fun(A[ii], B[jj])
        end
        C[ii] = sum(tmp[current_chunkidx])
    end
end

Which is only a little slower than the thread-unsafe (OP) version.
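As a sanity check that the precomputed chunkrange in the second version partitions eachindex(A) (with a made-up n1 and nchunk, assuming A is 1-based):

```julia
n1 = 10; nchunk = 4
chunksz = cld(n1, nchunk)  # 3
chunkstart = (0:(nchunk-1)) .* chunksz .+ 1
chunkrange = map(x -> x:(x + chunksz - 1), chunkstart)
chunkrange[end] = chunkrange[end][1]:n1  # clip the last chunk
# the ranges are 1:3, 4:6, 7:9, 10:10 and together cover 1:10
```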

Apparently (for the first version) the compiler did not unroll the outermost loop and precompute the ii range for each task/chunk. Although how that produces millions of allocations (when n1 == n2 == 1000) is still a mystery to me.

Do you have an actual minimal running example to share? Particularly in the first case I don’t see what can allocate, so the issue is probably somewhere else.

(ps: all those @inbounds/@simd annotations are redundant and probably have minimal, if any, performance impact, besides making the code much uglier. I would remove them all, and only add them back as a very final stage of performance tuning, with clear evidence that they do any good. If anything, you only need the @inbounds flag on the innermost line, tmp[current_chunkidx][jj] = ..)

Never mind. I made a mistake referencing global variables (the A in the example). :sweat_smile:
I blame Unicode hosting look-alike characters for that.

Thanks for the suggestion. I read about channels, and it seems (correct me if I'm wrong) that they are designed to facilitate threads working cooperatively.
In my particular example there is no dependence between the ii iterations, so this feels like overkill, unless there is something I wasn't aware of.

Thanks @jishnub for the suggestion to use channels! An elegant solution.

Yet, in a tight loop I'm finding that channels are slower than the non-thread-safe code in the OP, which is what I've been using to date (I didn't realize it wasn't thread-safe until recently). In this case, one alternative is to use the :static scheduler for Threads.@threads, for which “the value of threadid() is guaranteed to be constant within one iteration”, thereby solving #3 in the OP.
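For reference, a sketch of that :static variant (same toy fun as earlier in the thread); with the static scheduler each iteration stays on one thread, so indexing the scratch buffers by threadid() is safe:

```julia
using Base.Threads

A = [1, 2, 3, 4]; B = copy(A); C = similar(A)
fun(x, y) = x + y
tmp = [similar(B) for _ in 1:nthreads()]

@threads :static for ii in eachindex(A)
    t = threadid()  # constant within an iteration under :static
    for jj in eachindex(B)
        tmp[t][jj] = fun(A[ii], B[jj])
    end
    C[ii] = sum(tmp[t])
end
# C == [14, 18, 22, 26]
```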