Multithreaded broadcast?

So, with the new improvements to threading, I’m wondering how that can be exposed easily to users. One of the most common parallelizable pattern is broadcasting (discussed in https://github.com/JuliaLang/julia/issues/19777). The conservative approach is to make @threads work for these, so we can do @threads a.= b.+ c. This is fine, but adding annotations gets annoying really fast (“so, what combination of @threads @simd @inbounds @. do I need this time?”), especially in matlab/numpy vector code with a lot of such operations. The other non-breaking approach is to have a MultithreadedArray type that implements a custom broadcast; this is also annoying. The disrupting approach is to do that automatically, which would really be an awesome feature. As far as I know matlab also does this (for a subset of builtin functions).

There are two aspects to whether that is doable and desirable: correctness and performance.

Correctness is an issue in general, because there is no way to know whether the function calls will be thread safe. However, it doesn’t seem that bad in practice. I did a for i in $(find . -name *.jl) ; do grep "\.(" $i ; done in my .julia/packages, and most of the dot calls are of simple functions: exp/log/sin/cos/abs/min, type conversion and so on. There are a couple of more complex functions, but all I’ve checked look OK. Of course some existing code will likely break, but that feels like the kind of major change for which it’s worth it to break stuff.

Regarding performance, I seem to be getting a constant overhead of about 5 microseconds. For a simple x .= x .+ y, the crossover point is about N=50_000 with two threads on my laptop. For a more complex x .= abs.(cos.(sqrt.(x))) it was about N=3_000. That’s tricky, because broadcast is commonly used both for small and large arrays, so some kind of thresholding would have to be implemented for generic code to make use of multithreading.

2 Likes

I’m not really in favor of automatic multithreading if it is not 100% certain that it will make things faster. Having it opt-in would allow me to choose for myself. How about

@threads begin
    a .= b .+c
    ...
    more code ...
end

?

3 Likes

There are really two questions here : should threading for broadcast be automatic or should it be opt-in, and should it threshold. Even if threading is opt-in, there still is a dilemma for package writers (eg diff eq, optim, etc) : without some kind of thresholding, adding threads speeds up on large arrays, but slows down on small ones.

I’m hoping it’ll be eventually both automatic and opt-in. We have quite a bit of information about the expression that we can use to determine whether it’s safe and/or big enough. But that will necessarily be over-conservative.

On the safety front, it’s much worse than function calls that immediately appear within a broadcasted expression. There’s the BitArray problem — arrays can expose multiple “independent” elements that are actually backed by the same byte.

9 Likes

FWIW, I just tested the overhead of C/OpenMP threading; for a simple sum the crossover point appears to be at N=3_000, which would seems to suggest that the overhead in julia is ~20x larger than in OpenMP.

1 Like

I’m guessing that it’d be a challenging project as you’d need to know if the function is pure-ish (I mean not @pure-pure). For example, multi-threading xs .+ rand.(Ref(rng)) is a bad idea. Maybe the only way to automate “thread safety” detection would be to implement a bunch of function traits like ispure(::Type{Tuple{typeof(+),Float64,Float64}}) = true?

By the way, speaking of RNG, I suppose a proper way to use it with threaded broadcasting is to define thread-able RNG where you create different states for each thread using the jump function. I think the threaded copyto! needs to be aware of this thread-able RNG so that appropriate state can be used for each thread.

Given the generality of broadcasting and heuristics needed to solve the problem, I think it makes sense for “thread-ability hinting” mechanism to evolve outside the Base. Thanks to the very flexible broadcasting mechanism, it’s pretty simple to implement a basic version based on BroadcastStyle:

module ThreadHints

export threaded

using Base.Broadcast: Broadcasted, BroadcastStyle, throwdm, preprocess

function threaded end

struct ThreadedStyle <: BroadcastStyle end

Broadcast.broadcasted(f::typeof(threaded), x) =
    Broadcasted{ThreadedStyle}(identity, (x,))

@inline function Base.copyto!(dest::AbstractArray, bc::Broadcasted{ThreadedStyle})
    axes(dest) == axes(bc) || throwdm(axes(dest), axes(bc))
    @assert bc.f === identity
    @assert bc.args isa Tuple{Any}
    bc′ = preprocess(dest, bc.args[1])
    Threads.@threads for I in eachindex(bc′)
        @inbounds dest[I] = bc′[I]
    end
    return dest
end

end # module
threaded = ThreadHints.threaded


using BenchmarkTools
N = 50_000
xs = randn(N)
ys = randn(N)
zs = randn(N)
@btime $xs .= $threaded.($ys .+ $zs)
#  26.005 μs (13 allocations: 928 bytes)  # 1 thread
#  15.832 μs (34 allocations: 3.13 KiB)   # 4 threads
2 Likes

For regular arrays, or, more generally, arrays with a strided memory layout, you can use the @strided macro from the Strided.jl package.

It will make broadcasting multithreaded, and do a few other things, like automatic views (if they are still strided, that is, if they are indexed by ranges, not by arbitrary vectors) and a view-behaviour for permutedims and certain reshape operations.

6 Likes

Awesome! Your examples are for quite large arrays, do you have plans to deal with small arrays by disabling multithreading?

There is a minimal block size which is just handled by a single thread, so it won’t use multiple threads for small arrays. But I am sure all the infrastructure to analyse the operations involved introduce some overhead which can dominate the actual computation for small arrays.

It will be interesting to see if using the new @spawn in Julia 1.3 allows to simplify/improve the implementation.

2 Likes