Threaded map and reduce problem

I have a bottleneck matrix-vector product and reduction that I’m trying to speed up. Any multithreading gurus that could point me to a suitable parallel mapreduce implementation? The current code is

function nonthreaded_mvs(A, vv, K)
    mapreduce(k->@view(A[:,:,k])*vv[k], +, 1:K)
end

Typical size for A might be N,M,K = 16,10,100, in which case each element of vv is length M and the result is a length N vector.
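For reference, here is an equivalent explicit-loop version (a sketch of my own; `loop_mvs` is a name I made up) that makes the computation clear: the result is the sum over k of `A[:,:,k] * vv[k]`.

```julia
# Equivalent explicit-loop version of nonthreaded_mvs (sketch for clarity):
# accumulates sum_k A[:,:,k] * vv[k] into a preallocated output vector.
function loop_mvs(A, vv, K)
    out = zeros(eltype(A), size(A, 1))
    for k in 1:K
        out .+= @view(A[:, :, k]) * vv[k]
    end
    return out
end
```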

I found some related threads but they were looking at splitting up one large matrix-vector product into multiple threads.

I’ve tried variations of the following,

N, M, K = 16, 10, 100
A = rand(N, M, K)
vv = [rand(M) for k in 1:K]

function threaded_mvs(A, vv, K)
    N = size(A, 1)
    tmp = Matrix{eltype(A)}(undef, N, K)
    Threads.@threads for k in 1:K
        tmp[:, k] = @view(A[:,:,k]) * vv[k]
    end
    vec(sum(tmp, dims=2))
end

which works but makes things slower. Any ideas?

You can try using Folds.jl. It should just work.

Mmmm, timing gets much worse using

Folds.mapreduce(k->@view(A[:,:,k])*vv[k], +, 1:K)

I’ll read up on how to use Folds.jl more effectively.

Unless the array-of-arrays input (vv) is a hard requirement, why not write it with higher-level indexing notation, e.g. using Tullio.jl (GitHub - mcabbott/Tullio.jl)?


Having said that, you can of course use Folds.jl etc. from JuliaFolds for this type of calculation.

On my machine, K = 100 takes about 14 μs which is very close to task spawn+wait overhead (a few μs). So, I’ll use K = 1000 for clearer demonstration. Here’s the single-thread run-time for me:

julia> N,M,K = 16,10,1000;

julia> A = rand(N,M,K);

julia> vv = [rand(M) for k in 1:K];

julia> @btime let A = A, vv = vv, K = K
           mapreduce(k->@view(A[:,:,k])*vv[k], +, 1:K)
       end;
  139.949 μs (2002 allocations: 406.14 KiB)

With Folds, I get:

julia> Threads.nthreads()
4

julia> @btime let A = A, vv = vv, K = K
           Folds.mapreduce(k-> @view(A[:,:,k]) * vv[k], +, 1:K)
       end;
  46.030 μs (2022 allocations: 408.50 KiB)

We get a 3.1x speedup, which is not perfect for 4 threads but not bad, since the per-call time is pretty close to the task spawn/wait overhead.


But we can do better. Actually, the first thing to do before going multi-threaded is to make the single-thread code allocate less (and run faster).

For example, + on vectors always allocates a new one. We can avoid this by doing a .+= b instead:

julia> add!(a, b) = a .+= b
add! (generic function with 1 method)

julia> @btime let A = A, vv = vv, K = K, N = N
           mapfoldl(k-> @view(A[:,:,k]) * vv[k], add!, 1:K; init = zeros(N))
       end;
  117.569 μs (1007 allocations: 203.47 KiB)

It’s slightly faster. Note that I switched to mapfoldl, since Base's mapreduce does not define behavior for an op that mutates its arguments in place.

However, there are still allocations due to *. We can avoid this by lazily evaluating the second argument:

julia> using LinearAlgebra

julia> add2!(a, b::AbstractArray) = a .+= b
add2! (generic function with 1 method)

julia> add2!(a, (M, b)) = mul!(a, M, b, 1.0, 1.0)
add2! (generic function with 2 methods)

julia> @btime let A = A, vv = vv, K = K, N = N
           mapfoldl(k-> (@view(A[:,:,k]), vv[k]), add2!, 1:K; init = zeros(N))
       end;
  68.940 μs (7 allocations: 352 bytes)

Here, we define add2!, which is almost like add! except that add2!(a, (M, b)) is equivalent to a .+= M * b. This fuses the addition and the matrix multiplication so that there are no more big allocations.

Now, finally, let’s try using Folds.mapreduce:

julia> using Folds, Transducers  # Transducers for OnInit

julia> @btime let A = A, vv = vv, K = K, N = N
           Folds.mapreduce(add2!, 1:K; init = OnInit(() -> zeros(N))) do k
               (@view(A[:, :, k]), vv[k])
           end
       end;
  26.130 μs (32 allocations: 3.41 KiB)

We get a 2.5x speedup compared to the equivalent single-thread code. If I bump up K once more:

julia> N,M,K = 16,10,10000;

julia> A = rand(N,M,K);

julia> vv = [rand(M) for k in 1:K];

julia> @btime let A = A, vv = vv, K = K, N = N
           mapfoldl(k-> (@view(A[:,:,k]), vv[k]), add2!, 1:K; init = zeros(N))
       end;
  815.991 μs (7 allocations: 352 bytes)

julia> @btime let A = A, vv = vv, K = K, N = N
           Folds.mapreduce(add2!, 1:K; init = OnInit(() -> zeros(N))) do k
               (@view(A[:, :, k]), vv[k])
           end
       end;
  187.358 μs (31 allocations: 3.38 KiB)

We get a 4.3x speedup! (Which is actually very strange; I'm not sure why it's larger than Threads.nthreads().)

Note that I can use add2! (or add!) with Folds.mapreduce since mapreduce/reduce in JuliaFolds have a clear ownership convention for op: it is allowed to mutate its first argument. See the Folds.jl documentation.

Note also that OnInit is essential when using in-place mutation in a multi-threaded reduction; otherwise, multiple tasks would be mutating the same array concurrently.
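To illustrate the per-task-state idea (this is my own sketch, not Folds internals): a hand-rolled version gives each thread its own accumulator and merges them at the end. It relies on the classic threadid pattern, which assumes tasks do not migrate between threads; OnInit achieves the same effect for Folds.mapreduce without that caveat.

```julia
using LinearAlgebra  # for 5-argument mul!

# Sketch of the per-task-accumulator pattern that OnInit provides for Folds:
# each thread accumulates into its own buffer, then the buffers are merged.
# (Uses the threadid pattern; assumes tasks stay on their starting thread.)
function per_task_mvs(A, vv, K)
    N = size(A, 1)
    accs = [zeros(N) for _ in 1:Threads.nthreads()]  # one accumulator per thread
    Threads.@threads for k in 1:K
        acc = accs[Threads.threadid()]
        mul!(acc, @view(A[:, :, k]), vv[k], 1.0, 1.0)  # acc .+= A[:,:,k] * vv[k]
    end
    return reduce((a, b) -> a .+= b, accs)  # merge the per-thread buffers
end
```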


This is fantastic, thank you. It turns out single threaded seems to outperform everything else for my relatively small calculations, especially using the mapfoldl approach. I learned a lot from studying your post.

It’s a shame there’s no way to speed this calculation up by parallelizing, as it runs frequently.


One thing to keep in mind: in general, the best place to put your threading is at the highest level possible. That way, the threading overhead is amortized over the most work.
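For example (a hypothetical sketch; batch_mvs and the problem-tuple layout are my invention), if this product is computed for many independent (A, vv) pairs, it is usually better to thread over the pairs and keep each inner reduction serial:

```julia
# Sketch: thread at the outer, coarse-grained level and keep the inner
# matrix-vector reduction single-threaded.
function batch_mvs(problems::Vector)
    results = Vector{Vector{Float64}}(undef, length(problems))
    Threads.@threads for i in eachindex(problems)
        A, vv, K = problems[i]   # each problem is an (A, vv, K) tuple
        results[i] = mapreduce(k -> @view(A[:, :, k]) * vv[k], +, 1:K)
    end
    return results
end
```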