Parallelization of simple loop: reductions, thread-private variables?


#1

Hi,

I’m looking for a way to parallelize a simple loop I have. I don’t want to get into the complexity of distributed computing, but I’d like something similar to OpenMP. My code structure is basically similar to this:

arr = randn(n)
temp = pre_allocate_stuff()
s = 0.0
for i=1:n
    for j=1:m
        temp[j] = computation(arr[i],j)
    end
    s += reduction(temp)
end

which I would do in OpenMP using a reduction on s and declaring temp as firstprivate. Reading the docs it seems my best bet is Threads.@threads (although in beta, it’s been in the language for a while now), but it doesn’t seem to have support for fancy parallel constructions like reductions or thread private variables. Do I need to do these by hand? Is this something that is in the pipeline and I just need to wait for it?


#2

I don’t know much about parallel processing, but you could try the @parallel macro. It allows to specify a reduction.

arr = randn(n)
s = @parallel (reduction) for i=1:n
        @parallel (reduction) for j=1:m
            temp = computation(arr[i],j)
        end
    end

#3

I’m confused about that but @parallel uses processes, not threads, correct? I was under the impression that threads are better for the kind of fine-grained parallelism I want to use.


#4

Yes and yes. This probably gives all of what you’re looking for:

https://github.com/JuliaLang/julia/pull/22631

If not, I’m sure a separate issue for reductions in multithreading would be very appreciated. For now the easiest thing to do is to just not multithread the reduction part if it’s not too expensive. As for thread-private variables, I think that happens naturally in a Threads.@threads loop?


#5

For thread-private, the issue in the snippet above is that I want to pre-allocate temp and fill it inside the loop. I could do without or handle threads manually, the point was to see if there was a simple modification I could do to my code that enabled multithreading. I’ll just wait for the dust to settle, good to see that this is under active development.


#6

If you do want to do it by hand, here is an example:

function tsum(a)
    nt = nthreads()
    n = length(a)
    nd,nr = divrem(n,nt)
    psums = zeros(nt)
    @threads for i=1:nt
        id = threadid()
        i0=(i-1)*nd
        s = zero(eltype(a))
        @inbounds for ii=1:nd
            s += sin(a[i0+ii])
        end
        psums[id] += s
    end
    if nr > 0
        t = sum(sin.(view(a,nd*nt+1:n)))
    else
        t = zero(eltype(a))
    end
    sum(psums)+t
end

#7

has anything moved here? It seems a shame that something as basic as a sum cannot be automatically multi-threaded?

After looking at this tsum example I also noticed a limitation of generators. Suppose in the tsum above, a is not an array but a generator, then this doesn’t seem to work at all because one cannot index into it nor jump. On the other hand most generators that I write could actually be indexed into.

To give just a very naive example:

G = ( (x[n] - x[n-1]) * (y[n] + y[n-1]) for n = 2:length(x))
trapz = 0.5 * sum(G)

I can pass G to sum, but I cannot pass it to something like tsum.

This seems a bit artificial, but suppose the object constructed at each step of the generator is expensive to compute and to store.


#8

@parallel and tsum (above) statically partition the input iterator into “chunks”. Each process/thread works on a chunk, so ability to index into iterator is required.

But maybe use can use a mapreduce approach instead.

Here’s a very basic skeleton to demonstrate:

using Base.Threads

function threaded_mapreduce(f, op, v0, itr)
    results = fill(v0, nthreads())
    @threads for x in itr
        tid = threadid()
        @inbounds results[tid] = op(results[tid], f(x))
    end

    # reduce results of each thread
    result = v0
    for x in results
        result = op(result, x)
    end
    return result
end

Test it out.

println("Num threads = ", nthreads())

n = 10000
const x = rand(n);
const y = rand(n);

G = ((x[i] - x[i-1]) * (y[i] + y[i-1]) for i = 2:n)
f(i) = (x[i] - x[i-1]) * (y[i] + y[i-1])

sum(G)
mapreduce(f, +, 2:n)
threaded_mapreduce(f, +, 0.0, 2:n)

Note that threading has some overhead, so you might not see a speed-up unless function f is relatively expensive.


#9

@greg_plowman thank you for the suggestion. I guess the question here is what is the performance of threadid() compared to the cost of evaluating f(x). Makes sense. I’ll play with this, thank you.

CC @gideonsimpson


#10

Building on @greg_plowman’s response, the version below is closer to a dynamically scheduled threaded reduction operator:

function tmapreduce(f::Function, op, v0, itr)
    @assert length(itr) > 0
    mutex = Mutex()
    output = deepcopy(v0) # make a deepcopy of starting value
    poppable_itr = Vector(deepcopy(itr)) # convert to set to be able to pop
    # array of input arguments to f, to store input for each thread
    inputs = Vector{typeof(itr[1])}(nthreads()) # deprecated soon?
    @threads for i in eachindex(itr)
        lock(mutex)
        inputs[threadid()] = pop!(poppable_itr)
        unlock(mutex)
        loop_output = f(inputs[threadid()])
        lock(mutex)
        output = op(output, loop_output)
        unlock(mutex)
    end
    return output
end

It pops an input argument from the poppable iterator, and passes that to the function. A Mutex is used to maintain thread safety.

For 4 threads, and the following test functions:

function f(i::Int)
    i <= nthreads()^2 && Libc.systemsleep(1.0)
    return 1
end

function g(i::Int)
    i%nthreads() == 0 && Libc.systemsleep(1.0)
    return 1
end

just check my logic:

julia> sum([i<=nthreads()^2 for i in 1:nthreads()^3])
16

julia> sum([i%nthreads()==0 for i in 1:nthreads()^3])
16

which means that iterating over 1:nthreads()^3 should take 16 seconds for both f and g.

A cursory glance at the timings shows:

julia> @time tmapreduce(f, +, 0, 1:nthreads()^3)
  4.012927 seconds (86 allocations: 4.016 KiB)
64
julia> @time tmapreduce(g, +, 0, 1:nthreads()^3)
  5.023034 seconds (86 allocations: 4.016 KiB)
64
julia> @time threaded_mapreduce(f, +, 0, 1:nthreads()^3)
 16.037183 seconds (7 allocations: 336 bytes)
64
julia> @time threaded_mapreduce(g, +, 0, 1:nthreads()^3)
  4.007422 seconds (7 allocations: 336 bytes)
64

But sometimes tmapreduce doesn’t work as intended:

julia> @time tmapreduce(f, +, 0, 1:nthreads()^3)
  16.030326 seconds (86 allocations: 4.016 KiB)
64

tmapreduce isn’t perfect by any means, but sometimes avoids the worst case scenario.