Looks like a nice solution for processing buffered streams of signals in chunks and pass its results to next processing steps in a sequense or tree.
But can you please explain, how should stateful functions code change in order to use it in Transducers? Or just a MWE of simple moving difference stateful function rewritten with transducers?
Moving difference example
"""
Moving difference stateful function
`y[i] = x[i] - x[i-delta]`
Inner state contains `buf` with previous delta points: `x[i-delta]:x[i-1]`
and buffer index `k` pointing to the oldest `i-delta` element.
So when passing x[i], it calculates y[i] = x[i] - buf[k],
saves new element to state `buf[k] = x[i]` and increases k in circular fashion.
"""
mutable struct MovDifFilter{T}
need_restart::Bool
delta::Int
k::Int # buffer index
buf::Vector{T}
MovDifFilter{T}(delta::Int) where {T} =
new{T}(true, delta, 1, Vector{T}(undef, delta))
end
# resets state to restart processing at next run
function reset!(state::MovDifFilter)
state.need_restart = true
state
end
# fills buffer with initial value at first run
function _restart!(state::MovDifFilter{T}, x0::T) where T
fill!(state.buf, x0)
state.k = 1
state.need_restart = false
state
end
# process one single point
function run(state::MovDifFilter{T}, x::T)::T where T
if state.need_restart
_restart!(state, x)
end
delta = state.delta
buf = state.buf
k = state.k
y = x - buf[k]
buf[k] = x
k = ifelse(k == delta, 1, k + 1)
state.k = k
y
end
# process a vector x and allocate output vector y
function run(state::MovDifFilter{T}, x::AbstractVector{T}) where T
y = similar(x)
run!(y, state, x)
y
end
# process a vector x and write to preallocated output vector y
function run!(y::AbstractVector{T},
state::MovDifFilter{T}, x::AbstractVector{T}) where T
length(x) == 0 && return y
if state.need_restart
_restart!(state, x[1])
end
delta = state.delta
buf = state.buf
k = state.k
@inbounds for i in eachindex(x)
y[i] = x[i] - buf[k]
buf[k] = x[i]
k = ifelse(k == delta, 1, k + 1)
end
state.k = k
y
end
# stateful function notations
(state::MovDifFilter)(y, x) = run!(y, state, x)
(state::MovDifFilter)(x) = run(state, x)
"""
Usage example:
"""
N = 10000
delta = 5
chunk = 50
x = sin.(2*pi*(1:N)./20)
y = similar(x)
filt = MovDifFilter{Float64}(delta)
for i = chunk : chunk : length(x)
yview = view(y, i-chunk+1:i)
xview = view(x, i-chunk+1:i)
filt(yview, xview) # or: run!(yview, filter, xview)
end
using Plots
plot(x[1:100])
plot!(y[1:100])
I think many filters can be written in terms of what I call ScanEmit. For example:
movingdifference(delta::Int) =
ScanEmit((1, nothing)) do (k, buf), x
if buf === nothing
buf = fill(x, delta)
end
y = @inbounds x - buf[k]
@inbounds buf[k] = x
k += 1
return y, (ifelse(k == delta + 1, 1, k), buf)
end
map!(movingdifference(delta), y, x) # need the latest release 0.4.3
collect(movingdifference(delta), x)
Since transducers explicitly carry the state ((k, buf) in the example above), you can quite easily make it compatible with StaticAarrays.jl. It may be useful for small buffer.
Is it possible with transducers to create a similar function which is efficient for both cases: streaming (i.e. it keeps the buffer of delta values, as in your example) and regular in-memory arrays (i.e. it just uses the values from original array without holding a copy of a part of it)?
If you want to use the same input x buffer in differrent functions, I think you can pass the same circular buffer and check inside function that if buffer size is smaller than function needs, then increase it, so you don’t have to remember buffer size outside the function, and differnt functions use the same buffer. But you would need that only for branching streams.
For chunk-by-chunk processing, you always need some external buffered x of previous delta elements. The only oiptimization here is for long chunks - buffer should be saved only on edges between long chunks:
if len <= 2 * delta || x === y # general case for short arrays or inplace processing
@inbounds for i in eachindex(x)
y[i] = x[i] - buf[k]
buf[k] = x[i]
k = ifelse(k == delta, 1, k + 1)
end
else # optimized for long arrays
i = 1
@inbounds while i <= delta
y[i] = x[i] - buf[k]
buf[k] = x[i]
k = ifelse(k == delta, 1, k + 1)
i += 1
end
@inbounds while i <= len
y[i] = x[i] - x[i - delta]
i += 1
end
@inbounds for z = 1:delta
buf[z] = x[i - delta + z]
end
k = 1
end
(I think what you meant was xf1 but mentioning xf2 just in case.)
But, unfortunately, combining stateful transducers like these have performance issue due to (I think) inference limitation of the julia compiler https://github.com/tkf/Transducers.jl/issues/14