Dataframes with groupby in parallel

I have a dataframe of size (2450225, 117).
Using just pandas and the multiprocessing module, I get the results in 34 seconds; the pandas code is below. I am trying to achieve the same with Julia, hoping it will be a faster operation. I searched around and found that I should be using @everywhere and pmap with DataFrames, but I am running into issues.

Can you help me achieve what was promised with Julia :slight_smile:
for a very widely used scenario like this one, a groupby in parallel?


# Start 16 worker processes, then load the data-frame packages on every
# process so that code evaluated on the workers can use them.
# NOTE(review): the KeyError in the error message below shows worker 2 failing
# to resolve the SentinelArrays module while deserializing its input;
# presumably whichever package produced the column types of data_sf1 must be
# loaded on the workers here as well -- confirm.
using Distributed
addprocs(16)
@everywhere begin
    using DataFrames
    using DataFramesMeta
end

@everywhere begin
    # Reduce one group to a single-row DataFrame: materialize the group as its
    # own DataFrame, order it by ascending :datekey and keep only the top row.
    function combine_the_group(df_)
        ordered = sort(DataFrame(df_), :datekey)
        return first(ordered, 1)
    end
end

# Materializing every group as its own DataFrame (the commented-out line
# below) bloats my memory. I understand I am probably doing Julia wrong and
# should take the compute to the data rather than the other way around.
# Any suggestions are appreciated.
# data_sf1_grps = [DataFrame(grp) for grp in groupby(data_sf1, [:ticker, :calendardate])]
# So I use the grouped data frame directly instead:
data_sf1_grps = groupby(data_sf1, [:ticker, :calendardate])
# ...but now the following pmap call fails with the error shown below.
# NOTE(review): per the stack trace, the comprehension yields a vector of
# SubDataFrame values; presumably each one drags its parent's columns along
# when it is serialized to a worker -- confirm.
data_sf1 = pmap(combine_the_group, [grp for grp in data_sf1_grps])

error message

On worker 2:
KeyError: key SentinelArrays [91c51154-3ec4-41a3-a24f-3f23e20d615c] not found
getindex at ./dict.jl:467 [inlined]
root_module at ./loading.jl:968 [inlined]
deserialize_module at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:953
handle_deserialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:855
deserialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:773
deserialize_datatype at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:1251
handle_deserialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:826
deserialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:773
handle_deserialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:841
deserialize_fillarray! at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:1153
deserialize_array at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:1145
handle_deserialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:824
deserialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:1350
handle_deserialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:837
deserialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:1350
handle_deserialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:837
deserialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:773
#5 at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:932
ntuple at ./ntuple.jl:18
deserialize_tuple at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:932
handle_deserialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:816
deserialize at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Serialization/src/Serialization.jl:773 [inlined]
deserialize_msg at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/messages.jl:99
#invokelatest#1 at ./essentials.jl:710 [inlined]
invokelatest at ./essentials.jl:709 [inlined]
message_handler_loop at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:185
process_tcp_streams at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/process_messages.jl:142
#99 at ./task.jl:356

