Threading support for DataFrames transforms

With regard to multi-threading support in DataFrames, I understand that it remains a work in progress.

I wanted to check whether some situations that appear, at least on the surface, to be good candidates for multi-threading are on the radar, or whether they are subject to other constraints.

Transformations involving a vector of operations

When multiple transforms are passed as a vector to transform! on a DataFrame, wouldn’t it be safe to @threads over that vector of operations?

For example:

using DataFrames
using BenchmarkTools
nrows = 1_000_000
df = DataFrame(id=rand(["A"], nrows), v1=rand(nrows), v2=rand(nrows), v3=rand(nrows), v4=rand(nrows))

f1 = "v1" => ByRow(exp) => "new1"
f2 = "v2" => ByRow(exp) => "new2"
f3 = "v2" => ByRow(exp) => "new3"
f4 = "v2" => ByRow(exp) => "new4"

funs = [f1, f2, f3, f4]

function df_trans_A(df, funs)
    transform!(df, funs[1])
    transform!(df, funs[2])
    transform!(df, funs[3])
    transform!(df, funs[4])
end

function df_trans_B(df, funs)
    transform!(df, funs)
end

# 19.500 ms (644 allocations: 30.55 MiB)
@btime df_trans_A($df, $funs);

# 19.325 ms (410 allocations: 30.54 MiB)
@btime df_trans_B($df, $funs);

We can see that, currently, calling transform! four times (once per transformation) performs about the same as a single transform! call on the vector of operations, even though the four operations are independent and could have been safely processed in parallel.

Performance regression on grouped dataframes

Following the above example, if the same transformations are performed on a dataframe grouped on id, the same performance could reasonably be expected, since there is only a single group (id="A").
However, it appears that the sequence of four transform! calls is almost 3x slower, while the transform! on the vector of operators takes a similar amount of time.

gdf = groupby(df, "id")

function gdf_trans_A(gdf, funs)
    transform!(gdf, funs[1])
    transform!(gdf, funs[2])
    transform!(gdf, funs[3])
    transform!(gdf, funs[4])
end

function gdf_trans_B(gdf, funs)
    transform!(gdf, funs)
end

# 56.028 ms (2963 allocations: 152.76 MiB)
@btime gdf_trans_A($gdf, $funs);

# 19.652 ms (1124 allocations: 129.76 MiB)
@btime gdf_trans_B($gdf, $funs);

Also, if the same experiment is repeated with 10 groups, the results deteriorate further for both methods, although the underlying operations remain the same (row-wise exp of each element):

# 10 groups
# 114.379 ms (3284 allocations: 195.40 MiB)
@btime gdf_trans_A($gdf, $funs);
# 49.737 ms (1444 allocations: 172.41 MiB)
@btime gdf_trans_B($gdf, $funs);

Multiple operators with combine

Similar to the situation with transform!, but with combine, where threading does not seem to be taken advantage of:

using DataFrames
using BenchmarkTools
nrows = 1_000_000
# 10 groups
df = DataFrame(id=rand(["A", "B", "C", "D", "E", "F", "G", "H", "I", "J"], nrows), v1=rand(nrows), v2=rand(nrows), v3=rand(nrows), v4=rand(nrows))
# 1 group (overwrites the above; use one or the other per run)
df = DataFrame(id=rand(["A"], nrows), v1=rand(nrows), v2=rand(nrows), v3=rand(nrows), v4=rand(nrows))

f1 = "v1" => sum => "new1"
f2 = "v2" => sum => "new2"
f3 = "v2" => sum => "new3"
f4 = "v2" => sum => "new4"

funs = [f1, f2, f3, f4]

function df_trans_A(df, funs)
    dfg = groupby(df, :id)
    agg = combine(dfg, funs[1])
    agg = combine(dfg, funs[2])
    agg = combine(dfg, funs[3])
    agg = combine(dfg, funs[4])
end

function df_trans_B(df, funs)
    dfg = groupby(df, :id)
    agg = combine(dfg, funs)
end

# 1 Group
# 26.565 ms (1067 allocations: 31.33 MiB)
@btime df_trans_A($df, $funs);
# 17.194 ms (546 allocations: 31.29 MiB)
@btime df_trans_B($df, $funs);

# 10 Groups
# 18.536 ms (1068 allocations: 31.33 MiB)
@btime df_trans_A($df, $funs);
# 15.977 ms (546 allocations: 31.29 MiB)
@btime df_trans_B($df, $funs);

function df_trans_A(df, funs)
    agg = combine(df, funs[1])
    agg = combine(df, funs[2])
    agg = combine(df, funs[3])
    agg = combine(df, funs[4])
end

function df_trans_B(df, funs)
    agg = combine(df, funs)
end

# 1.664 ms (444 allocations: 28.44 KiB)
@btime df_trans_A($df, $funs);
# 1.640 ms (382 allocations: 21.69 KiB)
@btime df_trans_B($df, $funs);

# 1.665 ms (444 allocations: 28.44 KiB)
@btime df_trans_A($df, $funs);
# 1.631 ms (382 allocations: 21.69 KiB)
@btime df_trans_B($df, $funs);

To sum up, are the above situations effectively good candidates for parallelization in DataFrames?

If the transformation is thread-safe then yes, it would be safe, and the design is intended to ensure this is done the way you want in the future.

For GroupedDataFrame we currently use multi-threading already for a sequence of operations.
The reason is that usually operations on GroupedDataFrame are more expensive so it was a higher priority to parallelize them.

combine under the hood does not differ from transform or select. It is parallelized for GroupedDataFrame but sequential for AbstractDataFrame.


Conclusions:

  1. The operations you do are in general very cheap. Therefore the results are influenced by pre/post processing steps that we do (like setting up proper structure of DataFrame object).
  2. The assumption that some operation can safely be done in multi-threaded mode is fragile: if you pass a function that is not thread safe, then we should not run the code in parallel.
  3. When designing multi-threading we need to take composability with other packages (like Dagger.jl) into account.
  4. What is currently supported in terms of multi-threading in DataFrames.jl is described at Functions · DataFrames.jl.

In the short term, this means that if you want multi-threading in all cases (and you know it can be safely done), I recommend doing @spawn or similar manually (I know this is not a 100% satisfying solution).

In the long run all you ask for will land in DataFrames.jl but it will not be soon (i.e. not in 1.4 release). However, I hope we can agree on the API in 1.4 release. The relevant discussion is in this PR Add a keyword argument to disable multithreading by nalimilan · Pull Request #3030 · JuliaData/DataFrames.jl · GitHub.

In particular, before moving forward, the crucial issue is that one mentally needs to separate two things:

  • whether the operation you want to run must be run sequentially, or whether it is allowed to run in parallel;
  • how much parallelism you allow DataFrames.jl to use (i.e. even if the operation is thread safe, you might not want to turn on multi-threading because you are running other operations in parallel that you do not want to be disturbed).
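For illustration, the direction discussed in that PR is a per-call keyword to opt out of multi-threading. The sketch below assumes the keyword is spelled `threads` (its final name and semantics were still under discussion in this thread, so treat it as a hypothetical until you check the released API):

```julia
using DataFrames

df = DataFrame(id = ["A", "A", "B"], v = [1.0, 2.0, 4.0])
gdf = groupby(df, :id)

# Hypothetical per-call opt-out, as proposed in PR #3030: force
# sequential execution even on a GroupedDataFrame, e.g. when the
# passed function is not thread safe.
combine(gdf, :v => sum => :s; threads = false)
```

This separates the two concerns above: the caller, who knows whether the function is thread safe, decides per call; DataFrames.jl decides how much parallelism to use when it is allowed.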

These are quite complex decisions (with far-reaching consequences for the whole ecosystem), so we do not want to rush into making a choice and regret it later (actually, we should have had this discussion before enabling multi-threading for GroupedDataFrame). You are welcome to share your thoughts/expectations in that PR so that we can end up with a solution that is useful and usable.

For the time being - as I have commented - users wanting multi-threading in all cases should implement it manually (which is not that hard, but of course it is not an ideal solution).


It would probably make sense to automatically spawn one task for each operation passed to transform/select/combine, like we already do when a GroupedDataFrame is passed. Though as @bkamins noted the operations you showed are cheap, so the gain will be limited due to the overhead of spawning a task. In general it’s hard to get large performance improvements with multithreading unless operations take a relatively long time.


Yes, this can be added, but I would wait until we make a decision in https://github.com/JuliaData/DataFrames.jl/pull/3030 about the general API design.

Let me add a different dimension to the discussion here. Sometimes users are only interested in combining existing transforms from a collection of well-known transforms that have been carefully implemented with Transducers.jl parallelism. TableTransforms.jl fits that purpose:

Our ultimate goal is to keep adding more forms of parallelism in a way that doesn’t affect the end user. You can just expect that these transforms will get faster and faster with time and new contributors.

Would you have an example of such an implementation?
I guess I misunderstand the mechanics of DataFrames operations, as the two following trials resulted in applying only one randomly chosen transform out of the four.

using Base.Threads: @threads, @spawn

# Both attempts race: transform! mutates the same df concurrently
# from several tasks, which is not safe.
function df_trans_C(df, funs)
    @threads for fun in funs
        transform!(df, fun)
    end
    return df
end

function df_trans_D(df, funs)
    ts = Dict{Int, Task}()
    @sync for i in 1:length(funs)
        ts[i] = @spawn transform!(df, funs[i])
    end
    # the @sync block above already waits for every spawned task
    return nothing
end

You should not use transform! then, but rather standard Base Julia low-level operations like the following (incols, outcols, and funs are placeholder vectors of input column names, output column names, and functions):

using Base.Threads: @threads, SpinLock

sl = SpinLock()
@threads for (incol, outcol, fun) in collect(zip(incols, outcols, funs))
    src = df[!, incol]
    dst = fun(src)      # compute in parallel
    lock(sl)            # serialize the mutation of df
    df[!, outcol] = dst
    unlock(sl)
end

Wow, 3.5X speedup, thanks very much @bkamins!

using Base.Threads: @threads, SpinLock

function df_trans_E(df, funs)
    sl = SpinLock()
    @threads for fun in funs
        incol = fun[1]        # input column name
        f = fun[2][1]         # function, e.g. ByRow(exp)
        outcol = fun[2][2]    # output column name
        src = df[!, incol]
        dst = f(src)
        lock(sl)
        df[!, outcol] = dst
        unlock(sl)
    end
end
julia> @btime df_trans_E($df, $funs);
  5.213 ms (1066 allocations: 30.60 MiB)

vs original:

@btime df_trans_B($df, $funs);
 19.325 ms (410 allocations: 30.54 MiB)
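A variant that avoids the lock entirely is also possible: spawn one task per output column, then assign all results to the DataFrame sequentially on the calling thread. This is just a sketch following the same `funs` pair layout as above (df_trans_F is a made-up name, not a DataFrames.jl API):

```julia
using DataFrames
using Base.Threads: @spawn

function df_trans_F(df, funs)
    tasks = map(funs) do fun
        incol = fun[1]        # input column name
        f = fun[2][1]         # function, e.g. ByRow(exp)
        outcol = fun[2][2]    # output column name
        col = df[!, incol]
        t = @spawn f(col)     # compute each column on its own task
        (outcol, t)
    end
    # df is mutated only from the calling thread, so no lock is needed.
    for (outcol, t) in tasks
        df[!, outcol] = fetch(t)
    end
    return df
end
```

With the funs vector from earlier in the thread, df_trans_F(df, funs) produces the same columns as df_trans_E.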

It seems that InMemoryDatasets.jl has built-in threading support; why not use it whenever you need multi-threading?

Can I use TableTransforms.jl on data loaded in a DataFrame (DataFrames.jl)? If yes, can you give an example?

Yes, TableTransforms.jl works with any Tables.jl object, including DataFrame. Please check the docs for examples.
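For what it’s worth, a minimal sketch of that usage, based on the pipeline API described in the TableTransforms.jl docs (the specific transform names Select and ZScore, the → composition operator, and the |> application syntax are assumptions to verify against the current docs):

```julia
using DataFrames
using TableTransforms

df = DataFrame(a = [1.0, 2.0, 3.0], b = [4.0, 5.0, 6.0])

# Compose two transforms with → and pipe the DataFrame through;
# a DataFrame goes in, a standardized table comes out.
pipeline = Select(:a, :b) → ZScore()
newdf = df |> pipeline
```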