Help improving the speed of a DataFrames operation

Hi all, quick question (hopefully) related to doing an operation on DataFrames.
I am trying to improve the transformation below so any help would be great. More context and MWE are at the bottom.

Thanks for all the help.

df_flows_subsetting_matching_to(rp, a) =
    @view df_flows[(df_flows.rp .== rp) .&& (df_flows.to .== a), :]

sum_matching(df, time_block, rp) =
    coalesce(sum(df.flow .* length.(Ref(time_block) .∩ df.time_block) * 3.14rp), AffExpr(0.0))

transform!(
    df_cons,
    [:rp, :asset, :time_block] =>
        ByRow((rp, a, T) -> sum_matching(
            df_flows_subsetting_matching_to(a, rp, T), T, rp)
        ) => :incoming_term,
)

The number of elements in the time_block columns grows to ~8000 in our current cases.

More context:

I am essentially trying to compute

\displaystyle \sum_{T_f} f_{(u,a), rp, T_f} \times |T_f \cap T_a| \times \phi(rp) \qquad \forall a, rp, T_a \in \mathcal{P}_{a, rp}

The set \mathcal{P}_{a, rp} depends on a and rp. Each variation is a line of df_cons below. Each different value of f is on df_flows.

This is related to Speeding up JuMP model creation with sets that depend on other indexes - #6 by slwu89

MWE: Including previous code

using DataFrames, JuMP

df_flows = DataFrame(;
    from = ["p1", "p1", "p2", "p2", "p1", "p2"],
    to = ["d", "d", "d", "d", "d", "d"],
    rp = [1, 1, 1, 1, 2, 2],
    time_block = [1:3, 4:6, 1:2, 3:6, 1:6, 1:6],
    index = 1:6,
)
model = Model()
df_flows.flow = [
    @variable(
        model,
        base_name = "flow[($(row.from),$(row.to)), $(row.rp), $(row.time_block)]"
    ) for row in eachrow(df_flows)
]

df_cons = DataFrame(;
    asset = ["p1", "p1", "p1", "p2", "p2", "d", "d", "d"],
    rp = [1, 1, 2, 1, 2, 1, 1, 2],
    time_block = [1:3, 4:6, 1:6, 1:6, 1:6, 1:4, 5:6, 1:6],
    index = 1:8,
)

df_flows_subsetting_matching_to(rp, a) =
    @view df_flows[(df_flows.rp .== rp) .&& (df_flows.to .== a), :]

sum_matching(df, time_block, rp) =
    coalesce(sum(df.flow .* length.(Ref(time_block) .∩ df.time_block) * 3.14rp), AffExpr(0.0))

transform!(
    df_cons,
    [:rp, :asset, :time_block] =>
        ByRow((rp, a, T) -> sum_matching(
            df_flows_subsetting_matching_to(rp, a), T, rp)
        ) => :incoming_term,
)

Good news! You will be able to get very big performance gains!

You have two problems: (1) Type instability from accessing columns inside function and (2) using df as a global variable.

Type instability:

DataFrames are “type-unstable”. That is when you write get_a(df) = df.a, just can’t infer what type of vector get_a returns.

This has a lot of benefits in terms of interactive use and general overhead. But it has performance downsides. Fortunately, the solution is simple. Write functions which accept vectors directly, not the data frame. Instead of foo(df) = df.a + df.b, write foo(a, b) and call foo(df.a, df.b)

Global variable:

Don’t use anything inside functions that are not passed to the functions themselves. These “global variables” also hurt performance.

3 Likes

Hi @pdeffebach, I changed it to the following:

transform!(
        df_cons,
        [:rp, :asset, :time_block] =>
            ByRow(
                (rp, a, time_block) -> begin
                    df = @view df_flows[(df_flows.rp.==rp).&&(df_flows.to.==a), :]
                    coalesce(
                        sum(df.flow .* length.(Ref(time_block) .∩ df.time_block) * 3.14rp),
                        AffExpr(0.0),
                    )
                end,
            ) => :incoming_term,
    )

But there are no improvements. The code is wrapped in a function.

No, you aren’t writing functions which take in vectors.

                    df = @view df_flows[(df_flows.rp.==rp).&&(df_flows.to.==a), :]

When you refer to df_flows here, that is still a global variable.

I would try to avoid working with transform and dataframes at the moment. Just write a function which takes in vectors only and only after you’ve written that, apply it to a dataframe.

Also, a plug for DataFramesMeta.jl which exports the macro @with might make the syntax a bit easier.

Also,

                    df = @view df_flows[(df_flows.rp.==rp).&&(df_flows.to.==a), :]