Stacktrace:
 [1] (::Base.var"#770#772")(::Task) at ./asyncmap.jl:178
 [2] foreach(::Base.var"#770#772", ::Array{Any,1}) at ./abstractarray.jl:2009
 [3] maptwice(::Function, ::Channel{Any}, ::Array{Any,1}, ::Array{SubDataFrame{DataFrame,DataFrames.Index,Array{Int64,1}},1}) at ./asyncmap.jl:178
 [4] wrap_n_exec_twice(::Channel{Any}, ::Array{Any,1}, ::Distributed.var"#206#209"{WorkerPool}, ::Function, ::Array{SubDataFrame{DataFrame,DataFrames.Index,Array{Int64,1}},1}) at ./asyncmap.jl:154
 [5] async_usemap(::Distributed.var"#190#192"{Distributed.var"#190#191#193"{WorkerPool,typeof(combine_the_group)}}, ::Array{SubDataFrame{DataFrame,DataFrames.Index,Array{Int64,1}},1}; ntasks::Function, batch_size::Nothing) at ./asyncmap.jl:103
 [6] #asyncmap#754 at ./asyncmap.jl:81 [inlined]
 [7] pmap(::Function, ::WorkerPool, ::Array{SubDataFrame{DataFrame,DataFrames.Index,Array{Int64,1}},1}; distributed::Bool, batch_size::Int64, on_error::Nothing, retry_delays::Array{Any,1}, retry_check::Nothing) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/pmap.jl:126
 [8] pmap(::Function, ::WorkerPool, ::Array{SubDataFrame{DataFrame,DataFrames.Index,Array{Int64,1}},1}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/pmap.jl:101
 [9] pmap(::Function, ::Array{SubDataFrame{DataFrame,DataFrames.Index,Array{Int64,1}},1}; kwargs::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/pmap.jl:156
 [10] pmap(::Function, ::Array{SubDataFrame{DataFrame,DataFrames.Index,Array{Int64,1}},1}) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/pmap.jl:156
 [11] top-level scope at In[33]:1
 [12] include_string(::Function, ::Module, ::String, ::String) at ./loading.jl:1091

Pandas code for reference


%%time

def get_latest_stmt(df_):
    """Return a one-row frame: the first row of ``df_`` after an ascending sort on ``datekey``.

    An empty ``df_`` yields an empty frame.
    """
    ordered = df_.sort_values("datekey")
    return ordered.iloc[:1]

def parallel_process1(df_):
    """Pick one row per (ticker, calendardate) group of ``df_``.

    Groups ``df_`` by ``ticker`` and ``calendardate``, applies
    ``get_latest_stmt`` to every group, and returns the combined result
    with its index moved into ordinary columns.
    """
    grouped = df_.groupby(["ticker", "calendardate"], as_index=False)
    picked = grouped.apply(get_latest_stmt)
    return picked.reset_index()

# Use roughly three quarters of the available cores, but never fewer than one:
# on a single-core host int(1 * 0.75) == 0, which would make the floor
# division below raise ZeroDivisionError and Pool(0) raise ValueError.
partitions_cores = max(1, int(mp.cpu_count() * 0.75))
print(f"mp.cpu_count():{mp.cpu_count()}, partitions_cores:{partitions_cores}")
print("data_sf1",data_sf1.shape, data_sf1[data_sf1.dimension == "ARQ"].shape, data_sf1.ticker.nunique())

# Split the unique tickers into chunks, one batch of work per pool task.
# The chunk size is clamped to 1 so that having fewer tickers than workers
# does not produce a chunk size of 0.
# NOTE(review): `chunks` is defined outside this snippet; presumably it yields
# consecutive slices of the given length -- confirm.
df_sf1_tickers = list(data_sf1.ticker.unique())
tickers_per_core = max(1, len(df_sf1_tickers) // partitions_cores)
tickers_list = list(chunks(df_sf1_tickers, tickers_per_core))

# The "ARQ" mask does not depend on the chunk, so compute it once.
is_arq = data_sf1.dimension == "ARQ"
dfs_sf1 = [data_sf1[data_sf1.ticker.isin(ticker) & is_arq]
           for ticker in tickers_list]

print(f"total:{len(tickers_list)}, tickers_per_core:{tickers_per_core}")

# Always shut the pool down, even when a worker raises, so that no worker
# processes are left behind.
pool = mp.Pool(partitions_cores)
try:
    results = pool.map(parallel_process1, dfs_sf1)
finally:
    pool.close()
    pool.join()
gc.collect()

# Stitch the per-chunk results back together; the level_* columns are the
# index levels introduced by reset_index() in parallel_process1 and are
# dropped when present.
data_sf1 = pd.concat(results, sort=False).drop(["level_0", "level_1"], axis=1, errors="ignore")

data_sf1.shape, data_sf1.ticker.nunique()

output

mp.cpu_count():56, partitions_cores:42
data_sf1 (2450225, 117) (508810, 117) 14903
total:43, tickers_per_core:354
CPU times: user 11.5 s, sys: 2.85 s, total: 14.3 s
Wall time: 33.5 s
((494110, 117), 13622)

Thanks to @bkamins for answering this on GitHub issues.