Multithreaded broadcast?

So, with the new improvements to threading, I’m wondering how that can be exposed easily to users. One of the most common parallelizable pattern is broadcasting (discussed in https://github.com/JuliaLang/julia/issues/19777). The conservative approach is to make @threads work for these, so we can do @threads a.= b.+ c. This is fine, but adding annotations gets annoying really fast (“so, what combination of @threads @simd @inbounds @. do I need this time?”), especially in matlab/numpy vector code with a lot of such operations. The other non-breaking approach is to have a MultithreadedArray type that implements a custom broadcast; this is also annoying. The disrupting approach is to do that automatically, which would really be an awesome feature. As far as I know matlab also does this (for a subset of builtin functions).

There are two aspects to whether that is doable and desirable: correctness and performance.

Correctness is an issue in general, because there is no way to know whether the function calls will be thread safe. However, it doesn’t seem that bad in practice. I did a for i in $(find . -name *.jl) ; do grep "\.(" $i ; done in my .julia/packages, and most of the dot calls are of simple functions: exp/log/sin/cos/abs/min, type conversion and so on. There are a couple of more complex functions, but all I’ve checked look OK. Of course some existing code will likely break, but that feels like the kind of major change for which it’s worth it to break stuff.

Regarding performance, I seem to be getting a constant overhead of about 5 microseconds. For a simple x .= x .+ y, the crossover point is about N=50_000 with two threads on my laptop. For a more complex x .= abs.(cos.(sqrt.(x))) it was about N=3_000. That’s tricky, because broadcast is commonly used both for small and large arrays, so some kind of thresholding would have to be implemented for generic code to make use of multithreading.

4 Likes

I’m not really in favor of automatic multithreading if it is not 100% certain that it will make things faster. Having it opt-in would allow me to choose for myself. How about

@threads begin
    a .= b .+c
    ...
    more code ...
end

?

7 Likes

There are really two questions here : should threading for broadcast be automatic or should it be opt-in, and should it threshold. Even if threading is opt-in, there still is a dilemma for package writers (eg diff eq, optim, etc) : without some kind of thresholding, adding threads speeds up on large arrays, but slows down on small ones.

I’m hoping it’ll be eventually both automatic and opt-in. We have quite a bit of information about the expression that we can use to determine whether it’s safe and/or big enough. But that will necessarily be over-conservative.

On the safety front, it’s much worse than function calls that immediately appear within a broadcasted expression. There’s the BitArray problem — arrays can expose multiple “independent” elements that are actually backed by the same byte.

9 Likes

FWIW, I just tested the overhead of C/OpenMP threading; for a simple sum the crossover point appears to be at N=3_000, which would seems to suggest that the overhead in julia is ~20x larger than in OpenMP.

3 Likes

I’m guessing that it’d be a challenging project as you’d need to know if the function is pure-ish (I mean not @pure-pure). For example, multi-threading xs .+ rand.(Ref(rng)) is a bad idea. Maybe the only way to automate “thread safety” detection would be to implement a bunch of function traits like ispure(::Type{Tuple{typeof(+),Float64,Float64}}) = true?

By the way, speaking of RNG, I suppose a proper way to use it with threaded broadcasting is to define thread-able RNG where you create different states for each thread using the jump function. I think the threaded copyto! needs to be aware of this thread-able RNG so that appropriate state can be used for each thread.

Given the generality of broadcasting and heuristics needed to solve the problem, I think it makes sense for “thread-ability hinting” mechanism to evolve outside the Base. Thanks to the very flexible broadcasting mechanism, it’s pretty simple to implement a basic version based on BroadcastStyle:

module ThreadHints

export threaded

using Base.Broadcast: Broadcasted, BroadcastStyle, throwdm, preprocess

function threaded end

struct ThreadedStyle <: BroadcastStyle end

Broadcast.broadcasted(f::typeof(threaded), x) =
    Broadcasted{ThreadedStyle}(identity, (x,))

@inline function Base.copyto!(dest::AbstractArray, bc::Broadcasted{ThreadedStyle})
    axes(dest) == axes(bc) || throwdm(axes(dest), axes(bc))
    @assert bc.f === identity
    @assert bc.args isa Tuple{Any}
    bc′ = preprocess(dest, bc.args[1])
    Threads.@threads for I in eachindex(bc′)
        @inbounds dest[I] = bc′[I]
    end
    return dest
end

end # module
threaded = ThreadHints.threaded


using BenchmarkTools
N = 50_000
xs = randn(N)
ys = randn(N)
zs = randn(N)
@btime $xs .= $threaded.($ys .+ $zs)
#  26.005 μs (13 allocations: 928 bytes)  # 1 thread
#  15.832 μs (34 allocations: 3.13 KiB)   # 4 threads
3 Likes

For regular arrays, or, more generally, arrays with a strided memory layout, you can use the @strided macro from the Strided.jl package.

It will make broadcasting multithreaded, and do a few other things, like automatic views (if they are still strided, that is, if they are indexed by ranges, not by arbitrary vectors) and a view-behaviour for permutedims and certain reshape operations.

8 Likes

Awesome! Your examples are for quite large arrays, do you have plans to deal with small arrays by disabling multithreading?

There is a minimal block size which is just handled by a single thread, so it won’t use multiple threads for small arrays. But I am sure all the infrastructure to analyse the operations involved introduce some overhead which can dominate the actual computation for small arrays.

It will be interesting to see if using the new @spawn in Julia 1.3 allows to simplify/improve the implementation.

2 Likes

Is there any news on this?

2 Likes

Nope

4 Likes

I think @chriselrod is planning on adding multithreaded broadcasting to LoopVectorization.jl.

1 Like

Now in LoopVectorization v0.12.5:

julia> x = rand(3000); y = similar(x);

julia> using LoopVectorization

julia> @benchmark @avxt $y .= abs.(cos.(sqrt.($x))) # multithreaded
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     988.300 ns (0.00% GC)
  median time:      1.016 μs (0.00% GC)
  mean time:        1.021 μs (0.00% GC)
  maximum time:     13.133 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     10

julia> y ≈ abs.(cos.(sqrt.(x)))
true

julia> @benchmark @avx $y .= abs.(cos.(sqrt.($x))) # single-threaded
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     2.870 μs (0.00% GC)
  median time:      3.088 μs (0.00% GC)
  mean time:        3.108 μs (0.00% GC)
  maximum time:     7.177 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     9

julia> @benchmark $y .= abs.(cos.(sqrt.($x))) # base
BenchmarkTools.Trial:
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     14.779 μs (0.00% GC)
  median time:      15.066 μs (0.00% GC)
  mean time:        15.167 μs (0.00% GC)
  maximum time:     50.096 μs (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> versioninfo()
Julia Version 1.7.0-DEV.713
Commit ae26fc6d5f* (2021-03-16 06:50 UTC)
Platform Info:
  OS: Linux (x86_64-linux-gnu)
  CPU: 11th Gen Intel(R) Core(TM) i7-1165G7 @ 2.80GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-11.0.1 (ORCJIT, tigerlake)
Environment:
  JULIA_NUM_THREADS = 8

At the cost of composability, overhead is substantially smaller than 5 microseconds, which is why on a 4-core CPU it can speed up the abs.(cos.(sqrt.(x))) example to 1 microsecond.
15x faster than Base broadcasting.

25 Likes

I would like you to know that you personally have made it so I have to spend as much time precompiling 1.6 as I did 1.5 since the 12.x release of LoopVectorization has been getting incredible releases multiple times a week.

1 Like

Awesome! I’m marking this as a solution until the day we have fast composable automatic threading of broadcasts :slight_smile:

6 Likes

Is noncomposability a temporary issue or a necessary consequence of LoopVectorization’s design?

1 Like

as far as i understand, LoopVectorization only works with cpu numbers (floats, ints). base broadcast can receive anything as input

1 Like

I believe “composability” in @Elrod’s post refers to composable multithreaded parallelism.

@avxt already composes with CheapThreads.batch.

Should be temporary. tkf made a good suggestion on how to fix it, so I’ll probably do that fairly soon.

The fix would be to make the wait function it uses for synchronization check if tasks were started yet. If they weren’t, it’ll take the tasks back.
The tasks get sent to specific threads, so if that thread is running an @spawned task, it’d have to yield before LV’s thread can actually begin. If it never does, the task will never start, and the thread running the @avxt code would wait forever.
So the solution is check if it’s been started. If not, remove the task and have this main thread begin. That’d mean it runs serially, but that’d be a lot better than deadlocking or waiting who-knows-how-long before the task can start.

3 Likes

Is there a tracking issue for this somewhere?