# @bkamins, would you please suggest how to make the code below more efficient with parallelization? The DataFrame size is 1,877,655,217 rows × 27 columns.
using DataFrames, Dates, Statistics, Base.Threads
"""
    detect_heatwaves(latitudes, dates, daily_max_temperatures;
                     window_size=31, threshold_percentile=0.9, min_duration=3)

Flag the elements of one daily temperature series that belong to a heatwave.

The series is scanned from its first element. A heatwave may only *start* on a day
whose month lies in the season of interest for its hemisphere: May to September when
`latitudes[i] >= 0`, November to March otherwise. For such a day `i` the threshold is
the `threshold_percentile` quantile of the temperatures in the index window
`i - window_size ÷ 2` to `i + window_size ÷ 2`, clipped to the series bounds. If day
`i` and the days immediately after it form a run of at least `min_duration` values
strictly greater than that threshold, the whole run is flagged and scanning resumes
right after the run; otherwise scanning moves on to day `i + 1`.

# Arguments
- `latitudes`: latitude of each observation; only its sign is used.
- `dates`: values accepted by `Dates.month`, one per observation.
- `daily_max_temperatures`: temperature of each observation. It is not modified.

All three are indexed from 1 and must have the same length.

# Keywords
- `window_size::Integer=31`: nominal width of the window used for the threshold.
- `threshold_percentile::Real=0.9`: probability in `[0, 1]` passed to the quantile.
- `min_duration::Integer=3`: shortest run of days that counts as a heatwave.

# Returns
A `BitVector` with one entry per observation, `true` for heatwave days.

# Throws
- `DimensionMismatch` if the three inputs differ in length.
- `ArgumentError` if a keyword value is out of range.
"""
function detect_heatwaves(
    latitudes,
    dates,
    daily_max_temperatures;
    window_size::Integer=31,
    threshold_percentile::Real=0.9,
    min_duration::Integer=3,
)
    n = length(daily_max_temperatures)
    if length(latitudes) != n || length(dates) != n
        throw(DimensionMismatch(
            "latitudes ($(length(latitudes))), dates ($(length(dates))) and " *
            "daily_max_temperatures ($n) must have the same length",
        ))
    end
    window_size >= 1 ||
        throw(ArgumentError("window_size must be positive, got $window_size"))
    # A non-positive minimum would accept empty runs and never advance the scan.
    min_duration >= 1 ||
        throw(ArgumentError("min_duration must be positive, got $min_duration"))
    0 <= threshold_percentile <= 1 || throw(
        ArgumentError("threshold_percentile must be in [0, 1], got $threshold_percentile"),
    )

    heatwave_flags = falses(n)
    half_window = window_size ÷ 2

    # Scratch space reused for every window: `quantile!` reorders its input, so the
    # window is copied here instead of allocating a fresh slice per day.
    window_buffer = Vector{eltype(daily_max_temperatures)}(undef, 0)
    sizehint!(window_buffer, 2 * half_window + 1)

    i = 1
    while i <= n
        month_num = month(dates[i])
        is_in_interest = if latitudes[i] >= 0
            5 <= month_num <= 9                   # Northern: May to September
        else
            month_num >= 11 || month_num <= 3     # Southern: November to March
        end
        if !is_in_interest
            i += 1
            continue
        end

        window_start = max(1, i - half_window)
        window_end = min(n, i + half_window)
        window_length = window_end - window_start + 1
        resize!(window_buffer, window_length)
        copyto!(window_buffer, 1, daily_max_temperatures, window_start, window_length)
        threshold = quantile!(window_buffer, threshold_percentile)

        # Extend the run of days strictly above the threshold, starting at day i.
        j = i
        while j <= n && daily_max_temperatures[j] > threshold
            j += 1
        end

        if j - i >= min_duration
            heatwave_flags[i:(j - 1)] .= true
            i = j   # skip the days that were just flagged
        else
            i += 1
        end
    end
    return heatwave_flags
end
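# Load data: `dfm` (used below) must be defined here as a DataFrame with at least the columns :lon, :lat, :time and :AvgSurfT_tavg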
"""
    process_group(group)

Compute heatwave flags for one group and store them in its `heatwave_con` column.

Reads the `lat`, `time` and `AvgSurfT_tavg` properties of `group`, passes them to
`detect_heatwaves`, assigns the returned flags to `group.heatwave_con` and returns
`group` itself. Note that `group` is modified.
"""
function process_group(group)
    group.heatwave_con = detect_heatwaves(group.lat, group.time, group.AvgSurfT_tavg)
    return group
end
"""
    parallel_heatwave_detection(df)

Run `process_group` on every `(lon, lat)` group of `df` on multiple threads and return
the per-group results concatenated, in group order, into a single data frame.

`df` is not modified: every group is copied before it is handed to `process_group`.
Start Julia with several threads (for example `julia --threads=auto`) for the loop to
run in parallel.
"""
function parallel_heatwave_detection(df)
    grouped_df = DataFrames.groupby(df, [:lon, :lat])
    results = Vector{DataFrame}(undef, length(grouped_df))
    @threads for idx in 1:length(grouped_df)
        # `grouped_df[idx]` is a view into `df`, and `process_group` assigns a new
        # column to its argument. Doing that through the view would touch the shared
        # parent from several threads at once, so each task works on its own copy.
        results[idx] = process_group(copy(grouped_df[idx]))
    end
    # `reduce(vcat, ...)` avoids splatting one argument per group into `vcat`.
    return reduce(vcat, results)
end
# Run the detection on `dfm` and let `@time` report the elapsed time and allocations
# (the first call also includes compilation time).
p_df = @time parallel_heatwave_detection(dfm)