Custom stateful transducer performance

Hi, I am trying to implement a custom stateful transducer, but I am not happy with the performance I get.

using Transducers
using Transducers: Transducer, R_, next, inner, xform
using Transducers: start, complete, wrap, unwrap, wrapping
using DataStructures

struct MovingMaximum{T<:Real} <: Transducer
    windowlength::Int64
end

IndexValuePair{T} = NamedTuple{(:index, :value),Tuple{Int64,T}}

function Transducers.start(rf::R_{MovingMaximum{T}}, result) where T<:Real
    buffer = CircularBuffer{IndexValuePair{T}}(xform(rf).windowlength)
    private_state = (buffer, 1)
    return wrap(rf, private_state, start(inner(rf), result))
end

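function Transducers.next(rf::R_{MovingMaximum{T}}, result, input) where T<:Real
    wrapping(rf, result) do (buffer, k), iresult
        # Drop entries from the back that are smaller than the new input.
        while !isempty(buffer) && last(buffer).value < input
            pop!(buffer)
        end
        # Drop the front entry once it has fallen out of the window.
        if !isempty(buffer) && first(buffer).index == k
            popfirst!(buffer)
        end
        push!(buffer, (index=k, value=input))
        # The front of the buffer holds the maximum of the current window.
        iresult = next(inner(rf), iresult, first(buffer).value)
        # k cycles through 1:windowlength.
        return (buffer, ifelse(k == xform(rf).windowlength, 1, k+1)), iresult
    end
end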

function Transducers.complete(rf::R_{MovingMaximum{T}}, result) where T<:Real
    _private_state, inner_result = unwrap(rf, result)
    return complete(inner(rf), inner_result)
end

For reference, this simpler approach, with the state kept in a closure, is faster:

using Transducers
using DataStructures
IndexValuePair{T} = NamedTuple{(:index, :value),Tuple{Int64,T}}
function movingmaximum(::Type{T}, window::Int) where {T<:Real}
    buffer = CircularBuffer{IndexValuePair{T}}(window)
    k::Int64 = 1
    return ScanEmit(nothing) do _, x
        while !isempty(buffer) && last(buffer).value < x
            pop!(buffer)
        end
        
        if !isempty(buffer) && first(buffer).index == k
            popfirst!(buffer)
        end
        push!(buffer, (index=k, value=x))

        value = first(buffer).value
        k = ifelse(k == window, 1, k+1)
        return value, nothing
    end
end
x = rand(10000000);
K = 100;

using BenchmarkTools
@btime scan_emit = collect(movingmaximum(Float64, $K), $x);
@btime custom_transducer = collect(MovingMaximum{Float64}($K), $x);

julia> @btime scan_emit = collect(movingmaximum(Float64, $K), $x);
  372.746 ms (34 allocations: 129.00 MiB)

julia> @btime custom_transducer = collect(MovingMaximum{Float64}($K), $x);
  582.371 ms (20000035 allocations: 675.35 MiB)

I followed this tutorial. Any idea whether I did something wrong, or is this by design? I ran it on Julia 1.5.
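Before comparing timings, it can help to confirm that both versions compute the same thing. The following is a minimal sketch of a brute-force reference, assuming the intended output at position i is the maximum over the last `windowlength` inputs up to and including i. The name `naive_moving_maximum` is hypothetical and is not part of Transducers.jl or DataStructures.jl.

```julia
# Hypothetical helper (not from any package): an O(n * window) reference
# implementation to check the transducer output against.
function naive_moving_maximum(xs::AbstractVector{T}, window::Int) where {T<:Real}
    out = similar(xs)
    for i in eachindex(xs)
        # The window covers at most `window` elements ending at index i.
        lo = max(firstindex(xs), i - window + 1)
        out[i] = maximum(view(xs, lo:i))
    end
    return out
end

small = [1.0, 3.0, 2.0, 1.0, 0.5]
reference = naive_moving_maximum(small, 3)
```

On a small input, `collect(MovingMaximum{Float64}(3), small)` and `collect(movingmaximum(Float64, 3), small)` should both equal `reference` if the two implementations above are correct.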

You’d need @inline on Transducers.next(rf::R_{MovingMaximum{T}}, result, input). With that, it’s a bit faster than the other version on my laptop. Stateful transducers typically need the compiler to work at its full capability (and sometimes the compiler just gives up).

julia> @btime collect(MovingMaximum{Float64}($K), $x);
  661.515 ms (20000035 allocations: 675.35 MiB)

julia> # put @inline

julia> @btime collect(MovingMaximum{Float64}($K), $x);
  301.025 ms (31 allocations: 129.00 MiB)

julia> @btime collect(movingmaximum(Float64, $K), $x);
  378.188 ms (34 allocations: 129.00 MiB)
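The `# put @inline` step in the session above can be sketched as follows. This reuses the definitions from the original post unchanged, with `@inline` added in front of the `next` method as the only difference. It assumes the Transducers.jl and DataStructures.jl versions used in this thread; the internal names (`R_`, `wrap`, `wrapping`, `xform`) are taken from the post and may differ in other releases.

```julia
using Transducers
using Transducers: Transducer, R_, next, inner, xform
using Transducers: start, complete, wrap, unwrap, wrapping
using DataStructures: CircularBuffer

struct MovingMaximum{T<:Real} <: Transducer
    windowlength::Int64
end

IndexValuePair{T} = NamedTuple{(:index, :value),Tuple{Int64,T}}

function Transducers.start(rf::R_{MovingMaximum{T}}, result) where {T<:Real}
    buffer = CircularBuffer{IndexValuePair{T}}(xform(rf).windowlength)
    return wrap(rf, (buffer, 1), start(inner(rf), result))
end

# The only change from the original post: `@inline` on this method.
@inline function Transducers.next(rf::R_{MovingMaximum{T}}, result, input) where {T<:Real}
    wrapping(rf, result) do (buffer, k), iresult
        while !isempty(buffer) && last(buffer).value < input
            pop!(buffer)
        end
        if !isempty(buffer) && first(buffer).index == k
            popfirst!(buffer)
        end
        push!(buffer, (index=k, value=input))
        iresult = next(inner(rf), iresult, first(buffer).value)
        return (buffer, ifelse(k == xform(rf).windowlength, 1, k + 1)), iresult
    end
end

function Transducers.complete(rf::R_{MovingMaximum{T}}, result) where {T<:Real}
    _private_state, inner_result = unwrap(rf, result)
    return complete(inner(rf), inner_result)
end

maxima = collect(MovingMaximum{Float64}(3), [1.0, 3.0, 2.0, 1.0, 0.5])
```

The allocation counts reported above (20000035 without `@inline`, 31 with it) suggest that the annotation lets the compiler avoid allocating on every element, though the exact effect may depend on the Julia version.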

btw do you mean v1.0.5?

Thanks! It sped up from 582 ms to 276 ms. I think it is really fast now; these are the timings I expected when writing this transducer.

Nope, I mean 1.5, the nightly build.


The latest development branch doesn’t always mean the best performance.

Sure. What I meant is that I checked it on 1.3, 1.4, and 1.5; sorry for not being precise. Adding @inline solves my problem on all of them.
