Forcing data points containing one sinusoidal signal to be symmetric around zero, and multithreading

The starting point is real-world measurements containing one unique sinusoidal signal.
For signal-processing purposes it can be beneficial to reduce the offset around zero before evaluating the signal, either with fft() or with a parameter fit.
This was the reason to write a function that eliminates/reduces the offset point by point, by means of a moving window.
The function works fine, but the time needed for processing gets quite high as the sampling rate goes up.
Now I wonder if a similar function already exists that does the job in a more efficient manner, e.g. by spreading tasks across several cores.

In case others are looking for something similar or have suggestions, I put my example code below:

using PlotlyJS, Printf
using RobustModels
using Debugger

b_plt           = false
b_trunc_signal  = false
num_periods     = 5
sampling_rate   = 100_000
frequ1          = 0.1
ampl1           = 2.1
phase1          = 10.0   # in degrees
offset_sp       = 0.1
grad_offset     = 0.0001 # gradient of the offset
# ---
ppp_frequ1    = trunc(Int, sampling_rate / frequ1)   # points per period
window_length = trunc(Int, num_periods * ppp_frequ1)

# --- time and data vector:
vec_t    = collect(range(0, step = 1.0 / sampling_rate, length = window_length))
td_data_ = offset_sp .+ (grad_offset * ampl1) .* vec_t .+
           ampl1 .* cos.((2 * pi * frequ1 .* vec_t) .+ deg2rad(phase1))

# --- Functions ---------------------------------------------------------------------------------------------
function _MyLibCutOffset(_signal::Vector{<:Number}, _sizeMV::Int, _b_truncSignal::Bool=true, _b_verbose::Bool=true)
    b_DBG = true
    _signal = Float64.(_signal)
    _n_pts_signal = length(_signal)
    if _n_pts_signal <= _sizeMV
        error("Signal too short!")
    end
    pts_MV_corrected = _n_pts_signal - _sizeMV
    half_MV = floor(Int, _sizeMV / 2)
    i_mean = 0.0
    if _b_truncSignal
        # truncated output: only the points with a full window around them are returned
        signal_symmetric = Vector{Float64}(undef, pts_MV_corrected)
        idx_shift = 0
    else
        # full-length output: correct the leading half window with the mean of the first window
        signal_symmetric = Vector{Float64}(undef, _n_pts_signal)
        i_mean = RobustModels.mean(_signal[1:_sizeMV])
        signal_symmetric[1:half_MV] = _signal[1:half_MV] .- i_mean
        idx_shift = half_MV
    end
    # ---
    if _b_verbose
        @info(string("MyLibCutOffset || _b_truncSignal: ", _b_truncSignal, ",\t_n_pts_signal: ", _n_pts_signal,
                     ",\t_sizeMV: ", _sizeMV, ",\tpts_MV_corrected: ", pts_MV_corrected, ",\tidx_shift: ", idx_shift))
    end
    # --- moving window: subtract the local mean, point by point
    for i_pts = 1:pts_MV_corrected
        _range = range(i_pts, length = _sizeMV)
        try
            i_mean = RobustModels.mean(_signal[_range])
        catch e
            println(string("i_pts: ", i_pts, ",\ti_pts + idx_shift: ", i_pts + idx_shift,
                           ", _range[end]: ", _range[end], ", length(_range): ", length(_range)))
            rethrow(e)
        end
        signal_symmetric[i_pts + idx_shift] = _signal[i_pts + half_MV] - i_mean
    end
    # --- DBG ---
    if b_DBG
        println(string("pts_MV_corrected - ceil(Int, _sizeMV/2): ", pts_MV_corrected - ceil(Int, _sizeMV / 2)))
    end
    if !_b_truncSignal
        # correct the trailing points with the last window mean, shifted so that the
        # signal stays continuous at the seam
        diff_last_corr = signal_symmetric[pts_MV_corrected + idx_shift] - _signal[pts_MV_corrected + half_MV]
        signal_symmetric[pts_MV_corrected + idx_shift + 1 : end] =
            _signal[pts_MV_corrected + idx_shift + 1 : end] .- (i_mean + diff_last_corr)
        if b_DBG
            println(string("diff_last_corr: ", diff_last_corr))
        end
    end
    # ---
    return signal_symmetric
end

# --- Main --------------------------------------------------------------------------------------------------
signal_symmetric_ = _MyLibCutOffset(td_data_, ppp_frequ1, b_trunc_signal)
n_pts_signal = length(signal_symmetric_)
vec_t_symmetric = vec_t[floor(Int, (window_length - n_pts_signal) / 2) + 1 :
                        window_length - ceil(Int, (window_length - n_pts_signal) / 2)]

# --- info --------------------------------------------------------------------------------------------------
@info(string("window_length: ", window_length, ", n_pts_signal: ", n_pts_signal, ", vec_t_symmetric: ", length(vec_t_symmetric)))
@info(string("Mean input: ", RobustModels.mean(td_data_), ", Mean output: ", RobustModels.mean(signal_symmetric_)))
# --- Plot --------------------------------------------------------------------------------------------------
if b_plt
    if window_length > 1e6
        error("Too many points!")
    end
    hdl1 = PlotlyJS.scatter(; x = vec_t,           y = td_data_,          name = "raw")
    hdl2 = PlotlyJS.scatter(; x = vec_t_symmetric, y = signal_symmetric_, name = "corr")
    hdl_plt = PlotlyJS.Plot([hdl1, hdl2])
end

The first thing to check is whether you are allocating a lot of temporary arrays. Does @time, @btime, or @benchmark report a lot of allocations? (I would suggest the latter two, as they avoid common pitfalls like reporting compilation time.) This would be the first thing to optimize, as it is a good way to detect silly small temporary objects that significantly slow you down (e.g. it might be OK to have a small number of big initial allocations, but if the number of allocations scales with the size of your problem, removing them would probably drastically speed up your code).
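To make that concrete, here is a small sketch of the kind of check I mean (the two toy functions are made up for illustration; the slicing-inside-a-loop pattern matches what your moving-window code does):

```julia
using BenchmarkTools, Statistics

# Allocation-heavy: signal[i:i+w-1] copies the window on every iteration.
slice_means(signal, w) = [mean(signal[i:i+w-1]) for i in 1:length(signal)-w+1]

# Same result, but @view avoids the per-window copy.
view_means(signal, w) = [mean(@view signal[i:i+w-1]) for i in 1:length(signal)-w+1]

signal = rand(5_000)
@btime slice_means($signal, 50)  # allocations scale with the number of windows
@btime view_means($signal, 50)   # essentially only the output vector is allocated
```

If the allocation count reported for the first variant grows with the signal length while the second stays flat, that is exactly the pattern worth removing first.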

Any reason you do a moving window instead of one single run of: input_array .-= mean(input_array)?

You have a lot of debugging branches in your code. Removing them from your MVE would make it easier for us to discuss the specifics of your code (at least for me that is the case, as it is easier to say what is important and what is inconsequential in your code).

Thanks for your comment! In the real world it can happen that the offset is not constant over time. This is especially an issue if you would like to use parameter-fit techniques; one example is investigations on batteries.
I do not have much experience with benchmarking and performance tuning. What I have already checked is whether the two variants of mean, RobustModels.mean() and Statistics.mean(), make a difference.
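To illustrate with made-up numbers: with a drifting offset, subtracting a single global mean leaves a residual ramp that only a local (windowed) correction would track:

```julia
using Statistics

t = range(0, 10, length = 1000)
s = 0.1 .* t .+ sin.(2pi .* t)    # sine plus a slowly drifting offset

global_demeaned = s .- mean(s)    # one global mean: the ramp survives
residual = global_demeaned .- sin.(2pi .* t)
println(maximum(abs.(residual)))  # about 0.5, far from zero
```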

The try-catch-end structure is for sure a time-critical element; I am still in the debugging phase.
Before spending more effort on performance aspects, I wonder whether something similar might already exist;
it should, e.g., be possible to run more than one for-loop in parallel.
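For the parallel-loop idea: since every output point of the moving-window correction is independent, the inner loop can in principle be threaded directly. A rough sketch of what I have in mind (the function name is made up; it assumes Julia was started with several threads, e.g. julia -t 4):

```julia
using Statistics
using Base.Threads: @threads

# Moving-window offset removal, parallelized over output points.
# Each iteration writes to its own index, so there are no data races.
function cut_offset_threaded(signal::Vector{Float64}, window::Int)
    n_out = length(signal) - window
    out = Vector{Float64}(undef, n_out)
    half = window ÷ 2
    @threads for i in 1:n_out
        out[i] = signal[i + half] - mean(@view signal[i:i + window - 1])
    end
    return out
end
```

Note that this is still O(n · window) work; keeping a running (cumulative) sum per window would reduce the per-point cost before threading even enters the picture.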

It seems you just need some high-pass filter. There are probably packages that do this better than what I am writing below, but here are two simple options:

An exponential filter would probably be good enough:

function my_exp_high_pass_filter!(signal, decay)
    low_pass = zero(eltype(signal))
    @inbounds for i in eachindex(signal)
        low_pass = decay * low_pass + (1 - decay) * signal[i]
        signal[i] -= low_pass
    end
    return signal
end
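A quick self-contained sanity check of the idea (the filter is repeated here so the snippet runs on its own; the decay value is just an example):

```julia
# Exponential high-pass: track the slow component and subtract it.
function my_exp_high_pass_filter!(signal, decay)
    low_pass = zero(eltype(signal))
    @inbounds for i in eachindex(signal)
        low_pass = decay * low_pass + (1 - decay) * signal[i]
        signal[i] -= low_pass
    end
    return signal
end

s = 0.3 .+ sin.(2pi .* (0:9999) ./ 100)  # sine with a constant offset
my_exp_high_pass_filter!(s, 0.99)
println(sum(s) / length(s))              # mean pulled close to zero
```

The closer decay is to 1, the slower the tracked baseline moves, so a constant or slowly drifting offset is removed while the sinusoid mostly passes through.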

Or if you want to do mean by segments (and this one can be multithreaded easily). Note that this is not really a moving window solution, as there is no overlap between windows.

using Statistics
using Base.Threads: @threads

function mean_by_segment(signal, segment_length)
    total_length = length(signal)
    @threads for i in 1:segment_length:total_length
        segment = @view signal[i:min(i + segment_length - 1, total_length)]
        segment .-= mean(segment)
    end
    return signal
end


On four threads.

julia> r = rand(1_000_000);

julia> @benchmark my_exp_high_pass_filter!($r, 0.99)
BenchmarkTools.Trial: 1654 samples with 1 evaluation.
 Range (min … max):  2.767 ms …   4.495 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     2.956 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   3.018 ms ± 227.938 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

      ███▆▆▆▆▇▅▄▄▄▄▃▃▂ ▁▁                                     ▁
  █▄▇███████████████████████▇█▇▇▇▆▄▇▆▆▆▅▄▅▄▆▄▇▆▄▆▆▄▅▁▇█▄▇█▆▅▅ █
  2.77 ms      Histogram: log(frequency) by time      3.94 ms <

 Memory estimate: 0 bytes, allocs estimate: 0.

BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min … max):  257.369 μs …   4.383 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     371.026 μs               ┊ GC (median):    0.00%
 Time  (mean ± σ):   423.487 μs ± 181.874 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

  ▂▄▃▅██████▅▅▄▃▄▃▃▃▂▂▂▂▂▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁ ▂
  257 μs           Histogram: frequency by time         1.16 ms <

 Memory estimate: 1.98 KiB, allocs estimate: 21.

Thanks for your snippets :slight_smile: I am not sure about the low-pass-filter option; I will have to check this idea.