Two-sided moving average with intermittent missing data:

I think I’m over-thinking this issue I’m having when trying to use a two-sided moving average.

All I want to do is create a two-sided moving window that “doesn’t care” about missing data in my GPS timeseries.

As long as there is data within the window, it can fill in the missing data from the existing data within my window, assuming that the gap in data is not greater than the size of the window.

My last attempt was this:

   function centered_average(data, n)
        avg = Vector{Union{Missing, Float64}}(missing, length(data))
        for i in eachindex(data)
            start_idx = max(1, i - div(n - 1, 2))
            end_idx = min(length(data), i + div(n - 1, 2))
            window = data[start_idx:end_idx]
            valid_values = [x for x in window if !ismissing(x)] 
            if !isempty(valid_values)
                avg[i] = round(mean(valid_values), digits=5)
            end
        end
        return avg
    end

But I am still ending up with missing data in the middle of some of my GPS data for gaps that are definitely less than 365 days, which you can see below for the brown-orange line with the big gap. I’m not sure what to do.

It feels like it should be a fairly straight forward process, but alas, here I am.

I leftjoin my GPS data with my decimal year file, that contains every epoch within my desired timeframe:

    df_merged = sort(leftjoin(all_dates_df, df, on = :decyr), [:decyr])

and then all I want is to run my two-sided window over the entire timeseries, gaps included. My window length is 365 days, which will give me 182-days on either side.

I feel like this shouldn’t have been taking up as much brainpower as it has lol

I can’t upload my .txt or .csv files, and I’m not sure how to share the data via Juliahub, but, here what my dataset should be under in JuliaHub if you’re inclined to try for yourself. AZU1 is a GPS station in Southern California with a gap occurring between 2003.3949 and 2005.5749. And the decimal year file has every day between 2000.000 and 2009.9986 – to enable the leftjoin.

dataset(“robfuller924/AZU1”)
dataset(“robfuller924/decimal_year”)

Thank you in advance for any help or insight to my clear skill issue at hand.

I typically use joins for this kind of moving average:

using FlexiJoins, IntervalSets, Statistics, GLMakie

# data:
tbl = [
	(x=10*rand(), y=randn())
	for _ in 1:50
]

# target x coordinates:
xs = range(0, 10, length=100)

# join xs + tbl when x values are within 0.2 from each other:
J = leftjoin(
	(;xs, tbl),
	by_pred(x->x±0.2, ∋, r->r.x),
	groupby=:xs
)

# average within each window, putting NaN if no data:
# (can filter and remove those rows, alternatively)
Jagg = map(J) do j
	(x=j.xs, y=isempty(j.tbl) ? NaN : mean(r -> r.y, j.tbl))
end

# plot data and averages:
scatter(map(Tuple, tbl), markersize=5)
lines!(map(Tuple, Jagg))

instead of filtering or putting NaN is there a way to basically fill-in the gaps in the the data with whatever data is contained within the window still? That’s the issue I’m having currently.

Your moving average seems to be based on an integer index. Are the samples equally spaced in time and at what rate?

In my example, whenever there is any data present in the window, it gets averaged. NaNs are only for cases without any datapoints.

Each data point represents a single day. And I’m looking at GPS data collected over a 10-year time span (2000 - 2010).

Your data gaps appear to be less than 365 days after the application of your moving average, but are you sure they were before?

This can be solved relatively easily with skipmissing and emptymissing in Missings.jl

julia> function centered_average(data, n)
            avg = Vector{Union{Missing, Float64}}(missing, length(data))
            for i in eachindex(data)
                start_idx = max(1, i - div(n - 1, 2))
                end_idx = min(length(data), i + div(n - 1, 2))
                window = @view data[start_idx:end_idx]
                avg[i] = emptymissing(mean)(skipmissing(window))
            end
            return avg
        end;

julia> x = [rand() < 0.9 ? missing : (rand() - 1 + i / 100) for i in 1:1000];

julia> t = centered_average(x, 5);

julia> lines(t)

emptymissing(fun) wraps fun and does a quick check to see if the input is empty and, if empty, returns missing. skipmissing is function that returns an iterator of the non-missing values in a collection.

I attempted to implement your strategy, but, I’m not sure where I’m going wrong. I do see that in yours you’re not using DataFrames, would that be the reason my doesn’t work? Sorry if I’m missing something simple here. I’m not great at this. lol

Here is what I attempted to do:

