Type-stable threaded map?

Can anyone think of a way to write a threaded map function that is type stable? The only way I have come up with so far uses return_type, which I understand is not a good idea.

function tmap(f, args...)
    cargs = collect(zip(args...))
    n = length(cargs)
    # ask inference for the return type of f on the types of the first element
    T = Core.Compiler.return_type(f, Tuple{map(typeof, first(cargs))...})
    ans = Vector{T}(undef, n)
    Base.Threads.@threads for i = 1:n
        ans[i] = f(cargs[i]...)
    end
    return ans
end

Two options for ideas could be https://github.com/madsjulia/RobustPmap.jl and https://github.com/mohamed82008/KissThreading.jl


KissThreading's tmapreduce appears to be type-stable only because it forces you to give the init argument, from which it takes the type.

julia> @code_warntype KissThreading.tmapreduce(x->x^2, vcat, 1:10, init=Int[])
Body::Array{Int64,1}

julia> @code_warntype KissThreading.tmapreduce(x->x^2, vcat, 1:10, init=Any[])
Body::Array{Any,1}

Similarly, RobustPmap looks like it just lets you specify the return type by hand if you want.

Here, I’m looking for something that is automatically inferred like how Base.map works.

I think the only way this is really feasible without having the user specify the type is to use return_type (either explicitly or implicitly), because fundamentally you want to create a container whose element type must be determined by inferring the return type of f on the elements of your container.

Also, my understanding is that it's a bad idea to write code that will break if an update makes inference more or less powerful. But it seems that your code will not break if inference becomes more or less precise; it would just mean that the type of vector you allocate could be more or less efficient for your purposes. You're not relying on the compiler always giving a concrete type; even Vector{Any} would be fine.
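To illustrate that point, here is a small REPL sketch of what return_type gives back (the exact results can vary across Julia versions, which is precisely why code should only use them as an optimization hint; g is a made-up example function):

```julia
julia> Core.Compiler.return_type(sin, Tuple{Float64})
Float64

julia> g(x) = rand(Bool) ? 1.0 : 1;

julia> Core.Compiler.return_type(g, Tuple{Int})
Union{Float64, Int64}
```

A Vector{Union{Float64, Int64}} (or even Vector{Any}, if inference gave up entirely) can still hold every result, so the loop itself keeps working either way.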


Edit:
However, there is still a problem with this:

 T = Core.Compiler.return_type(f, Tuple{map(typeof,first(cargs))...})

Consider the following contrived example:

function tmap(f, args...)
    cargs = collect(zip(args...))
    n = length(cargs)
    T = Core.Compiler.return_type(f, Tuple{map(typeof,first(cargs))...})
    ans = Vector{T}(undef,n)
    Base.Threads.@threads for i=1:n
        ans[i] = f(cargs[i]...)
    end
    ans
end

f(x) = rand(Bool) ? 1.0 : 1
f(s::String) = "boo!"

julia> tmap(f, [1, 2, "hi"])
Error thrown in threaded loop on thread 2: MethodError(f=typeof(Base.convert)(), args=(Union{Float64, Int64}, "boo!"), world=0x0000000000006408)
 3-element Array{Union{Float64, Int64},1}:
  1.0
  1.0
  0.0

I don’t think it’s quite the same. Vector{SomeConcreteType} vs. Vector{Any} will dispatch differently when fed into various other functions, so here the behavior of my subsequent code would really change depending on whether inference worked fully. As I understand it, Base.map starts with a vector whose element type comes from the first result, then widens the type as necessary for subsequent elements, without relying on inference to determine the eltype (I admit I’ve not read the code, though; I guess that’s really what my tmap would have to emulate somehow…)
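A hand-written sketch of that start-narrow-then-widen strategy might look like the following (this is a hypothetical illustration, not Base's actual implementation; widening_map and its structure are made up, and it assumes a nonempty, 1-based indexable collection):

```julia
function widening_map(f, xs)
    isempty(xs) && return Union{}[]
    y1 = f(xs[1])
    out = Vector{typeof(y1)}(undef, length(xs))   # start with the narrowest type
    out[1] = y1
    for i in 2:length(xs)
        y = f(xs[i])
        if y isa eltype(out)
            out[i] = y
        else
            # widen: copy what we have into a fresh vector with the joined type
            T = Base.promote_typejoin(eltype(out), typeof(y))
            wider = Vector{T}(undef, length(xs))
            copyto!(wider, 1, out, 1, i - 1)
            wider[i] = y
            out = wider
        end
    end
    return out
end
```

Note that Base keeps this fast by restarting in a fresh function call (a function barrier) whenever it widens; the single-function version above is type-unstable after the first widening, but it shows the idea.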

Yes, good catch; I should have mentioned my function was only meant to work in the case of a homogeneously typed array.

This is a very good question. If you look at the way Base.map works, it actually does a reduction over the types of the values, so a parallel version of the algorithm would need a parallel reduction. A key missing piece for that is the ability to infer the result of a Task. We haven’t really needed that before since Tasks did not tend to be used for performance-critical code, but in the multithreaded future they will be. So we’ll need to work on that. Then you could use a tree reduction approach.
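A hypothetical sketch of that tree-reduction shape, using tasks (this is not an inference-friendly implementation, since fetch on a Task is exactly the non-inferable piece mentioned above; it assumes Julia >= 1.3 for @spawn and a nonempty array, and the names and cutoff are made up):

```julia
using Base.Threads: @spawn

function tree_mapreduce(f, op, xs, lo=1, hi=length(xs))
    if hi - lo < 1024
        # small chunk: reduce serially
        return mapreduce(f, op, view(xs, lo:hi))
    end
    mid = (lo + hi) >> 1
    # recurse on the right half in a spawned task, left half on this task
    right = @spawn tree_mapreduce(f, op, xs, mid + 1, hi)
    left = tree_mapreduce(f, op, xs, lo, mid)
    return op(left, fetch(right))   # fetch(::Task) currently infers as Any
end
```

A map could then be built on top of this by making op a type-widening concatenation, which is where the missing Task inference really bites.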


OK, in practice the cases you want to be really fast (such that threads are the limiting factor) will probably be homogeneous arrays, so you could also use the type of the first result as the element type and just assume they are all the same.
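That shortcut might be sketched like this (a hypothetical helper, assuming a nonempty array; if f is not actually homogeneous, the setindex! will throw a convert error, as in the contrived example earlier in the thread):

```julia
using Base.Threads: @threads

function tmap_homogeneous(f, xs)
    y1 = f(first(xs))                           # first call done serially
    out = Vector{typeof(y1)}(undef, length(xs)) # assume all results share this type
    out[1] = y1
    @threads for i in 2:length(xs)
        out[i] = f(xs[i])
    end
    return out
end
```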


Hello, is this related (map is type-stable but pmap is not)?

julia> using BenchmarkTools
julia> using Distributed
julia> a = rand(1:35,100)
julia> addprocs(3)
julia> @everywhere function fib(n)
         if n == 0 return 0 end
         if n == 1 return 1 end
         return fib(n-1) + fib(n-2)
       end
julia> @code_warntype fib(35) # Body::Int64
julia> @code_warntype map(fib,a)
Body::Array{Int64,1}
1 ─ %1 = %new(Base.Generator{Array{Int64,1},typeof(fib)}, fib, A)::Base.Generator{Array{Int64,1},typeof(fib)}
│   %2 = invoke Base._collect(_3::Array{Int64,1}, %1::Base.Generator{Array{Int64,1},typeof(fib)}, $(QuoteNode(Base.EltypeUnknown()))::Base.EltypeUnknown, $(QuoteNode(Base.HasShape{1}()))::Base.HasShape{1})::Array{Int64,1}
└──      return %2
julia> @code_warntype pmap(fib,a)
Body::Any
1 ─ %1  = invoke Distributed.default_worker_pool()::Union{Nothing, WorkerPool}
│   %2  = Distributed.pmap::typeof(pmap)
│   %3  = (isa)(%1, WorkerPool)::Bool
└──       goto #3 if not %3
2 ─ %5  = π (%1, WorkerPool)
│   %6  = invoke %2(_2::Function, %5::WorkerPool, _3::Array{Int64,1})::Any
└──       goto #6
3 ─ %8  = (isa)(%1, Nothing)::Bool
└──       goto #5 if not %8
4 ─ %10 = invoke Distributed.:(#pmap#226)($(QuoteNode(Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}()))::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, Distributed.pmap::Function, fib::Function, nothing::Nothing, _3::Array{Int64,1})::Any
└──       goto #6
5 ─       (Core.throw)(ErrorException("fatal error in type inference (type bound)"))
└──       $(Expr(:unreachable))
6 ┄ %14 = φ (#2 => %6, #4 => %10)::Any
└──       goto #7
7 ─       return %14

From this S.O. question.

With Julia 1.3, I’ve been using this:

using Base.Threads: @spawn   # @spawn requires Julia >= 1.3

function tmap(op, lst)
    refs = [@spawn(op(itm)) for itm in lst]   # one task per element
    return fetch.(refs)                       # collect results in order
end

function tmapreduce(op, red, lst)
    refs = [@spawn(op(itm)) for itm in lst]
    result = fetch(refs[1])
    for i in 2:length(lst)
        result = red(result, fetch(refs[i]))
    end
    return result
end

It’s definitely not ideal, but it works very well, and I’m getting a ~3-4x speedup over map() and mapreduce().

Maybe someone has an idea on how to improve it easily? I tried dividing the work into batches and @spawn-ing mapreduce() on each, but it was actually slower for my simple case.

In Transducers.jl master you can do

using Transducers
using BangBang
using StaticArrays

tmap(f, xs) = reduce(append!!, Map(x -> SVector(f(x))), xs; init=Union{}[])

to implement a threaded map using the mutate-or-widen strategy.
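As a rough illustration of what mutate-or-widen means here (a REPL-style sketch assuming BangBang's append!! semantics; the exact return types shown in comments are what I would expect, not verified output):

```julia
using BangBang: append!!

# append!! should mutate its first argument when the element type already fits,
# and otherwise return a new, wider array; starting from Union{}[] lets the
# first append pick the narrowest element type that works.
ys = append!!(Union{}[], [1.0])   # expect a fresh Vector{Float64}
ys = append!!(ys, [2.0])          # fits: should mutate in place
ys = append!!(ys, ["three"])      # expect widening to a Union element type
```

Wrapping each result in an SVector, as in the tmap definition above, turns the map into a concatenation so that this widening reduction can produce the output vector.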
