Parallel reductions

My actual use case involves this type:

struct For{F,N,X} 
    f :: F  
    θ :: NTuple{N,Int}
end

This is a way to build general array-valued distributions; the type parameters are

  • X gives the sample space for each element of the array
  • N gives its rank
  • F is the type of the function mapping coordinates to a distribution

For example (no pun intended),

julia> d = For(10000000) do j
       Normal(1/j,1.0)
       end
For{var"#153#154",1,Float64}(var"#153#154"(), (10000000,))

The reduction comes in computing logpdf:

@inline function logpdf(d::For{F,N,X1},xs::AbstractArray{X2,N}) where {F,N, X1,  X2 <: X1}
    s = 0.0
    for θ in CartesianIndices(d.θ)
        @inbounds s += logpdf(d.f(Tuple(θ)...), xs[θ])
    end
    s
end

julia> x = rand(d);

julia> @btime logpdf($d,$x)
  65.662 ms (0 allocations: 0 bytes)
-1.4190706223203065e7

I don’t really understand when atomic operations make sense. They’re like very lightweight locks, right? I think we’re seeing memory contention in this case:

@inline function logpdf(d::For{F,N,X1},xs::AbstractArray{X2,N}) where {F,N, X1,  X2 <: X1}
    s = Threads.Atomic{Float64}(0.0)
    for θ in CartesianIndices(d.θ)
        @inbounds Threads.atomic_add!(s, logpdf(d.f(Tuple(θ)...), xs[θ]))
    end
    s.value
end

julia> @btime logpdf($d,$x)
  87.053 ms (1 allocation: 16 bytes)
-1.4190706223203065e7

Here’s my current best:

@inline function logpdf(d::For{F,N,X1},xs::AbstractArray{X2,N}) where {F,N, X1,  X2 <: X1}
    s = zeros(Float64, Threads.nthreads())
    @threads for θ in CartesianIndices(d.θ)
        tid = Threads.threadid()
        @inbounds s[tid] += logpdf(d.f(Tuple(θ)...), xs[θ])
    end
    sum(s)
end

julia> @btime logpdf($d,$x)
  27.363 ms (227 allocations: 24.14 KiB)
-1.4190706223201526e7

I had thought it might be better to allocate a constant at the top-level and re-use it for each reduction. I’m surprised it’s not:

const s =  zeros(Float64, Threads.nthreads())

@inline function logpdf(d::For{F,N,X1},xs::AbstractArray{X2,N}) where {F,N, X1,  X2 <: X1}
    s .= 0.0
    @threads for θ in CartesianIndices(d.θ)
        tid = Threads.threadid()
        @inbounds s[tid] += logpdf(d.f(Tuple(θ)...), xs[θ])
    end
    sum(s)
end

julia> @btime logpdf($d,$x)
  28.936 ms (228 allocations: 23.83 KiB)
-1.4188093456982236e7

How would I do this with SIMDPirates et al?

1 Like