Using Transducers.jl instead of stateful functions for stream processing

Looks like a nice solution for processing buffered streams of signals in chunks and pass its results to next processing steps in a sequense or tree.

But can you please explain, how should stateful functions code change in order to use it in Transducers? Or just a MWE of simple moving difference stateful function rewritten with transducers?

Moving difference example
"""
Moving difference stateful function
    `y[i] = x[i] - x[i-delta]`

Inner state contains `buf` with previous delta points: `x[i-delta]:x[i-1]`
and buffer index `k` pointing to the oldest `i-delta` element.

So when passing x[i], it calculates y[i] = x[i] - buf[k],
saves new element to state `buf[k] = x[i]` and increases k in circular fashion.
"""
mutable struct MovDifFilter{T}
    need_restart::Bool
    delta::Int
    k::Int # buffer index
    buf::Vector{T}
    MovDifFilter{T}(delta::Int) where {T} =
        new{T}(true, delta, 1, Vector{T}(undef, delta))
end

# resets state to restart processing at next run
function reset!(state::MovDifFilter)
    state.need_restart = true
    state
end

# fills buffer with initial value at first run
function _restart!(state::MovDifFilter{T}, x0::T) where T
    fill!(state.buf, x0)
    state.k = 1
    state.need_restart = false
    state
end

# process one single point
function run(state::MovDifFilter{T}, x::T)::T where T
    if state.need_restart
        _restart!(state, x)
    end
    delta = state.delta
    buf = state.buf
    k = state.k

    y = x - buf[k]
    buf[k] = x
    k = ifelse(k == delta, 1, k + 1)

    state.k = k
    y
end

# process a vector x and allocate output vector y
function run(state::MovDifFilter{T}, x::AbstractVector{T}) where T
    y = similar(x)
    run!(y, state, x)
    y
end

# process a vector x and write to preallocated output vector y
function run!(y::AbstractVector{T},
              state::MovDifFilter{T}, x::AbstractVector{T}) where T
    length(x) == 0 && return y
    if state.need_restart
        _restart!(state, x[1])
    end
    delta = state.delta
    buf = state.buf
    k = state.k

    @inbounds for i in eachindex(x)
        y[i] = x[i] - buf[k]
        buf[k] = x[i]
        k = ifelse(k == delta, 1, k + 1)
    end

    state.k = k
    y
end

# stateful function notations
(state::MovDifFilter)(y, x) = run!(y, state, x)
(state::MovDifFilter)(x) = run(state, x)

"""
Usage example:
"""

N = 10000

delta = 5
chunk = 50

x = sin.(2*pi*(1:N)./20)
y = similar(x)

filt = MovDifFilter{Float64}(delta)

for i = chunk : chunk : length(x)
    yview = view(y, i-chunk+1:i)
    xview = view(x, i-chunk+1:i)
    filt(yview, xview) # or: run!(yview, filter, xview)
end

using Plots
plot(x[1:100])
plot!(y[1:100])
1 Like

I think many filters can be written in terms of what I call ScanEmit. For example:

movingdifference(delta::Int) =
    ScanEmit((1, nothing)) do (k, buf), x
        if buf === nothing
            buf = fill(x, delta)
        end
        y = @inbounds x - buf[k]
        @inbounds buf[k] = x
        k += 1
        return y, (ifelse(k == delta + 1, 1, k), buf)
    end

map!(movingdifference(delta), y, x)  # need the latest release 0.4.3
collect(movingdifference(delta), x)

Since transducers explicitly carry the state ((k, buf) in the example above), you can quite easily make it compatible with StaticAarrays.jl. It may be useful for small buffer.

Is it possible with transducers to create a similar function which is efficient for both cases: streaming (i.e. it keeps the buffer of delta values, as in your example) and regular in-memory arrays (i.e. it just uses the values from original array without holding a copy of a part of it)?

Nice! And how would that change if I need a processing chain of two consequtive movingdifference filters with different deltas?

If you want to use the same input x buffer in differrent functions, I think you can pass the same circular buffer and check inside function that if buffer size is smaller than function needs, then increase it, so you don’t have to remember buffer size outside the function, and differnt functions use the same buffer. But you would need that only for branching streams.

For chunk-by-chunk processing, you always need some external buffered x of previous delta elements. The only oiptimization here is for long chunks - buffer should be saved only on edges between long chunks:

    if len <= 2 * delta || x === y # general case for short arrays or inplace processing
        @inbounds for i in eachindex(x)
            y[i] = x[i] - buf[k]
            buf[k] = x[i]
            k = ifelse(k == delta, 1, k + 1)
        end
    else # optimized for long arrays
        i = 1
        @inbounds while i <= delta
            y[i] = x[i] - buf[k]
            buf[k] = x[i]
            k = ifelse(k == delta, 1, k + 1)
            i += 1
        end
        @inbounds while i <= len
            y[i] = x[i] - x[i - delta]
            i += 1
        end
        @inbounds for z = 1:delta
            buf[z] = x[i - delta + z]
        end
        k = 1
    end
1 Like

(I think this conversation about Transducers can be split into another thread…)

1 Like

You can process the output of one transducer by another transducer by

xf1 = movingdifference(5) |> movingdifference(10)

and you can process the same input stream with multiple transducers using Zip (“fan-out”)

xf2 = Zip(movingdifference(5), movingdifference(10))

(I think what you meant was xf1 but mentioning xf2 just in case.)

But, unfortunately, combining stateful transducers like these have performance issue due to (I think) inference limitation of the julia compiler https://github.com/tkf/Transducers.jl/issues/14

1 Like

When I have long pauses between next point / chunk arrives, how can I store transducer state, and then load it seamlessly?

Example (run it after the original moving difference example in 1st post):

using Transducers

movingdifference(delta::Int) =
    ScanEmit((1, nothing)) do (k, buf), x
        if buf === nothing
            buf = fill(x, delta)
        end
        y = @inbounds x - buf[k]
        @inbounds buf[k] = x
        k += 1
        return y, (ifelse(k == delta + 1, 1, k), buf)
    end

"""
Usage example:
"""

using BenchmarkTools

N = 100

delta = 5
chunk = 50

x = sin.(2*pi*(1:N)./20)
y1 = similar(x)
y2 = similar(x)

F = movingdifference(delta) # transducer
filt = MovDifFilter{Float64}(delta) # original form

for i = chunk : chunk : length(x)
    y1view = view(y1, i-chunk+1:i)
    y2view = view(y2, i-chunk+1:i)
    xview = view(x, i-chunk+1:i)
    filt(y1view, xview) # or: run!(yview, filter, xview)
    map!(F, y2view, xview)
end

using Plots
plot(x)
plot!(y1)
plot!(y2)

# png("tmp.png")

tmp

ATM, the official API is to define Transducers.__foldl__ of your “data source”: https://tkf.github.io/Transducers.jl/dev/examples/reducibles/

There is Transducers.AdHocFoldable to make this easier. See circularwindows example in the docstring.