# Variable sized windows for moving average

Hi, I saw some libraries for moving averages (RollingFunctions.jl), but I did not find one with the option to make the window size variable.

My specific case would be a moving average over a “time series”, averaging the last x days.

Something like this:

```julia
using DataFrames
using Statistics
using BenchmarkTools

LEN = 10^4 # 10^7

data = DataFrame(
    timestamp = cumsum(rand(1:3, LEN)),
    values = cumsum(rand(1:0.1:2, LEN))
);

coalesceNothing(x,default) = isnothing(x) ? default : x
getFirstIndex(x,days,i) = coalesceNothing(findfirst((days .+ x .- x[i]).>0),1)
@benchmark data.mean10 = [mean(data.values[i:-1:getFirstIndex(data.timestamp,10,i)]) for i in 1:LEN]

#BenchmarkTools.Trial: 194 samples with 1 evaluation.
# Range (min … max):  24.961 ms …  28.124 ms  ┊ GC (min … max): 0.00% … 0.00%
# Time  (median):     25.533 ms               ┊ GC (median):    0.00%
# Time  (mean ± σ):   25.886 ms ± 756.979 μs  ┊ GC (mean ± σ):  1.92% ± 2.37%
#
#    ▁▆█▆▆█   ▁▆▆                  ▃
#  ▇▇██████▄▆▄████▆▆▄▁▁▁▁▁▁▁▁▁▄▃▁▃▄█▃█▇▇▆▆█▇▄▃▁▄▆▇▇█▃▇▄▇▃▃▁▁▁▁▃ ▃
#  25 ms           Histogram: frequency by time         27.5 ms <
#
# Memory estimate: 55.97 MiB, allocs estimate: 78981.

# Version with reduced search space (did not help as I introduced new allocations)

getFirstIndex2(x,days,i) = i+1-coalesceNothing(findfirst([x[i] - xj >= days for xj in x[i-1:-1:1]]),i)
@benchmark data.mean10_2 = [mean(data.values[i:-1:getFirstIndex2(data.timestamp,10,i)]) for i in 1:LEN]

#BenchmarkTools.Trial: 114 samples with 1 evaluation.
# Range (min … max):  41.810 ms … 49.274 ms  ┊ GC (min … max): 4.48% … 7.84%
# Time  (median):     44.207 ms              ┊ GC (median):    5.90%
# Time  (mean ± σ):   44.215 ms ±  1.350 ms  ┊ GC (mean ± σ):  5.87% ± 0.68%
#
#          ▁ ▁  ▄ ▃     ▆▄▃█   ▆
#  ▄▁▄▇▆▆▆▄█▇█▆▇█▇█▇▁▄▆▇████▇▁▇█▆▁▇▄▆▄▄▆▄▆▆▁▁▁▆▄▁▁▁▄▁▁▁▁▁▄▁▁▁▄ ▄
#  41.8 ms         Histogram: frequency by time        48.3 ms <
#
# Memory estimate: 433.19 MiB, allocs estimate: 76932.

```

Is there a nicer / faster way of doing this? It does not scale very well to bigger datasets.

In particular, I think the index search on the sorted timestamp column could be optimized.
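
Since the timestamps are sorted, the window start can be found by binary search instead of the linear scan in `getFirstIndex`, making each lookup O(log n). A minimal sketch of that idea (the name `window_start` is my own, not from any package):

```julia
using Statistics

# First index j with t[j] > t[i] - days, i.e. the oldest element still inside
# the window; binary search on the sorted timestamp vector via searchsortedlast.
window_start(t, days, i) = searchsortedlast(t, t[i] - days) + 1

t = [1, 2, 4, 7, 8, 11, 13]     # sorted timestamps
v = Float64.(1:7)               # values
i = 6                           # t[i] == 11
j = window_start(t, 10, i)      # first j with t[j] > 1, so j == 2
m = mean(@view v[j:i])          # mean over the last 10 "days" → 4.0
```

This matches the condition in `getFirstIndex` (`days + t[j] - t[i] > 0`) without allocating the intermediate boolean vector.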

UPDATE: Benchmarks

I often use joins for this kind of operation – here from FlexiJoins.jl. It’s a very general mechanism, potentially less efficient than a fully optimized specialized implementation, but still performant and flexible:

```julia
julia> using FlexiJoins, StructArrays, DataPipes, IntervalSets, Statistics

julia> @p let
           data
           leftjoin((__, __), by_pred(x -> x.timestamp-10..x.timestamp, ∋, :timestamp), groupby=1)
           map((; _[1]..., mean10=mean(_[2].values)))
       end
# 1.272 ms (98 allocs: 2.894 MiB)
```

I think the grouping does not work with the newer DataFrames versions.

```julia
julia> gdf = leftjoin((data2, data2), by_pred(x -> (x.timestamp-10)..x.timestamp, ∋, :timestamp), groupby=1)
ERROR: ArgumentError: `DataFrame` constructor from a `Vector` of vectors requires passing :auto as a second argument to automatically generate column names: `DataFrame(vecs, :auto)`
Stacktrace:
```

As a workaround I can use the `groupby` function from DataFrames:

```julia
julia> @benchmark data.mean10_3 = @p let
           select(data, :timestamp, :values)
           leftjoin((__, __), by_pred(x -> Interval{:open,:closed}(x.timestamp-10, x.timestamp), ∋, :timestamp))
           groupby(__, [:timestamp, :values])
           combine(__, [:values_1] => mean => :mean)
           __.mean
       end
#BenchmarkTools.Trial: 1458 samples with 1 evaluation.
# Range (min … max):  2.762 ms … 18.380 ms  ┊ GC (min … max): 0.00% …  0.00%
# Time  (median):     3.033 ms              ┊ GC (median):    0.00%
# Time  (mean ± σ):   3.423 ms ±  1.089 ms  ┊ GC (mean ± σ):  7.82% ± 12.95%
#
#  ▃▇██▇▆▅▄▃▃▂▁                      ▁ ▁▁▁▁▁ ▁                ▁
#  ██████████████▇▇█▆▆▇▆▇█▆▆▄▆▅▁▁▇▄▄▄█████████▇▇▇█▆▇▇▄▄▆▅▁▄▄▅ █
#  2.76 ms      Histogram: log(frequency) by time     6.36 ms <
#
# Memory estimate: 8.96 MiB, allocs estimate: 53895.
```

Yeah, I don’t think it ever worked with DataFrames… Overall, DataFrames support in FlexiJoins.jl is pretty basic, as I never really use them myself, preferring basic tables with a Julian interface (vectors, StructArrays). Feel free to make a PR adding grouping support for DFs!

The current DataFrames support was added because someone requested it, and the implementation turned out to be very simple – FlexiJoins.jl/ext/DataFramesExt.jl at master · JuliaAPlavin/FlexiJoins.jl · GitHub.

My guess is that one would have to replace

```julia
DataFrame(x isa base_eltype ? x : empty_row for x in xs)
```

by

```julia
DataFrame([x isa base_eltype ? x : empty_row for x in xs], :auto)
```

Example:

```julia
julia> a = [[1,2,3],[3,4,5]]
2-element Vector{Vector{Int64}}:
 [1, 2, 3]
 [3, 4, 5]

julia> DataFrame([x for x in a], :auto)
3×2 DataFrame
 Row │ x1     x2
     │ Int64  Int64
─────┼──────────────
   1 │     1      3
   2 │     2      4
   3 │     3      5

julia> DataFrame([x for x in a])
ERROR: ArgumentError: `DataFrame` constructor from a `Vector` of vectors requires passing :auto as a second argument to automatically generate column names: `DataFrame(vecs, :auto)`
Stacktrace:
 [1] DataFrame(vecs::Vector{Vector{Int64}})
   @ DataFrames C:\Users\pusterhofer\.julia\packages\DataFrames\58MUJ\src\dataframe\dataframe.jl:405
 [2] top-level scope
   @ REPL[58]:1
```

I’m not sure if the script does exactly what you ask for, but… it does it quickly.

From 100× to 600× faster, but …
```julia
julia> using DataFrames

julia> using Statistics

julia> using BenchmarkTools

julia> LEN = 10^4 # 10^7
10000

julia> data = DataFrame(
           timestamp = cumsum(rand(1:3, LEN)),
           values = cumsum(rand(1:0.1:2, LEN))
       );

julia> coalesceNothing(x,default) = isnothing(x) ? default : x
coalesceNothing (generic function with 1 method)

julia> getFirstIndex(x,days,i) = coalesceNothing(findfirst((days .+ x .- x[i]).>0),1)
getFirstIndex (generic function with 1 method)

julia> function findls(t, ls, le)
           # scan forward from the previous start; note the window length 10
           # is hardcoded here (it matches tr = 10 in the calls below)
           for idx in ls:le
               t[le]-t[idx] < 10 && return idx
           end
       end
findls (generic function with 1 method)

julia> function windows(t, v, len, tr)
           r = Vector{Float64}(undef, len)
           ls = 1
           for i in 1:len
               if t[i]-t[ls] >= tr
                   ls = findls(t, ls, i)
               end
               r[i] = mean(v[ls:i])
           end
           r
       end
windows (generic function with 1 method)

julia> @benchmark data.mean10 = [mean(data.values[i:-1:getFirstIndex(data.timestamp,10,i)]) for i in 1:LEN]
BenchmarkTools.Trial: 97 samples with 1 evaluation.
Range (min … max):  32.966 ms … 83.545 ms  ┊ GC (min … max):  5.18% … 14.00%
Time  (median):     47.397 ms              ┊ GC (median):    10.13%
Time  (mean ± σ):   50.598 ms ± 10.511 ms  ┊ GC (mean ± σ):  11.83% ±  5.08%

█▇▄ ▂▂
▅▃▁▁▁▃▁▃▆▅▅█▅███▅██▅▁▅█▆▅▃▃▆▁▆▁▅▁▁▅▃▃▁▃▃▃▁▁▃▃▃▁▃▃▁▁▁▁▃▁▁▃▁▃ ▁
33 ms           Histogram: frequency by time        81.7 ms <

Memory estimate: 55.97 MiB, allocs estimate: 78980.

julia> @benchmark data.w = windows(data.timestamp,data.values,LEN, 10)
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
Range (min … max):  322.300 μs …   8.207 ms  ┊ GC (min … max): 0.00% …  0.00%
Time  (median):     398.100 μs               ┊ GC (median):    0.00%
Time  (mean ± σ):   488.760 μs ± 273.640 μs  ┊ GC (mean ± σ):  5.37% ± 10.45%

▃▆█▅▄▃▄▄▃▃▂▁                                                 ▁
████████████████▇▇▇▇█▇██████▇▇▇▇▇▇▆▇▇▇▇▅▅▅▆▃▆▆▇▆▆▆▅▆▆▆▆▅▅▅▅▅▅ █
322 μs        Histogram: log(frequency) by time       1.55 ms <

Memory estimate: 1.06 MiB, allocs estimate: 10003.

julia> LEN = 10^5
100000

julia> data = DataFrame(
           timestamp = cumsum(rand(1:3, LEN)),
           values = cumsum(rand(1:0.1:2, LEN))
       );

julia> @benchmark data.mean10 = [mean(data.values[i:-1:getFirstIndex(data.timestamp,10,i)]) for i in 1:LEN]
BenchmarkTools.Trial: 2 samples with 1 evaluation.
Range (min … max):  2.504 s …    3.208 s  ┊ GC (min … max): 2.56% … 2.65%
Time  (median):     2.856 s               ┊ GC (median):    2.61%
Time  (mean ± σ):   2.856 s ± 497.907 ms  ┊ GC (mean ± σ):  2.61% ± 0.07%

█                                                        █
█▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁█ ▁
2.5 s          Histogram: frequency by time         3.21 s <

Memory estimate: 1.60 GiB, allocs estimate: 798980.

julia> @benchmark data.w = windows(data.timestamp,data.values,LEN, 10)
BenchmarkTools.Trial: 1004 samples with 1 evaluation.
Range (min … max):  3.795 ms … 26.344 ms  ┊ GC (min … max): 0.00% … 0.00%
Time  (median):     4.354 ms              ┊ GC (median):    0.00%
Time  (mean ± σ):   4.967 ms ±  1.828 ms  ┊ GC (mean ± σ):  5.41% ± 8.36%

▇█▅▃▁ ▁▄▆▅▄▃▂  ▁▁▁
█████████████▇███████▆▇▆▆▄▁▁▄▆▁▁▄▆▁▅▄▅▄▁▄▆▆▆▁▆▇▄▁▁▄▁▁▁▁▄▁▄ █
3.8 ms       Histogram: log(frequency) by time       12 ms <

Memory estimate: 10.53 MiB, allocs estimate: 100003.

julia> 2856/4.354
655.9485530546624

```
```julia
julia> LEN = 10^6
1000000

julia> data = DataFrame(
           timestamp = cumsum(rand(1:3, LEN)),
           values = cumsum(rand(1:0.1:2, LEN))
       );

julia> @benchmark data.w = windows(data.timestamp,data.values,LEN, 10)
BenchmarkTools.Trial: 104 samples with 1 evaluation.
Range (min … max):  41.656 ms … 117.542 ms  ┊ GC (min … max): 5.18% … 4.44%
Time  (median):     45.485 ms               ┊ GC (median):    6.18%
Time  (mean ± σ):   48.892 ms ±  10.045 ms  ┊ GC (mean ± σ):  5.93% ± 1.55%

█▆▅▁
▃█████▆▃▃▃▃▃▃▃▃▃▃▅▃▁▃▃▃▁▁▃▃▁▁▁▁▁▁▃▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▃ ▃
41.7 ms         Histogram: frequency by time         93.9 ms <

Memory estimate: 105.48 MiB, allocs estimate: 1000003.

julia> LEN = 10^7
10000000

julia> data = DataFrame(
           timestamp = cumsum(rand(1:3, LEN)),
           values = cumsum(rand(1:0.1:2, LEN))
       );

julia> @benchmark data.w = windows(data.timestamp,data.values,LEN, 10)
BenchmarkTools.Trial: 11 samples with 1 evaluation.
Range (min … max):  446.053 ms … 520.803 ms  ┊ GC (min … max): 5.41% … 4.92%
Time  (median):     455.523 ms               ┊ GC (median):    5.65%
Time  (mean ± σ):   462.309 ms ±  20.360 ms  ┊ GC (mean ± σ):  5.63% ± 0.37%

█ ▃
▇▁▁▁▁█▁█▇▁▁▇▁▁▇▁▁▁▇▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▇ ▁
446 ms           Histogram: frequency by time          521 ms <

Memory estimate: 1.03 GiB, allocs estimate: 10000003.

```

When LEN = 10^5, using `@view` in `windows` makes it about 1500× faster than the `getFirstIndex` version:

```julia
julia> LEN = 10^5
100000

function windows(t, v, len, tr)
    r = Vector{Float64}(undef, len)
    ls = 1
    for i in 1:len
        if t[i]-t[ls] >= tr
            ls = findls(t, ls, i)
        end
        r[i] = mean(@view v[ls:i])
    end
    r
end

julia> data = DataFrame(
           timestamp = cumsum(rand(1:3, LEN)),
           values = cumsum(rand(1:0.1:2, LEN))
       );

julia> @benchmark data.w = windows(data.timestamp,data.values,LEN, 10)
BenchmarkTools.Trial: 2582 samples with 1 evaluation.
Range (min … max):  1.367 ms …  20.850 ms  ┊ GC (min … max): 0.00% … 0.00%
Time  (median):     1.782 ms               ┊ GC (median):    0.00%
Time  (mean ± σ):   1.932 ms ± 829.656 μs  ┊ GC (mean ± σ):  1.62% ± 6.28%

▃▆▆▆█▇██▇▆▆▆▅▄▃▃▂▁▂▁▁  ▁                                  ▂
███████████████████████████▇▆▇▇▇▇▅▆▅▆▅▆▇▇▅▇▇▆▇▆▇▆█▆▅▆▅▆▅▅▁▅ █
1.37 ms      Histogram: log(frequency) by time      4.01 ms <

Memory estimate: 781.31 KiB, allocs estimate: 3.

julia> @benchmark data.mean10 = [mean(data.values[i:-1:getFirstIndex(data.timestamp,10,i)]) for i in 1:LEN]
BenchmarkTools.Trial: 2 samples with 1 evaluation.
Range (min … max):  2.654 s …    3.031 s  ┊ GC (min … max): 3.06% … 2.96%
Time  (median):     2.843 s               ┊ GC (median):    3.00%
Time  (mean ± σ):   2.843 s ± 266.208 ms  ┊ GC (mean ± σ):  3.00% ± 0.07%

█                                                        █
█▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁█ ▁
2.65 s         Histogram: frequency by time         3.03 s <

Memory estimate: 1.60 GiB, allocs estimate: 798982.

julia> 2843/1.782
1595.3984287317621
```

Nice, it looks similar to the solution I used today. The join approach caused an out-of-memory problem when the window length was increased to 60,000 elements.

I like yours better though: I missed the `if` in the initial phase of the search and ended up using `searchsortedfirst`, which, looking at your solution, is not needed.

In addition, I started using VectorizedStatistics, but I haven’t benchmarked it yet.

Thank you!

A feature of the proposal that, in my haste, I forgot to describe is that the search for the window start begins at the previous start (`ls` in the script) and proceeds “forward”, rather than starting from the current end and searching “backwards”.
Obviously it depends on the structure of the data (and you could construct data that makes this strategy less effective), but, at a glance, I would say the sought value is “generally” found sooner this way.
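
For what it’s worth, that forward search can be combined with a running sum so the whole pass is O(n): both the index and the window start only ever move forward, and the mean is updated incrementally instead of being recomputed per element. A sketch of that variant (my own illustration, not from the benchmarks above):

```julia
# Sliding-window mean in a single O(n) pass: keep a running sum and advance
# the start pointer while the oldest element falls out of the window.
function windows_runsum(t, v, tr)
    n = length(t)
    r = Vector{Float64}(undef, n)
    ls = 1      # window start; only ever moves forward
    s = 0.0     # running sum of v[ls:i]
    for i in 1:n
        s += v[i]
        while t[i] - t[ls] >= tr    # drop elements older than tr
            s -= v[ls]
            ls += 1
        end
        r[i] = s / (i - ls + 1)
    end
    r
end

res = windows_runsum([1, 2, 4, 7, 8, 11, 13], Float64.(1:7), 10)
# → [1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0]
```

One caveat: over very long series the running floating-point sum can drift, so periodically recomputing `s` from scratch (or using compensated summation) may be worthwhile.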


Yes, I had it working like that too, but I forgot about the case where the search lands inside the window itself. This happens at the start, and in my case it resulted in a search through the whole window.

I think this algorithm looks very good; it would be useful when aggregating over unevenly spaced sorted data like time series, cumulative values, or even censored indices.

So maybe it can be wrapped in a package.