makes me think maybe you want either a grouped operation or maybe to leftjoin together df_cons and df_flows. But I’m not totally sure your use-case.

The code is defined inside a function, say main, that I have not written here.
The df_flows variable is defined inside that function.
Instead of writing functions, I am using the explicit access to the variable df_flows.rp. Since df_flows is a local variable, I think that using a function is not necessary here, right? I was just trying to make the example readable.

About the use case, it is a left join that I then group. I can try to explain below what I want:

Explanation in join terms:

  • leftjoin(df_cons, df_flows, on = [:rp, :asset => :to])
  • Compute the intersection of the time_block of left and right
  • Multiply the resulting value by the flow column
  • Sum flow by grouping by df_cons’ index

Per row explanation:

  • For each row of df_cons
  • Select/filter df_flows by matching rp = row.rp and to = row.asset
  • Compute the intersection of the time blocks
  • Multiply the resulting value by the flow column
  • Sum flow and return

My current solution is to not do any use of DataFrames, and just use Dictionaries to store the indices of the non-zero flows. It is slow, but around 10x faster that this version. The full context is Speeding up JuMP model creation with sets that depend on other indexes - #6 by slwu89

Any time you do df_flows.rp, that’s slow. The goal is to avoid that at all costs. Pass rp directly to a function. Keeping a lookup dict of these indices would be a good idea.

Try:

transform!(
  df_cons,
  [:rp, :asset, :time_block] =>
    ByRow((rp, a, T) -> begin
        s = AffExpr(0.0)
        for r in eachrow(df_flows)
            r.rp != rp && continue
            r.to != a && continue
            t = length(T ∩ r.time_block)
            s += r.flow * t * 3.14 * rp
        end
        s
    end
  ) => :incoming_term,
)

instead of the transform! in the OP. Sometimes all the indirection and @views cause too many unneeded copies. Especially when @viewing calculated masks.

As I mentioned above, any time you have df_flows.time_block, you are losing performance.

My suggestions are orthogonal to your comments. So yes, all the global/stability issues are important.

Peter is suggesting that instead of

sum_matching(df, time_block, rp) =
    coalesce(sum(df.flow .* length.(Ref(time_block) .∩ df.time_block) * 3.14rp), AffExpr(0.0))

you have

sum_matching(df_flow, df_time_block, time_block, rp) =
    coalesce(sum(df_flow .* length.(Ref(time_block) .∩ df_time_block) * 3.14rp), AffExpr(0.0))

and call the function as sum_matching(df.flow, df.time_block, time_block, rp) instead of sum_matching(df, time_block, rp). When you provide the DataFrame columns as arguments, sum_matching can specialise on the types of df.flow and df.time_block, while sum_matching(df,...) can’t do that as it doesn’t know what the types of df.flow and df.time_block are when the function gets called with just a DataFrame as argument.

This is a design decision in DataFrames to allow dynamism and make sure compile times don’t blow up (as they would if you had a type stable DataFrame{::TypeOfColumn1, ::TypeOfColumn2,...} object instead). If you insist on passing tables to your functions you either need to have a function barrier (so essentially something like they 4-arg sum_matching above inside the 3-arg version) or use a different kind of table, e.g. GitHub - JuliaData/TypedTables.jl: Simple, fast, column-based storage for data analysis in Julia

1 Like

Another way to phrase this:

sort(   # keep order by index
  combine(   # combine flow sub-expressions
    groupby(    # group subexpression for each con
      leftjoin(df_cons, df_flows, on=[:rp, :asset => :to];
        makeunique=true),    # associate flows with cons
      [:asset, :rp, :time_block, :index]),
    [:time_block, :time_block_1, :flow, :rp] => ((t,t1, fl, rp) ->
      sum(3.14 * length(t[i] ∩ t1[i]) * rp[i] * fl[i] for 
        i in 1:length(fl) if !ismissing(fl[i]); # create expression
        init=AffExpr(0.0))) => :incoming_term), # start with 0.0
  :index)

or using Chains.jl package:

@chain df_cons begin
    leftjoin(df_flows, on=[:rp, :asset => :to], makeunique=true)
    groupby([:asset, :rp, :time_block, :index])
    combine([:time_block, :time_block_1, :flow, :rp] => ((t,t1, fl, rp) ->
      sum(3.14 * length(t[i] ∩ t1[i]) * rp[i] * fl[i] for 
      i in 1:length(fl) if !ismissing(fl[i]);
      init=AffExpr(0.0))) => :incoming_term)
    sort(:index)
end

That certainly is another way to phrase it… but it would be better to write this out in a series of discrete steps to make it more readable.

Because groupby - combine is multithreaded, this new form may offer a performance advantage.

