Forcing data points containing one sinusoidal signal to be symmetric around zero, and multithreading

The starting point is real-world measurements containing one unique sinusoidal signal.
For signal processing purposes it might be beneficial to reduce the offset around zero before evaluating the signal either with fft() or with a parameter fit.
This was the reason to write a function that eliminates/reduces the offset point by point, by means of a moving window.
The function works fine, but the time needed for processing is quite high, if the sampling rate goes up.
Now I wonder, if a similar function already exists that does the job in a more efficient manner, e.g. by spreading tasks on several cores.

In case others are looking for something similar or have suggestions, I put my example code below:

using Base.Threads: @threads
using Debugger
using PlotlyJS, Printf
using RobustModels
using Statistics: mean

# --- Script parameters (globals used by the rest of the script) --------------
b_plt           = false   # enable the PlotlyJS plot section at the bottom
b_trunc_signal  = false   # passed to _MyLibCutOffset: truncate output by one window?
num_periods     = 5       # number of full periods in the synthetic test signal
sampling_rate   = 100000  # samples per second
frequ1          = .1      # signal frequency in Hz
ampl1           = 2.1     # signal amplitude
phase1          = 10.0 # in degree
offset_sp       = 0.1     # constant offset ("set point") added on top of the cosine
grad_offset     = 0.0001 # gradient of the offset
# ---
# Derived sizes: samples per period of frequ1, and total sample count.
ppp_frequ1    = trunc(Int, sampling_rate / frequ1)
window_length = trunc(Int, num_periods * ppp_frequ1) 

# --- time and data vector:
# Synthetic test data: constant offset + linear drift superimposed on a
# single cosine of amplitude ampl1 and phase phase1 (degrees).
vec_t    = collect(range(0, step = 1.0 / sampling_rate, length = window_length)) 
td_data_ = offset_sp .+ (grad_offset * ampl1) .* vec_t .+ ( ampl1 .* cos.( (2 * pi * frequ1 .* vec_t) .+ deg2rad(phase1)) )


