How to improve code to compute the mean of values grouped by DateTime?

I have written the piece of code below. It takes a vector of DateTime, a vector of Float64, and an integer g as input, and calculates the mean of the Float64 values grouped by DateTime at a time granularity of g seconds.

In this example, the data starts on Jan. 1st, 2024 at midnight and runs for six days in increments of 11 seconds. I choose g = 15 and first find out how many rows the grouped data will have. I do that with a while-loop. Once I have the number of rows, I create an array to store the output and compute the mean per group with a for-loop. Specifically, I begin with the first available DateTime and increment it by 15 seconds. Then I group the data (or should I say filter? subset?) and compute the mean, assigning the results to the array I created. You’ll notice I also use an if-statement to deal with groups/filtrations/subsets that are empty.

This works as intended, but it takes a lot of time. Can you think of a way to make this code more efficient so it runs faster? All help is appreciated.

# Packages used below
using DataFrames, Dates, Statistics

# Data as is
timestamp = collect(DateTime(2024,01,01,00,00,00):Second(11):DateTime(2024,01,07,00,00,10));
data = rand(length(timestamp))

# Define granularity by which to aggregate data 
g = 15 # 15s

# Define number of groups to be aggregated. 
# With row_count, set dimensions of agg_data Array

x = DataFrame(date=timestamp; value=data)
test = true
row_count = 0
a = first(x.date)
b = last(x.date)

while test
   if a + Second(g) <= b
      row_count = row_count + 1
      a = a + Second(g)
   else
      test = false
   end
end

# Initialize arrays and parameters
agg_data = Array{Any}(undef, row_count, 2)
start = first(x.date)

for i in 1:row_count
    # Define the time interval and filter data
    agg_data[i, 1] = start
    x_subset = x[start.<=x.date.<start+Second(g), :]

    # If said interval is empty, ie, nrow = 0, then assume the mean for 
    # time interval is zero. 
    if nrow(x_subset) != 0
        agg_data[i, 2] = mean(x_subset.value)
    else
        agg_data[i, 2] = 0
    end

    # Update start
    start = start + Second(g)
end
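
The number of rows found by the while-loop above can also be computed directly, since the loop counts how many whole intervals of g seconds fit between the first and last timestamp. A minimal sketch, assuming only the Dates standard library and sorted timestamps; the helper name count_bins_closed_form is made up for illustration:

```julia
using Dates

# Hypothetical helper: number of whole g-second intervals between the first
# and last timestamp, which is what the while-loop counts.
function count_bins_closed_form(dates::AbstractVector{DateTime}, g::Integer)
    span_ms = Dates.value(last(dates) - first(dates))  # DateTime difference in milliseconds
    return span_ms ÷ (1000 * g)
end

timestamp = collect(DateTime(2024, 1, 1):Second(11):DateTime(2024, 1, 7, 0, 0, 10))
row_count = count_bins_closed_form(timestamp, 15)
```

Like the loop, this ignores a trailing partial interval, so any samples after the last full interval are not aggregated.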



Running code in the global scope usually does not give good performance. Putting your method in functions

using DataFrames
using Statistics
using Dates
using BenchmarkTools

function generate_test_data()
    # Data as is
    timestamp = collect(
        DateTime(2024, 01, 01, 00, 00, 00):Second(11):DateTime(2024, 01, 07, 00, 00, 10),
    )
    data = rand(length(timestamp))

    # Define granularity by which to aggregate data
    g = 15 # 15s

    x = DataFrame(date = timestamp; value = data)
    return x, g
end

function count_bins(x, g)

    # Define number of groups to be aggregated.
    # With row_count, set dimensions of agg_data Array


    test = true
    row_count = 0
    a = first(x.date)
    b = last(x.date)

    while test
        if a + Second(g) <= b
            row_count = row_count + 1
            a = a + Second(g)
        else
            test = false
        end
    end
    return row_count
end

function aggregate(x, row_count, g)
    # Initialize arrays and parameters
    agg_data = Array{Any}(undef, row_count, 2)
    start = first(x.date)

    for i = 1:row_count
        # Define the time interval and filter data
        agg_data[i, 1] = start
        x_subset = x[start .<= x.date .< start + Second(g), :]

        # If said interval is empty, ie, nrow = 0, then assume the mean for
        # time interval is zero.
        if nrow(x_subset) != 0
            agg_data[i, 2] = mean(x_subset.value)
        else
            agg_data[i, 2] = 0
        end

        # Update start
        start = start + Second(g)
    end
    return agg_data
end

function runtest()
    x, g = generate_test_data()
    row_count = count_bins(x, g)

    agg = aggregate(x, row_count, g)
end

produces a result in ~3 s on my computer.

julia> @btime runtest();
  3.355 s (1244201 allocations: 731.14 MiB)
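
One likely reason functions help in general: a non-constant global variable can change type at any time, so code that reads it cannot be specialized, whereas a function argument has a concrete type when the function is called. A small sketch of the difference, with illustrative names that are not part of the code above:

```julia
values_global = rand(10_000)  # non-const global: its type is unknown at compile time

# Iterates over the untyped global, so the loop body is type-unstable.
function sum_from_global()
    s = 0.0
    for v in values_global
        s += v
    end
    return s
end

# Receives the data as an argument, so the compiler can specialize
# on Vector{Float64}.
function sum_from_argument(values)
    s = 0.0
    for v in values
        s += v
    end
    return s
end

# Both compute the same sum; only the speed should differ.
same_result = sum_from_global() == sum_from_argument(values_global)
```

Comparing the two with @btime should show the argument version running faster with fewer allocations, though exact timings depend on the machine.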

However, there is a much simpler way using the tools in DataFrames.jl:

function dataframes_agg()
    x, g = generate_test_data()
    # Add a new column, rounded_date, your timestamp rounded to the desired
    # granularity.
    x_ = transform(x, :date=>ByRow(x->round(x, Second(g)))=>:rounded_date)
    # Generate groups with the same rounded_date
    grps = groupby(x_, :rounded_date)
    # Combine the groups using the first date and the mean of the values
    agg_ = combine(grps, :date=>first, :value=>mean; renamecols=false)
    # select only the date and mean value columns
    agg = select(agg_, [:date, :value])
end

julia> @btime dataframes_agg();
  2.509 ms (540 allocations: 5.68 MiB)
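
One caveat, offered tentatively: round assigns each timestamp to the nearest multiple of g, so the groups are centered on the 15-second marks, whereas the loop in the question uses half-open intervals [start, start + g). If the original binning is wanted, floor may be the closer match when the first timestamp lies on a multiple of g, as it does here. A small sketch of the difference using only Dates:

```julia
using Dates

g = 15
ts = collect(DateTime(2024, 1, 1):Second(11):DateTime(2024, 1, 1, 0, 0, 44))

nearest = [round(t, Second(g)) for t in ts]  # nearest multiple of 15 s
lower   = [floor(t, Second(g)) for t in ts]  # start of the enclosing 15 s interval

# Print both assignments side by side for comparison.
for (t, r, f) in zip(ts, nearest, lower)
    println(Time(t), "  round => ", Time(r), "  floor => ", Time(f))
end
```

With floor, the samples at 0 s and 11 s share a group, as they do in the question's first interval; with round they are split.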

There must be a few ways of achieving this, and writing an explicit loop is a good and efficient method. Here is another:

### initializing test data:

# Data as is 
timestamp = collect(DateTime(2024,01,01,00,00,00):Second(11):DateTime(2024,01,07,00,00,10));
data = rand(length(timestamp))

g = 15 # 15s

### the suggested method:

using Dates, Statistics  # datetime2unix and mean
using MappedArrays
import IterTools as Itr

uts = mappedarray(datetime2unix, timestamp)
agg_data = map(x->mean(data[x]),
  Itr.groupby(i->(uts[i]-uts[1])÷g, eachindex(uts)))

Explanation: The mappedarray construct allows looking at the timestamps as seconds since the Unix epoch (1970), thus making division by g easy. The IterTools groupby partitions the indices according to the integer division of the elapsed seconds by g. The final map looks up the indices in data and computes the mean.
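
To make the grouping key concrete, here is a small sketch of how integer division of the elapsed seconds maps timestamps to bin numbers. It uses only Dates and Statistics and swaps the IterTools groupby for a plain dictionary, so it is the same idea under that assumption rather than the code above. As far as I know, IterTools groupby groups consecutive elements with equal keys, so the original relies on the timestamps being sorted.

```julia
using Dates, Statistics

g = 15
timestamp = collect(DateTime(2024, 1, 1):Second(11):DateTime(2024, 1, 1, 0, 0, 44))
data = [1.0, 3.0, 5.0, 7.0, 9.0]

uts = datetime2unix.(timestamp)                      # seconds since 1970 as Float64
keys_per_row = [Int((t - uts[1]) ÷ g) for t in uts]  # 0-based bin number per row

# Collect the values that share a key, then average each group.
groups = Dict{Int,Vector{Float64}}()
for (k, v) in zip(keys_per_row, data)
    push!(get!(groups, k, Float64[]), v)
end
bin_means = [mean(groups[k]) for k in sort(collect(keys(groups)))]
```

Here the samples at 0 s and 11 s get key 0, the sample at 22 s gets key 1, and the samples at 33 s and 44 s get key 2.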

For better speed, we can define a function:

function find_agg(uts, data, g)
    N = 1+floor(Int,(last(uts)-first(uts))÷g)
    agg_data = zeros(N)
    agg_count = fill(-1, N)
    for i in eachindex(uts)
        idx = 1+floor(Int,(uts[i]-uts[1]))÷g
        agg_data[idx] += data[i]
        agg_count[idx] = ifelse(agg_count[idx] < 0, 1, agg_count[idx]+1)
    end
    agg_data ./= agg_count
    return agg_data
end

and this benchmarks as:

julia> @btime find_agg($uts, $data, $g);
  865.197 μs (4 allocations: 540.22 KiB)

which is about 5x faster than the other solution here.
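
If the start time of each bin is also needed, as in the two-column output of the original question, the single-pass idea can be extended to return it alongside the means. A sketch under stated assumptions: the name binned_means is made up, the timestamps are assumed sorted with the first one defining the start of bin 1, and empty bins get a mean of zero as in the question.

```julia
using Dates

# Hypothetical helper: one pass over the data, accumulating a sum and a
# count per bin, then dividing. Works directly on DateTime values.
function binned_means(timestamp::Vector{DateTime}, data::Vector{Float64}, g::Integer)
    t0 = first(timestamp)
    width_ms = 1000 * g
    nbins = Dates.value(last(timestamp) - t0) ÷ width_ms + 1
    sums = zeros(nbins)
    counts = zeros(Int, nbins)
    for i in eachindex(timestamp, data)
        idx = Dates.value(timestamp[i] - t0) ÷ width_ms + 1
        sums[idx] += data[i]
        counts[idx] += 1
    end
    starts = [t0 + Second(g * (k - 1)) for k in 1:nbins]
    # Empty bins are reported as 0.0, mirroring the if-statement in the question.
    means = [c == 0 ? 0.0 : s / c for (s, c) in zip(sums, counts)]
    return starts, means
end

ts = collect(DateTime(2024, 1, 1):Second(11):DateTime(2024, 1, 1, 0, 0, 44))
starts, means = binned_means(ts, [1.0, 3.0, 5.0, 7.0, 9.0], 15)
```

Unlike the loop in the question, this version also keeps the trailing partial bin, so the number of rows can be one larger.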


I know functions are incentivized as a way to write code, but I didn’t know one would make the code run that much faster. I also appreciate your answer using DataFrames.jl. For years, R was my go-to tool to wrangle data, and I’m still far from proficient in using Julia for that end. I’ll go through your answer carefully to get the gist of it. Thank you.

Thank you, Dan, for your contribution.

I tried running the line x_ = transform(x, :date=>ByRow(x->round(x, Second(g)))=>:rounded_date) and I got the following error back.

The error message reads a bit cryptic to me. Any idea of what’s going on?

ERROR: MethodError: no method matching transform(::Matrix{Any}, ::Pair{Symbol, Pair{ByRow{var"#12#13"}, Symbol}})

Closest candidates are:
  transform(::AbstractDataFrame, ::Any...; copycols, renamecols, threads)
   @ DataFrames C:\Users\abc\.julia\packages\DataFrames\58MUJ\src\abstractdataframe\selection.jl:1379
  transform(::GroupedDataFrame, ::Union{Regex, AbstractString, Function, Signed, Symbol, Unsigned, Pair, Type, All, Between, Cols, InvertedIndex, AbstractVecOrMat}...; copycols, keepkeys, ungroup, renamecols, threads)
   @ DataFrames C:\Users\abc\.julia\packages\DataFrames\58MUJ\src\groupeddataframe\splitapplycombine.jl:912

Stacktrace:
 [1] top-level scope
   @ REPL[20]:1

Looks like x is a Matrix, not a DataFrame in this traceback. How did you create x? I followed your example and used

x = DataFrame(date = timestamp; value = data)

in generate_test_data()

That’s what got me. I created x as a DataFrame. I’ll look into it and see what is going on. If I find the error, I’ll post it here.