Multi-threading with DataFrames

Hi

I am trying to figure out how multi-threading works with data frames.

Example 1:

using DataFrames, ProgressMeter

df = DataFrame(reshape(1:50_000_000, :, 25), :auto);
df.id1 = repeat(1:10_000, 200);
df.id2 = string.(df.id1);

g = Int[];
gdf = groupby(df, :id1);

progress = Progress(length(gdf))
res = transform(gdf, :x1 => (x -> (
                                    t = Threads.threadid();
                                    push!(g, t);
                                    sleep(0.001);
                                    ProgressMeter.next!(progress; showvalues = [(:iter, t)]); t
                                    ) => :tid));
unique(g)

Output:

4-element Vector{Int64}:
 4
 1
 2
 3

Example 2:

g = Int[];
progress = Progress(length(gdf))
res = transform(gdf, :x1 => (x -> (
                                    t = Threads.threadid();
                                    push!(g, t);
                                    sleep(0.001);
                                    ProgressMeter.next!(progress; showvalues = [(:iter, t)]);
                                    return x
                                    )));
unique(g)

Output 2:

1-element Vector{Int64}:
 2

It looks like transform uses multiple threads only when the function returns a single value; it does not use them when the inner function returns an array.

  1. Does anyone know why?
  2. Also, how can I do it on multiple threads in Example 2?

ta!

Ad. 1. The rules for when multi-threading is used are documented in Functions · DataFrames.jl. The reason for the behavior in your question is that if the function returns a scalar, it is easier to compose the final output vector in a multi-threaded way. When the function returns a vector, we cannot know upfront what its length will be, so currently the produced vectors are appended sequentially (this might change in the future).
Ad. 2. You can do the threading manually yourself, e.g. along these lines:

output = Vector{Any}(undef, length(gdf))
Threads.@threads for i in 1:length(gdf)
    sdf = gdf[i]
    output[i] = your_function(sdf)
end
result = reduce(vcat, output)

Some more examples of how to do it are in The state of multiple threading in DataFrames.jl | Blog by Bogumił Kamiński.
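
To make the loop above concrete, here is a minimal sketch on a small made-up table. The function `demean` and the toy data are assumptions for illustration only; they are not from the original question. The point is just that each group may return a multi-row result, and that each iteration writes only to its own slot of a preallocated vector.

```julia
using DataFrames

# Toy data (an assumption for illustration): 100 groups of 20 rows each.
df = DataFrame(id1 = repeat(1:100, 20), x1 = collect(1.0:2000.0))
gdf = groupby(df, :id1)

# Hypothetical per-group function that returns several rows per group.
# It keeps the grouping column so the result can be joined back later.
function demean(sdf)
    m = sum(sdf.x1) / nrow(sdf)
    return DataFrame(id1 = sdf.id1, x1_demeaned = sdf.x1 .- m)
end

# One slot per group; each iteration writes only to its own slot.
output = Vector{DataFrame}(undef, length(gdf))
Threads.@threads for i in 1:length(gdf)
    output[i] = demean(gdf[i])
end

# Combine sequentially once all groups are done.
result = reduce(vcat, output)
```

Using a concretely typed `Vector{DataFrame}` instead of `Vector{Any}` is a small design choice here; it only works because this particular `demean` always returns a data frame.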

hi @bkamins

thanks a lot for the detailed response. I need to add these columns to the existing dataframe. I feel there are 2 options for me:

  1. I can create a new dataframe, add “x1” from the existing dataframe to it, and keep updating the new dataframe with the transformed column. I presume adding to a dataframe is safe unless we are reading from it at the same time. Can you please advise? [1]
  2. Alternatively, similar to your example above, I can create a Matrix. This Matrix would have the columns from the existing dataframe along with the new transformed columns. I would keep concatenating to this Matrix and, after the loop has finished, convert it to a dataframe. [2]

Can you please confirm which approach would work better? I will put together code examples when I am home.

[1]: the caller is responsible for ensuring that a given DataFrame object is never modified by one thread while others are using it (either for reading or writing).
https://dataframes.juliadata.org/stable/lib/functions/#Multithreading-support

[2]: Convert matrix to dataframe
https://www.juliabloggers.com/working-with-matrices-in-dataframes-jl-1-0/

thanks a lot
Roh

My initial guess is that you can construct the new columns into a new dataframe separately as his example code does. Make sure you keep the grouping columns on there as well. Then at the end you can join it all together if you wish, using the grouping columns to join on.

Yes. And adding a column should be done in a single threaded mode.

got it! so, updating a column in a manual threading loop is threadsafe and adding columns should be done in single threaded manner.

thanks for the response @bkamins and @tbeason! I will post back the code in this thread when I am home later today.

As long as you do not read this column at the same time. Also please make sure that you are updating the column and not replacing it.

understood, noted.

[Updated my prev response]

ahh! A quick google search shows me:

When setting a column of a data frame the df[!, :C] and df.C syntaxes are equivalent and they would replace (or create) the :C column in df. This is different from using df[:, :C] to set a column in a data frame, which updates the contents of column in-place if it already exists.

Source: Getting Started · DataFrames.jl
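
A small sketch of the difference described in that quote, on a made-up one-column data frame (the names `col`, `same_after_update` and `same_after_replace` are just for illustration). It keeps a reference to the stored vector and checks whether the data frame still holds that same object after each kind of assignment.

```julia
using DataFrames

df = DataFrame(C = [1, 2, 3])
col = df.C                    # the vector stored in df (no copy is made)

df[:, :C] = [10, 20, 30]      # in-place: writes into the existing vector
same_after_update = col === df.C     # still the same vector object

df[!, :C] = [7, 8, 9]         # replaces: df now holds a different vector
same_after_replace = col === df.C    # `col` is now the old, detached vector
```

After the second assignment `col` still contains the values from the in-place update, while `df.C` holds the new vector.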

I think this is what you meant. thanks again for all your help with this, this has been a great learning experience.

ta!

I came across this thread as I am facing (I believe) a task somewhat similar to what is discussed here. I have a dataframe with say n columns, and I’m using a set of functions to compute say p new columns to the dataframe, based on the data in the set of n columns. There is no intermediate calculation or dependency, meaning that the order of computation of the p columns doesn’t matter.

Is it correct to say that as long as I preallocate the p new columns in the dataframe, updating them with functions taking one or many of the n input columns as inputs is safe?

EDIT: so I gave it a shot, here’s my code (uses DataFrames and DataFramesMeta, the latter because my functions are programmatically generated)

Threads.@threads for (l,r) in lr_tuples_vec
       code = :@transform!($resdf,$(l) = $(r))
       eval(code)
end

l is a QuoteNode and r is an Expr, and resdf has all l named columns preallocated with zeros.

Across different tries, anywhere between one and all of the new columns get populated (the ones that don't still contain the preallocated zeros).

Am I correct to suspect a synchronization issue? Any idea how to address it?

Yes, but it is a general Julia issue. You are calling transform!, which works in place, in parallel. This is not safe, just as the following code is not safe in Base Julia:

julia> x = Int[]
Int64[]

julia> Threads.@threads for i in 1:16
           sleep(rand()) # ensure randomness in execution order
           push!(x, i)
       end

julia> x
16-element Vector{Int64}:
  5
 13
 14
  9
  1
 10
 11
  6
 15
 16
  2
 12
  7
  3
  8
  4

If you want to update the same object many times in parallel you need to synchronize its updates.
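
For the Base Julia example, a minimal sketch of such synchronization is to guard the `push!` with a lock (the name `lk` is arbitrary). The order of the elements is still not deterministic, but every update is applied safely.

```julia
x = Int[]
lk = ReentrantLock()

Threads.@threads for i in 1:16
    # Only one task at a time may run the body of this `lock` block.
    lock(lk) do
        push!(x, i)
    end
end

# All 16 values are present; only their order may vary between runs.
all_present = sort(x) == 1:16
```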

In this case the pseudocode would be:

for your loop
     compute new column using transform (without !) into a new data frame dftmp
     lock
     add a column to resdf from dftmp
     unlock
end

In this way you compute dftmp in parallel, but you ensure that you update resdf safely. Still, the order of updates of resdf would be non-deterministic.

If you wanted a deterministic update order, you should store the dftmp results in a pre-allocated vector and then add them to resdf after the computations have finished.
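
A rough sketch of both variants, under some assumptions: the toy data, the `specs` list of transformations, and the names `src`, `lk`, `tmp` and `resdf2` are all made up for illustration, and plain `select` is used instead of the DataFramesMeta macros from the question.

```julia
using DataFrames

src = DataFrame(a = 1:5, b = 6:10)   # input columns, only read in the loops
resdf = copy(src)                    # data frame that receives new columns

# Hypothetical transformations, each producing one new column.
specs = Any[
    :a => ByRow(x -> 2x) => :c,
    [:a, :b] => ByRow(+) => :d,
    :b => ByRow(abs2) => :e,
]

# Variant 1: compute in parallel, add columns under a lock.
lk = ReentrantLock()
Threads.@threads for i in eachindex(specs)
    dftmp = select(src, specs[i])    # new data frame, computed in parallel
    lock(lk) do
        for name in names(dftmp)
            resdf[!, name] = dftmp[!, name]   # structural change, serialized
        end
    end
end

# Variant 2: store results in a preallocated vector, combine afterwards.
tmp = Vector{DataFrame}(undef, length(specs))
Threads.@threads for i in eachindex(specs)
    tmp[i] = select(src, specs[i])
end
resdf2 = hcat(src, tmp...)           # column order follows `specs`
```

In the first variant the new columns may appear in any order in `resdf`; in the second, `resdf2` always has them in the order given by `specs`.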

Thank you for your answer. So I see how your solution at the bottom is safe, and that was going to be my fallback option.

To go back to your earlier example though, and forgive me if the question is naive, I see how your loop is not safe, but I thought a transform! would be closer to the below (given I preallocate the columns):

julia> x = zeros(16)
16-element Vector{Float64}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

julia> Threads.@threads for i in 1:16
           x[i] = i
       end

julia> x
16-element Vector{Float64}:
  1.0
  2.0
  3.0
  4.0
  5.0
  6.0
  7.0
  8.0
  9.0
 10.0
 11.0
 12.0
 13.0
 14.0
 15.0
 16.0

julia> 

… which I thought (I’m not sure anymore) was safe?

If the columns are already present in the data frame (i.e. you do not modify the structure of the data frame) and you do not modify metadata, then it should be safe.
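
As an illustration of that condition, here is a minimal sketch that adds the columns in a single thread first and then, in the threaded loop, only overwrites the contents of the existing column vectors. The `jobs` list and the toy data are assumptions made up for this example, and the sketch writes to the vectors directly rather than going through `transform!`.

```julia
using DataFrames

df = DataFrame(a = 1:1000, b = 1001:2000)

# Hypothetical jobs: (name of new column, function of the input columns).
jobs = [(:s, (a, b) -> a .+ b), (:p, (a, b) -> a .* b), (:d, (a, b) -> b .- a)]

# Single-threaded: add all new columns before the parallel part.
for (name, _) in jobs
    df[!, name] = zeros(Int, nrow(df))
end

# Parallel: each iteration writes only into its own preallocated column
# and only reads the input columns `a` and `b`.
Threads.@threads for i in eachindex(jobs)
    name, f = jobs[i]
    col = df[!, name]        # fetch the existing vector; no structural change
    col .= f(df.a, df.b)     # overwrite its contents in place
end
```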

Ok that’s what I thought. Some columns are still all zeros though (I fill with zeros when I preallocate) when the function returns, which I’m not sure how to explain.

did you check if they are the same columns as original columns or updated columns?

Ah - I see what happens. Indeed there might be an issue with transform!. The problem is that two instances of transform! might in parallel do:

  1. one of them writes the “old” version of the column (the transform! that does not modify it)
  2. the other writes the new (this is the transform! that modifies the column)

If the following sequence of events occurs (which is possible when the two calls run in parallel):

  • the first call fetches column (to write it back later as is)
  • the second call writes it
  • the first call writes it

you will get the original column in the result (which is not what you want).

Ah interesting. Curious though, why would the transform! that does not modify a column need to rewrite it? Isn’t one of the purposes of the inplace modification to avoid unnecessary operations (allocations or otherwise)?

It is the purpose, but this purpose can be achieved in several ways. The way DataFrames.jl does it is https://github.com/JuliaData/DataFrames.jl/blob/main/src/abstractdataframe/selection.jl#L945.

I see. Thank you for that. I’ll need to work around it.

Final word on this: the multithreaded version ended up being as slow as the single-threaded one on a 36-thread machine, not because of locks, but most likely due to my use of eval. I ended up sticking to single-threaded code written as regular code rather than eval (at the expense of some flexibility in functionality), and got a 25x speedup by not relying on eval.

@bkamins Bogumil, is there a way to modify @transform to pass a transformation as an Expr?

julia> using DataFrames, DataFramesMeta

julia> begin
             x = DataFrame(a = [2, 3])
             y = DataFrame(b = [3, 4])
             df = hcat(x, y)
             end
2×2 DataFrame
 Row │ a      b     
     │ Int64  Int64 
─────┼──────────────
   1 │     2      3
   2 │     3      4

julia> @transform(df,:c = 2*:a+:b)
2×3 DataFrame
 Row │ a      b      c     
     │ Int64  Int64  Int64 
─────┼─────────────────────
   1 │     2      3      7
   2 │     3      4     10

julia> 

What if I have my transformation as a string or a parsed expression? How could I use it with transform (or @transform)?

julia> t = Meta.parse(":c = 2*:a+:b")
:(:c = 2 * :a + :b)

The above can’t be passed as an argument to transform or @transform, is there a way (even if it involves redefining my own say @transform2)?

I tried a few things but I’m not an expert in Meta programming, and I’m trying to avoid eval for the performance reasons discussed previously (and also because it’s not that elegant).