const WIN_HALF = 182

# Function to process a single file
function process_file(file, output_path, all_dates_df)
    stat = (basename(file))[1:4]

    df = CSV.read(file, DataFrame)
    tbl = [df.decyr df.value]
    xs = df_all_dates

    dropmissing!(df)

    J = leftjoin(
        (;xs, tbl),
        by_pred(x -> x ± WIN_HALF, in, r -> r.tbl[1]),
        groupby = :xs,
        mode = FlexiJoins.Mode.NestedLoop()
    )

    Jagg = map(J) do j
        (j.xs, isempty(j.tbl) ? missing : mean(r -> r.tbl[2], j.tbl))
    end
    
    df_merged = leftjoin(all_dates_df, DataFrames.DataFrame(Jagg), on = :decyr)
    sort!(df_merged, [:decyr])
    CSV.write(joinpath(output_path, "$(stat)_medave.csv"), df_merged)
end

Which throws me this error:

ERROR: MethodError: no method matching keytype(::DataFrame)
The function `keytype` exists, but no method is defined for this combination of argument types.

Closest candidates are:
  keytype(::Type{Union{}}, Any...)
   @ Base abstractarray.jl:189
  keytype(::Type{DataStructures.SortedMultiDict{K, D, Ord}}) where {K, D, Ord<:Base.Order.Ordering}
   @ DataStructures ~/.julia/packages/DataStructures/95DJa/src/sorted_multi_dict.jl:292
  keytype(::Type{DataStructures.SortedSet{K, Ord}}) where {K, Ord<:Base.Order.Ordering}
   @ DataStructures ~/.julia/packages/DataStructures/95DJa/src/sorted_set.jl:184
  ...

Stacktrace:
  [1] (::FlexiJoins.var"#108#109"{1})(i::Int64, X::DataFrame, nms::FlexiJoins.Drop)
    @ FlexiJoins ~/.julia/packages/FlexiJoins/OB7he/src/ix_compute.jl:65
  [2] map
    @ ./tuple.jl:406 [inlined]
  [3] create_ix_array(datas::Tuple{…}, nonmatches::Tuple{…}, _groupby::Val{…})
    @ FlexiJoins ~/.julia/packages/FlexiJoins/OB7he/src/ix_compute.jl:64
  [4] _joinindices(datas::Tuple{…}, cond::FlexiJoins.ByPred{…}, multi::Tuple{…}, nonmatches::Tuple{…}, groupby::Val{…}, cardinality::Tuple{…}, mode::FlexiJoins.Mode.NestedLoop, cache::Nothing, loop_over_side::Val{…})
    @ FlexiJoins ~/.julia/packages/FlexiJoins/OB7he/src/joins.jl:83
  [5] _joinindices(datas::Tuple{…}, cond::FlexiJoins.ByPred{…}, multi::Tuple{…}, nonmatches::Tuple{…}, groupby::Val{…}, cardinality::Tuple{…}, mode::FlexiJoins.Mode.NestedLoop, cache::Nothing, loop_over_side::Nothing)
    @ FlexiJoins ~/.julia/packages/FlexiJoins/OB7he/src/joins.jl:60
  [6] _joinindices(datas::@NamedTuple{…}, cond::FlexiJoins.ByPred{…}; multi::Nothing, nonmatches::Tuple{…}, groupby::Symbol, cardinality::Nothing, mode::FlexiJoins.Mode.NestedLoop, cache::Nothing, loop_over_side::Nothing)
    @ FlexiJoins ~/.julia/packages/FlexiJoins/OB7he/src/joins.jl:45
  [7] joinindices(datas::@NamedTuple{…}, cond::FlexiJoins.ByPred{…}; kwargs::@Kwargs{…})
    @ FlexiJoins ~/.julia/packages/FlexiJoins/OB7he/src/joins.jl:35
  [8] _flexijoin(datas::@NamedTuple{…}, cond::FlexiJoins.ByPred{…}; kwargs::@Kwargs{…})
    @ FlexiJoins ~/.julia/packages/FlexiJoins/OB7he/src/joins.jl:30
  [9] flexijoin(datas::@NamedTuple{…}, args::FlexiJoins.ByPred{…}; kwargs::@Kwargs{…})
    @ FlexiJoins ~/.julia/packages/FlexiJoins/OB7he/src/joins.jl:27
 [10] leftjoin(datas::@NamedTuple{…}, args::FlexiJoins.ByPred{…}; kwargs::@Kwargs{…})
    @ FlexiJoins ~/.julia/packages/FlexiJoins/OB7he/src/joins.jl:13
 [11] process_file(file::String, output_path::String, all_dates_df::DataFrame)
    @ Main ~/Documents/julia2/julia_scripts/average/centered_average.jl:32
 [12] top-level scope
    @ ~/Documents/julia2/julia_scripts/average/centered_average.jl:51
