Hi, I am trying to implement a custom stateful transducer, but I am not happy with the performance I am getting.
using Transducers
using Transducers: Transducer, R_, next, inner, xform
using Transducers: start, complete, wrap, unwrap, wrapping
using DataStructures
"""
    MovingMaximum{T<:Real}(windowlength)

Transducer that emits, for every input, the maximum over the most recent
`windowlength` inputs (over fewer inputs while the window is still filling).

`T` is the value type stored in the internal buffer. `windowlength` is stored
as an `Int64` and must be positive.

# Throws
- `ArgumentError` if `windowlength` is not positive.
"""
struct MovingMaximum{T<:Real} <: Transducer
    windowlength::Int64

    function MovingMaximum{T}(windowlength) where {T<:Real}
        # A window shorter than one item cannot even contain the current input,
        # so reject it here instead of failing later inside the buffer.
        windowlength > 0 ||
            throw(ArgumentError("windowlength must be positive, got $windowlength"))
        return new{T}(windowlength)
    end
end
# Buffer entry: the cyclic slot number an input was stored under, and the input itself.
const IndexValuePair{T} = NamedTuple{(:index, :value), Tuple{Int64, T}}
# Set up the private state for a fresh reduction: an empty buffer of
# (index, value) pairs sized to the window, and the cyclic slot counter at 1.
function Transducers.start(rf::R_{MovingMaximum{T}}, result) where {T<:Real}
    window = xform(rf).windowlength
    candidates = CircularBuffer{IndexValuePair{T}}(window)
    inner_state = start(inner(rf), result)
    return wrap(rf, (candidates, 1), inner_state)
end
# Process one input: update the buffer of maximum candidates, then forward the
# current window maximum (the front of the buffer) to the inner reducing function.
#
# `@inline` matters because this is the per-item hot path: every call builds a
# new private-state tuple and wrapper around the result.
# NOTE(review): inlining is expected to let the compiler elide those per-item
# allocations; confirm with `@btime` on the Julia version in use.
@inline function Transducers.next(rf::R_{MovingMaximum{T}}, result, input) where {T<:Real}
    wrapping(rf, result) do (buffer, k), iresult
        # Candidates smaller than the new input can never be the maximum again.
        while !isempty(buffer) && last(buffer).value < input
            pop!(buffer)
        end
        # `k` cycles through 1:windowlength, so a front entry stored under the
        # current `k` was pushed exactly `windowlength` inputs ago: it has expired.
        if !isempty(buffer) && first(buffer).index == k
            popfirst!(buffer)
        end
        push!(buffer, (index=k, value=input))
        inner_result = next(inner(rf), iresult, first(buffer).value)
        next_k = ifelse(k == xform(rf).windowlength, 1, k + 1)
        return (buffer, next_k), inner_result
    end
end
# Finish the reduction: drop the private (buffer, counter) state and let the
# inner reducing function complete on its own result.
function Transducers.complete(rf::R_{MovingMaximum{T}}, result) where {T<:Real}
    downstream = unwrap(rf, result)[2]
    return complete(inner(rf), downstream)
end
For reference, this simpler approach, with the state kept in closures, is more performant:
using Transducers
using DataStructures
# Entry of the candidate buffer: cyclic slot number plus the stored input value.
const IndexValuePair{T} = NamedTuple{(:index, :value), Tuple{Int64, T}}
"""
    movingmaximum(::Type{T}, window::Int) where {T<:Real}

Build a `ScanEmit` transducer that emits, for every input, the maximum over the
most recent `window` inputs (over fewer inputs while the window is still filling).

The candidate buffer and the cyclic slot counter live in the returned closure,
not in the `ScanEmit` state, so they persist across uses of the same transducer
object; create a fresh transducer for every independent reduction.

# Throws
- `ArgumentError` if `window` is not positive.
"""
function movingmaximum(::Type{T}, window::Int) where {T<:Real}
    window > 0 || throw(ArgumentError("window must be positive, got $window"))
    buffer = CircularBuffer{IndexValuePair{T}}(window)
    # A `Ref` gives the closure a concretely typed mutable cell; rebinding a
    # captured local instead would force the compiler to wrap it in a `Core.Box`.
    k = Ref{Int64}(1)
    return ScanEmit(nothing) do _, x
        # Candidates smaller than the new input can never be the maximum again.
        while !isempty(buffer) && last(buffer).value < x
            pop!(buffer)
        end
        # The counter cycles through 1:window, so a front entry stored under the
        # current slot was pushed exactly `window` inputs ago: it has expired.
        if !isempty(buffer) && first(buffer).index == k[]
            popfirst!(buffer)
        end
        push!(buffer, (index=k[], value=x))
        value = first(buffer).value
        k[] = ifelse(k[] == window, 1, k[] + 1)
        return value, nothing
    end
end
using BenchmarkTools

# Benchmark both implementations on the same input: ten million random
# Float64 values and a window of 100 items.
K = 100;
x = rand(10_000_000);

# `$` interpolation keeps global-variable access out of the measurement.
@btime scan_emit = collect(movingmaximum(Float64, $K), $x);
@btime custom_transducer = collect(MovingMaximum{Float64}($K), $x);
julia> @btime scan_emit = collect(movingmaximum(Float64, $K), $x);
372.746 ms (34 allocations: 129.00 MiB)
julia> @btime custom_transducer = collect(MovingMaximum{Float64}($K), $x);
582.371 ms (20000035 allocations: 675.35 MiB)
I followed this tutorial. Any ideas whether I did something wrong, or is this by design? I ran it on Julia 1.5.