# --- Functions ---------------------------------------------------------------------------------------------
"""
    _MyLibCutOffset(_signal, _sizeMV, _b_truncSignal=true, _b_verbose=true)

Reduce the (possibly drifting) offset of `_signal` point by point by
subtracting the mean of a moving window of `_sizeMV` samples centred on each
point. If `_b_truncSignal` is true the result is shortened by one window
length; otherwise the leading/trailing half-windows are corrected with the
first/last available window mean. Returns the corrected `Vector{Float64}`.

NOTE(review): with `b_test_without_mean_command == true` (the current
setting) the window mean is forced to 0.0 and the function effectively
returns the (possibly truncated) input unchanged — see the override at the
bottom of the function.
"""
function _MyLibCutOffset(_signal::Vector{<:Number}, _sizeMV::Int, _b_truncSignal::Bool=true, _b_verbose::Bool=true)
    # Debug switches: skip the (expensive) mean computation / print internals.
    b_test_without_mean_command = true
    b_DBG = true
    # Work on a Float64 copy so the input vector is not mutated.
    _signal = Float64.(_signal) 
    _n_pts_signal = length(_signal)
    if _n_pts_signal <= _sizeMV
        error("Signal too short!")
    end
    # max_periods = trunc(Int, _n_pts_signal / _sizeMV) 
    # Number of points for which a complete moving window fits in the signal.
    pts_MV_correted = _n_pts_signal - _sizeMV
    if _b_truncSignal
        signal_symetric = Vector{Float64}(undef, pts_MV_correted)
        idx_shift = 0
    else
        # Full-length output: correct the leading half-window with the mean
        # of the first full window only.
        signal_symetric = Vector{Float64}(undef, _n_pts_signal)
        if b_test_without_mean_command
            i_mean = 0.0
        else
            i_mean = RobustModels.mean(_signal[1:_sizeMV])
        end
        signal_symetric[1:floor(Int,_sizeMV/2)] = _signal[1:floor(Int,_sizeMV/2)] .- i_mean
        idx_shift = floor(Int,_sizeMV/2)
    end
    # ---
    if _b_verbose
        @info(string("MyLibCutOffset || _b_truncSignal: ", _b_truncSignal, ",\t_n_pts_signal: ", _n_pts_signal, ",\t_sizeMV: ", _sizeMV, ",\tpts_MV_correted: ", pts_MV_correted, ",\tidx_shift: ", idx_shift))
    end
    # Main moving-window loop: subtract the window mean from the sample at
    # the window centre.
    # NOTE(review): the catch only prints diagnostics; if the mean call ever
    # throws on the first iteration with _b_truncSignal == true, `i_mean` is
    # still undefined at the assignment below — confirm this is intended.
    for i_pts = 1:pts_MV_correted
        _range = range(i_pts, length = _sizeMV)
        try
            if b_test_without_mean_command
                i_mean = 0.0
            else
                i_mean = RobustModels.mean(_signal[_range])
            end
        catch e
            println(string("i_pts: ", i_pts,", \ti_pts + idx_shift: ", i_pts + idx_shift, ", _range[end]: ", _range[end], ", length(_range): ", length(_range)))
        end
        signal_symetric[i_pts + idx_shift] = _signal[i_pts + floor(Int,_sizeMV/2)] - i_mean
    end
    # --- DBG ---
    if b_DBG
        println(string("pts_MV_correted - ceil(Int,_sizeMV/2): ", pts_MV_correted - ceil(Int,_sizeMV/2)))
    end
    if ~_b_truncSignal
        # Trailing half-window: reuse the last window mean (`i_mean` leaks
        # out of the loop above) and shift so the signal stays continuous at
        # the seam.
        diff_last_corr = signal_symetric[pts_MV_correted + idx_shift] - _signal[pts_MV_correted + floor(Int,_sizeMV/2)]
        signal_symetric[pts_MV_correted+idx_shift+1:end] = _signal[pts_MV_correted+idx_shift+1:end] .- (i_mean + diff_last_corr)
        if b_DBG
            println(string("pts_MV_correted - ceil(Int,_sizeMV/2)+1: ", pts_MV_correted - ceil(Int,_sizeMV/2)+1,
            ", idx_shift+floor(Int,_sizeMV/2): ", idx_shift+floor(Int,_sizeMV/2)))

            println(string("diff_last_corr: ", diff_last_corr))
        end
    end
    # Debug override: with the mean disabled, discard the computed result and
    # return the (centred, possibly truncated) raw signal instead.
    if b_test_without_mean_command
        if _b_truncSignal
            idx_start = ceil(Int, pts_MV_correted/2)
            idx_end = _n_pts_signal - floor(Int, pts_MV_correted/2)
            signal_symetric = _signal[idx_start : idx_end]
        else
            signal_symetric = _signal
        end
    end
    # ---
    return signal_symetric
end

# --- Main --------------------------------------------------------------------------------------------------
# --- Main --------------------------------------------------------------------------------------------------
# Apply the offset correction with one signal period (ppp_frequ1) as the
# moving-window length.
signal_symetric_ = _MyLibCutOffset(td_data_, ppp_frequ1, b_trunc_signal)
n_pts_signal = length(signal_symetric_)
# Centre the (possibly shortened) corrected signal on the original time axis
# by trimming half the length difference from each end of vec_t.
vec_t_symetric = vec_t[floor(Int, (window_length - n_pts_signal)/2) + 1 : window_length-ceil(Int, (window_length - n_pts_signal)/2)]

# --- info --------------------------------------------------------------------------------------------------
@info(string("window_length: ", window_length, ", n_pts_signal: ", n_pts_signal, ", vec_t_symetric: ", length(vec_t_symetric)))
@info(string("Mean input: ", RobustModels.mean(td_data_), ", Mean output: ", RobustModels.mean(signal_symetric_)))
# --- Plot --------------------------------------------------------------------------------------------------
if b_plt
    # Guard against plotting an unreasonably large number of points.
    if window_length > 1e6 
        error("Too many points!")
    end
    PlotlyJS.PlotlyJSDisplay()
    hdl1 = PlotlyJS.scatter(; x = vec_t,            y = td_data_,           name = "raw")
    hdl2 = PlotlyJS.scatter(; x = vec_t_symetric,   y = signal_symetric_,   name = "corr")
    hdl_plt = PlotlyJS.Plot([hdl1, hdl2])
end

