I am trying to figure out how multi-threading works with data frames.
Example 1:
using DataFrames, ProgressMeter
df = DataFrame(reshape(1:50_000_000, :, 25), :auto);
df.id1 = repeat(1:10_000, 200);
df.id2 = string.(df.id1);
g = Int[];
gdf = groupby(df, :id1);
progress = Progress(length(gdf))
res = transform(gdf, :x1 => (x -> (
    t = Threads.threadid();
    push!(g, t);
    sleep(0.001);
    ProgressMeter.next!(progress; showvalues = [(:iter, t)]);
    t
)) => :tid);
unique(g)
Output:
4-element Vector{Int64}:
4
1
2
3
Example 2:
g = Int[];
progress = Progress(length(gdf))
res = transform(gdf, :x1 => (x -> (
t = Threads.threadid();
push!(g, t);
sleep(0.001);
ProgressMeter.next!(progress; showvalues = [(:iter, t)]);
return x
)));
unique(g)
Output 2:
1-element Vector{Int64}:
2
It looks like transform uses multiple threads only when the function returns a single value; when the inner function returns an array, everything runs on one thread.
Does anyone know why?
Also, how can I run Example 2 on multiple threads?
Ad. 1. The rules for when multi-threading is used are documented in Functions · DataFrames.jl. The reason for the behavior in your question is that if the function returns a scalar, it is easier to compose the final output vector in a multi-threaded way. When the function returns a vector, we cannot know upfront what its length will be, so currently the produced vectors are appended sequentially (this might change in the future).
Ad. 2. You can do the threading manually yourself, e.g. along these lines:
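# one slot per group; `undef` is required when pre-sizing a Vector
output = Vector{Any}(undef, length(gdf))
Threads.@threads for i in 1:length(gdf)
    sdf = gdf[i]                      # i-th group as a SubDataFrame
    output[i] = your_function(sdf)    # each task writes only to its own slot
end
# combine the per-group results once all tasks have finished
result = reduce(vcat, output)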
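For reference, here is a self-contained version of that pattern that can be pasted into a session. It is only a sketch: the tiny data frame and the `center` function are made-up stand-ins for the real data and for `your_function`, and the result comes back ordered by group rather than in the original row order.

```julia
using DataFrames

# Small stand-in for the data frame in the question.
df = DataFrame(id1 = repeat(1:4, 3), x1 = 1:12)
gdf = groupby(df, :id1)

# Hypothetical per-group function: returns a vector as long as the group.
center(x) = x .- sum(x) / length(x)

# One pre-sized slot per group; each task writes only to its own slot.
pieces = Vector{DataFrame}(undef, length(gdf))
Threads.@threads for i in 1:length(gdf)
    sdf = gdf[i]
    pieces[i] = DataFrame(id1 = sdf.id1, x1 = sdf.x1,
                          x1_centered = center(sdf.x1))
end

# Combine sequentially after all tasks have finished.
result = reduce(vcat, pieces)
```

Start Julia with several threads (for example `julia -t 4`) to actually see the groups processed in parallel.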
Thanks a lot for the detailed response. I need to add these columns to the existing dataframe. I see 2 options:
1. Create a new dataframe, add “x1” from the existing dataframe to it, and keep updating the new dataframe with the transformed column. I presume adding to a dataframe is safe unless we are reading from it at the same time. Can you please advise? [1]
2. Alternatively, similar to your example above, create a Matrix that holds the columns from the existing dataframe along with the new transformed columns. I would keep concatenating to this Matrix and, after the loop has finished, convert it to a dataframe.
Can you please confirm which approach would work better? I will put together code examples when I am home.
My initial guess is that you can construct the new columns into a new dataframe separately as his example code does. Make sure you keep the grouping columns on there as well. Then at the end you can join it all together if you wish, using the grouping columns to join on.
It is safe as long as you do not read this column at the same time. Also, please make sure that you are updating the column and not replacing it.
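One way to keep the original row order while adding the new column to the existing data frame is to let every group write into its own rows of a preallocated vector. This is a rough sketch, not a tested recipe for the real data: `center` is a made-up function, and it assumes that `parentindices` on a group (a `SubDataFrame`) returns the row indices in the parent as its first element. Groups never share rows, so no two tasks write to the same positions.

```julia
using DataFrames

df = DataFrame(id1 = repeat(1:4, 3), x1 = 1:12)
gdf = groupby(df, :id1)

# Hypothetical per-group function returning one value per row of the group.
center(x) = x .- sum(x) / length(x)

# Preallocate the full output column before starting any task.
out = Vector{Float64}(undef, nrow(df))

Threads.@threads for i in 1:length(gdf)
    sdf = gdf[i]
    rows = parentindices(sdf)[1]   # rows of `df` that belong to this group
    out[rows] = center(sdf.x1)     # disjoint positions for every group
end

# Attach the finished column sequentially, after the loop.
df.x1_centered = out
```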
understood, noted.
[Updated my prev response]
ahh! A quick google search shows me:
When setting a column of a data frame the df[!, :C] and df.C syntaxes are equivalent and they would replace (or create) the :C column in df. This is different from using df[:, :C] to set a column in a data frame, which updates the contents of column in-place if it already exists.
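A small check of that difference, using a toy data frame (the column names are just for illustration):

```julia
using DataFrames

df = DataFrame(A = 1:3, C = [0.0, 0.0, 0.0])
col = df.C                      # the vector currently stored as :C

df[:, :C] = [1.0, 2.0, 3.0]     # in-place: writes into the existing vector
inplace_same = df.C === col     # still the same vector object

df[!, :C] = [4.0, 5.0, 6.0]     # replace: a different vector is stored as :C
replaced_same = df.C === col    # `col` is no longer the column of df
```

After the second assignment `col` still holds the values written by `df[:, :C] = ...`, while `df.C` is a different vector.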
I came across this thread as I am facing (I believe) a task somewhat similar to what is discussed here. I have a dataframe with, say, n columns, and I’m using a set of functions to compute, say, p new columns for the dataframe, based on the data in the n existing columns. There is no intermediate calculation or dependency, meaning that the order of computation of the p columns doesn’t matter.
Is it correct to say that as long as I preallocate the p new columns in the dataframe, updating them with functions taking one or many of the n input columns as inputs is safe?
EDIT: so I gave it a shot, here’s my code (uses DataFrames and DataFramesMeta, the latter because my functions are programmatically generated)
Threads.@threads for (l,r) in lr_tuples_vec
    code = :@transform!($resdf,$(l) = $(r))
    eval(code)
end
l is a QuoteNode and r is an Expr, and resdf has all l named columns preallocated with zeros.
The outcome of the above varies across tries: anywhere between one and all of the new columns get populated (those that don’t still contain the preallocated zeros).
Am I correct to suspect a synchronization issue? Any idea how to address it?
Yes, but it is a general Julia issue. You are calling transform!, which works in-place, in parallel. This is not safe, just as the following code is not safe in Base Julia:
julia> x = Int[]
Int64[]
julia> Threads.@threads for i in 1:16
sleep(rand()) # ensure randomness in execution order
push!(x, i)
end
julia> x
16-element Vector{Int64}:
5
13
14
9
1
10
11
6
15
16
2
12
7
3
8
4
If you want to update the same object many times in parallel you need to synchronize its updates.
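For the `push!` example, a minimal sketch of such synchronization in Base Julia could use a `ReentrantLock` (the lock name `lk` is arbitrary):

```julia
x = Int[]
lk = ReentrantLock()

Threads.@threads for i in 1:16
    sleep(rand() / 100)   # vary the execution order a little
    lock(lk) do
        push!(x, i)       # only one task at a time may grow the vector
    end
end
```

The order of the elements in `x` still depends on scheduling, but every `push!` now runs while no other task is modifying the vector.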
In this case the pseudocode would be:
for your loop
    compute new column using transform (without !) into a new data frame dftmp
    lock
    add a column to resdf from dftmp
    unlock
end
In this way you compute dftmp in parallel, but you ensure that you update resdf safely. Still, the order of updates of resdf would be non-deterministic.
If you wanted a deterministic update order, you should store the dftmp results in a pre-allocated vector and add them to resdf after the computations have finished.
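Both variants could look roughly like this. It is a sketch under assumptions: `specs` is a made-up list of `target name => function` pairs standing in for the programmatically generated expressions, and plain functions are applied to the input column instead of calling transform, to keep the example short.

```julia
using DataFrames

# Hypothetical column specifications: target name => function of column `a`.
specs = [:b => (x -> 2 .* x), :c => (x -> x .^ 2), :d => (x -> x .+ 1)]

# Variant 1: compute in parallel, add columns under a lock.
resdf = DataFrame(a = 1:5)
a = resdf.a                      # fetch the input once, before any task runs
lk = ReentrantLock()
Threads.@threads for i in eachindex(specs)
    name, f = specs[i]
    newcol = f(a)                # the expensive part runs in parallel
    lock(lk) do
        resdf[!, name] = newcol  # one task at a time changes resdf
    end
end

# Variant 2: deterministic column order via a preallocated results vector.
resdf2 = DataFrame(a = 1:5)
results = Vector{Vector{Int}}(undef, length(specs))
Threads.@threads for i in eachindex(specs)
    results[i] = specs[i].second(resdf2.a)   # resdf2 is only read here
end
for (spec, col) in zip(specs, results)
    resdf2[!, spec.first] = col  # sequential, in the order of `specs`
end
```

In variant 1 the column order of `resdf` depends on which task finishes first; in variant 2 it always follows `specs`.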
Thank you for your answer. So I see how your solution at the bottom is safe, and that was going to be my fallback option.
To go back to your earlier example though, and forgive me if the question is naive, I see how your loop is not safe, but I thought a transform! would be closer to the below (given I preallocate the columns):
julia> x = zeros(16)
16-element Vector{Float64}:
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
julia> Threads.@threads for i in 1:16
x[i] = i
end
julia> x
16-element Vector{Float64}:
1.0
2.0
3.0
4.0
5.0
6.0
7.0
8.0
9.0
10.0
11.0
12.0
13.0
14.0
15.0
16.0
julia>
… which I thought (I’m not sure anymore) was safe?
If the columns are already present in the data frame (i.e. you do not modify the structure of the data frame) and you do not modify metadata, then it should be safe.
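As an illustration of that condition, here is a sketch where each task overwrites the contents of a different, already existing column and nothing else about the data frame changes. The column names and functions are made up for the example.

```julia
using DataFrames

# y1 and y2 are preallocated with zeros, as in the question.
df = DataFrame(x = 1.0:4.0, y1 = zeros(4), y2 = zeros(4))

# Hypothetical targets: existing column name => function of `x`.
targets = [:y1 => (v -> 2 .* v), :y2 => (v -> v .^ 2)]

x = df.x
Threads.@threads for i in eachindex(targets)
    name, f = targets[i]
    col = df[!, name]   # the existing vector itself, no copy
    col .= f(x)         # overwrite its contents in place
end
```

No column is added, removed, or replaced here, so the structure of `df` is the same before and after the loop.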
Ok that’s what I thought. Some columns are still all zeros though (I fill with zeros when I preallocate) when the function returns, which I’m not sure how to explain.
Ah, interesting. Curious though: why would a transform! that does not modify a column need to rewrite it? Isn’t one of the purposes of in-place modification to avoid unnecessary operations (allocations or otherwise)?
Final word on this: multithreaded ended up being as slow as single threaded on a 36T machine, not because of locks, but most likely due to my use of eval. I ended up sticking to single threaded, using regular code as opposed to eval (at the expense of some flexibility in functionality), and got a 25x speed up by not relying on eval.
The above can’t be passed as an argument to transform or @transform, is there a way (even if it involves redefining my own say @transform2)?
I tried a few things, but I’m not an expert in metaprogramming, and I’m trying to avoid eval for the performance reasons discussed previously (and also because it’s not that elegant).