A variant that is entirely inside DataFrames and that calculates only “non-trivial (non-zero length)” data


df_ij0=innerjoin(df_cons, df_flows, on = [:rp, :asset => :to], makeunique=true) 


v(x)= map(x->@variable(
    model,
    base_name = "flow[($(x[1]),$(x[2])), $(x[3]), $(x[4])]"
), x)


df_ijs=select(df_ij0,[3,6]=>((x,y)->length.(x.∩ y).*3.14)=>:len,
        [5,1,2,6]=>((x...)->v(zip(x...)))=>:flow,[:index]) 


combine(groupby(df_ijs,:index), [1,2]=> (x,y)->sum(x.*y))
#or
# combine(groupby(df_ijt,:index), [1,2]=> (x,y)->x'*y)



Hi all, thanks for the suggestions. I tried them that I am still not having significant improvements. The fastest version I have right now is converting to columntable and iterating over the lines with Tables.rows. I have three versions with that, more or less similar, that end up being about twice as fast as the best normal data frame version.
Also, I am filtering the null intersection better as well.

Some replies:
@Dan, adding elements of df_flows per row is slow because of the nature of the flow column (JuMP expressions).

@pdeffebach and @nilshg , thanks for the clarification. Unfortunately, it is much different from what I have, so maybe I am still doing something wrong.

@Dan and @rocco_sprmnt21, the join approaches don’t work for the larger sizes (~100k rows for each DF, at the moment). My VSCode simply shuts down.

This is the fastest version so far:

tbl_flows = Tables.columntable(df_flows)
tbl_cons = Tables.columntable(df_cons)
incoming = Vector{AffExpr}(undef, length(tbl_cons.asset))
for row in Tables.rows(tbl_cons)
    incoming[row.index] = AffExpr(0.0)
    idx = findall(
        (tbl_flows.rp .== row.rp) .&&
        (tbl_flows.to .== row.asset) .&&
        (last.(tbl_flows.time_block) .≥ row.time_block[1]) .&&
        (first.(tbl_flows.time_block) .≤ row.time_block[end]),
    )
    if length(idx) > 0
        incoming[row.index] = sum(
            tbl_flows.flow[idx] .* length.(Ref(row.time_block) .∩ tbl_flows.time_block[idx]) *
            3.14row.rp,
        )
    end
end

I still think that a version with leftjoin and DataFramesMeta.jl’s @with is going to be the simplest and most effective.

But for your current proposal

    idx = findall(
        (tbl_flows.rp .== row.rp) .&&
        (tbl_flows.to .== row.asset) .&&
        (last.(tbl_flows.time_block) .≥ row.time_block[1]) .&&
        (first.(tbl_flows.time_block) .≤ row.time_block[end]),

This could be re-doing a lot of work over and over again. Are you sure there isn’t a better way to cache this? It’s hard to tell exactly without a full MWE>

@pdeffebach: I still think that a version with leftjoin and DataFramesMeta.jl’s @with is going to be the simplest and most effective.

See my comment above, for ~100k rows in each data frame, my VSCode crashed. Running on terminal gave me segfault.

It’s hard to tell exactly without a full MWE

Is the code I put on the first post not working or do you mean a large case?

And once more, the context is another problem posted here: Speeding up JuMP model creation with sets that depend on other indexes

I currently don’t use anything close to a data frame. Instead, I have a dictionary to compute the indexes that I have to sum for each row of df_cons.
My expectations are that using a linear structure will be better (df_cons). However, computing the incoming column of the linear structure is proving to be slower than the dictionary approach.
I.e., JuMP is slower because I have the dictionaries instead of the linear indexes, but computing the linear indexes are slower than JuMP, so maybe it is what it is.

I think the fact that your left_join is not working is indicative of some other problem that’s worth exploring. Is left_join trying to create a colossal number of rows? Why could that be? Do your indices not match as much as you thought? Are there missing or NaN values you have to worry about? Are you joining on a float column? What happens if you do inner_join?

They don’t match very often, because of the time_blocks. The number of assets/to values and rp values is small, but the number of time_blocks is very large.

Ignoring all else, you can thinks of it this way:

cons_time_blocks = [1:2, 3:4, ..., 997:998, 999:1000]
flow_time_blocks = [1:3, 4:6, ..., 996:999, 1000:1000]

I.e., each time_blocks array is a partitions of 1:1000. In this example, the cons_time_blocks is the partition taking 2 points at a time, and flow_time_blocks is a partition taking 3 points at a time.

Now, for each index of cons_time_blocks, I want every flow_time_blocks that intersect with it. This is the essence of the problem.