First thing to check is if you are allocating a lot of temporary arrays. Does @time or @btime or @benchmark report a lot of allocations? (I would suggest the latter two as they avoid common pitfalls like reporting compilation time.) This would be the first thing to optimize, as it is a good way to detect silly small temporary objects that significantly slow you down (e.g. it might be OK to have a small number of big initial allocations, but if the number of allocations scales with the size of your problem, removing them would probably drastically speed up your code).

Any reason you do a moving window instead of one single run of: input_array .-= mean(input_array)?

You have a lot of debugging branches in your code. Removing them from your MVE would make it easier for us to discuss the specifics of your code (at least for me that is the case, as it is easier to say what is important and what is inconsequential in your code).

Hi,
thanks for your comment! In the real world it could be that the offset is not constant over time. This is especially an issue if you would like to use parameter-fit techniques. One example is investigations of batteries.
I do not have much experience with benchmarking and performance tuning; what I have already checked is whether the two variants of mean — RobustModels.mean() and Statistics.mean() — make a difference.

The try-catch-end structure is for sure a time-critical element; I am still in the debugging phase.
Before spending more effort on performance aspects I wonder if something similar might already exist;
it should, e.g., be possible to run more than one for-loop in parallel.

It seems you just need some high-pass filter. There are probably packages that do this better than what I am writing below, but here are two simple options:

An exponential filter would be good enough:

"""
    my_exp_high_pass_filter!(signal, decay)

In-place first-order (exponential) high-pass filter: maintains a running
low-pass estimate `low_pass = decay*low_pass + (1-decay)*signal[i]` and
subtracts it from each sample. `decay` close to 1 removes only slow drift;
`decay == 0` subtracts each sample from itself (all zeros). Returns `signal`.
"""
function my_exp_high_pass_filter!(signal, decay)
    # zero(eltype(signal)) keeps `low_pass` type-stable; the original `0`
    # started as an Int and changed type on the first iteration for float
    # signals, defeating loop specialization.
    low_pass = zero(eltype(signal))
    @inbounds for i in eachindex(signal)
        low_pass = decay * low_pass + (1 - decay) * signal[i]
        signal[i] -= low_pass
    end
    return signal
end

Or if you want to do mean by segments (and this one can be multithreaded easily). Note that this is not really a moving window solution, as there is no overlap between windows.

"""
    mean_by_segment(signal, segment_length)

Subtract, in place, the mean of each consecutive non-overlapping segment of
`segment_length` samples from `signal`; the last segment may be shorter.
Segments are processed in parallel with `@threads` (safe because the fixed
segments are disjoint). Returns `signal`.
"""
function mean_by_segment(signal, segment_length)
    total_length = length(signal)
    @threads for i in 1:segment_length:total_length
        # BUGFIX: the segment must end at i + segment_length - 1. The
        # original `i + segment_length` made neighbouring segments share one
        # element, which both double-corrects that sample and is a data race
        # under @threads.
        segment = @view signal[i:min(i + segment_length - 1, total_length)]
        segment .-= mean(segment)
    end
    return signal
end

Benchmarks

On four threads.

julia> r = rand(1_000_000);

julia> @benchmark my_exp_high_pass_filter!($r, 0.99)
BenchmarkTools.Trial: 1654 samples with 1 evaluation.
 Range (min … max):  2.767 ms …   4.495 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     2.956 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   3.018 ms ± 227.938 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

      ███▆▆▆▆▇▅▄▄▄▄▃▃▂ ▁▁                                     ▁
  █▄▇███████████████████████▇█▇▇▇▆▄▇▆▆▆▅▄▅▄▆▄▇▆▄▆▆▄▅▁▇█▄▇█▆▅▅ █
  2.77 ms      Histogram: log(frequency) by time      3.94 ms <

 Memory estimate: 0 bytes, allocs estimate: 0.

BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):  257.369 μs …   4.383 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     371.026 μs               ┊ GC (median):    0.00%
 Time  (mean ± σ):   423.487 μs ± 181.874 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

      ▂█▆▃▂                                                      
  ▂▄▃▅██████▅▅▄▃▄▃▃▃▂▂▂▂▂▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁ ▂
  257 μs           Histogram: frequency by time         1.16 ms <

 Memory estimate: 1.98 KiB, allocs estimate: 21.
1 Like

Thanks for your snippets :slight_smile: I am not sure about the option of a lowpass-filter, I have to check this idea.