Some type information was truncated. Use `show(err)` to see complete types.

Which, based on the the error output, my error is occurring here, somewhere:

    J = leftjoin(
        (;xs, tbl),
        by_pred(x -> x ± WIN_HALF, in, r -> r.tbl[1]),
        groupby = :xs,
        mode = FlexiJoins.Mode.NestedLoop()
    )

Maybe this poorly drawn visualization I made will help illustrate what, in my mind, I was trying to make happen. lol

In my head, I have a static window that simply rolls along my data and takes the average, and if it encounters a spot that is missing or NaN it will just populate that space with data from whatever data is present in my window. Oh, and the _ at the beginning of my drawing should have zeros in them, I just didn’t think about it when I was quickly drawing it.

Please correct me if I’m wrong, but is your implementation essentially “chopping” up the dataset into chunks that fit the parameter?

Again, could you please show your larger data gap before your moving average?

Here is an example of a timeseries with a data gap before any averaging has been done. This is residual timeseries.

And here you can see my median and average of this dataset – where the flat line corresponds to the missing data.

Okay, I think I figured it out.

I ended up modifying the original script – and all I did was got rid of valid_values = [x for x in window if if !ismissing(x)].

That’s what I get for using Gemini to help me code.

Here is the complete function that takes the windowed average and median – and replaces any missing values with data can either copy from other rows, or just replaces with NaN, since all I really need are the average and median after this point:

const WIN_SIZE = 365

# Function to process a single file
function process_file(file, output_path, all_dates_df)
    stat = (basename(file))[1:4]

    df = CSV.read(file, DataFrame)
    dropmissing!(df, :decyr)

    # merge with all dates
    df_merged = sort(leftjoin(all_dates_df, df, on = :decyr), [:decyr])
    decyr = df_merged.decyr
    value = df_merged.value

    averaged = Vector{Union{Missing, Float64}}(missing, length(value))
    medianed = Vector{Union{Missing, Float64}}(missing, length(value))

    for i in eachindex(value)
        
        start_idx = max(1, i - div(WIN_SIZE - 1, 2))
        end_idx = min(length(value), i + div(WIN_SIZE - 1, 2))
        window = value[start_idx:end_idx]

        no_miss = skipmissing(window)

        if !isempty(no_miss)
            averaged[i] = round(mean(no_miss), digits = 5)
            medianed[i] = round(median(no_miss), digits = 5)
        end
    end

    if "lon" in names(df_merged) && "lat" in names(df_merged) && "value" in names(df_merged) && "sigma" in names(df_merged)
        if any(ismissing.(df_merged.lon)) || any(ismissing.(df_merged.lat)) || any(ismissing.(df_merged.value)) || any(ismissing.(df_merged.sigma))
            first_valid_lon = findfirst(!ismissing, df_merged.lon)
            first_valid_lat = findfirst(!ismissing, df_merged.lat)
            df_merged.value = coalesce.(df_merged.value, NaN)  
            df_merged.sigma = coalesce.(df_merged.sigma, NaN) 
            if first_valid_lon !== nothing
                df_merged.lon = fill(df_merged.lon[first_valid_lon], nrow(df_merged))
            end
            if first_valid_lat !== nothing
                df_merged.lat = fill(df_merged.lat[first_valid_lat], nrow(df_merged))
            end
        end
    end

    df_merged[!, :average] = averaged
    df_merged[!, :median] = medianed
    df_merged[!,:stat] .= stat

    CSV.write(joinpath(output_path, "$(stat)_medave.csv"), df_merged)
end

Then, plotted to confirm that the median and averages populated correctly:

Here, especially at the beginning, the plots are little wonky due to the amount of data within the window in the beginning, but the median and averages look to have interpolated nicely in the smaller gap in the middle.

Thank you everyone for your help :slight_smile: I really appreciate it!

As we can see, and contrary to your initial statement, the gaps are larger than 365 days.

Incorrect, the gap in AHID has a gap of .350, or approx 127 days.