Hello there, I am looking for help improving the performance of the function below:
Initial function
using Base.Threads, DataFrames, YFinance, Statistics, Dates
function historical_low(
code::Vector{String},
startdt::String,
l::Float64,
ratio::Float64
)
lk = ReentrantLock()
    low_σ = String[] # Shared result vector, guarded by the lock below
@threads for i in code
try
p = get_prices(
i, startdt = startdt, enddt = today(), interval = "1d",
divsplits = true, exchange_local_time = true
) |> DataFrame
            # Compute each statistic once
today_low = p.low[end]
min_low = minimum(p.low)
std_low = std(p.low)
            # Keep symbols trading above the floor l but within ratio * std of the historical low
if l < today_low < min_low + ratio * std_low
lock(lk) do
push!(low_σ, i)
end
end
catch e
@error "Error processing $i: $e"
end
end
return unique(low_σ) # Remove duplicates before returning
end
code = get_all_symbols("NYSE")
@time results = historical_low(code, "2022-01-01", 5.0, 0.1)
which gives:
309.934138 seconds (5.00 M allocations: 1.982 GiB, 0.23% gc time, 2007 lock conflicts, 0.52% compilation time)
48-element Vector{String}:
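To see where the time goes, here is a minimal probe ("IBM" is just an arbitrary liquid ticker; any symbol works). If the single network fetch dwarfs the local statistics, the loop is I/O-bound and the win should come from overlapping requests rather than from faster math:

using DataFrames, YFinance, Statistics, Dates
# One network round-trip vs. the pure computation on its result
@time p = get_prices("IBM", startdt = "2022-01-01", enddt = today(), interval = "1d") |> DataFrame
@time (p.low[end], minimum(p.low), std(p.low))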
After asking GPT, I got an optimized version:
Optimized
function historical_low(
code::Vector{String},
startdt::String,
    l::Float64, # l: price floor (lowest acceptable share price)
ratio::Float64
)
results = Channel{String}(Inf) # Thread-safe storage for results
batch_size::Int = 50
# Split stocks into batches
batches = Iterators.partition(code, batch_size) |> collect
# Launch tasks asynchronously
futures = [
        Threads.@spawn begin
            local_results = String[] # Per-task buffer; no shared state touched in the loop
for i in batch
try
p = get_prices(
i, startdt = startdt, enddt = today(), interval = "1d",
divsplits = true, exchange_local_time = true
) |> DataFrame
                    # p.low already returns the column without copying
                    lows = p.low
today_low = lows[end]
min_low = minimum(lows)
std_low = std(lows)
if l < today_low < min_low + ratio * std_low
push!(local_results, i)
end
catch e
@error "Error processing $i: $e"
end
end
            # Drain the per-task buffer into the shared channel
for stock in local_results
put!(results, stock)
end
end for batch in batches
]
# Wait for all tasks to complete
wait.(futures)
close(results) # Mark channel as finished
# Collect all results efficiently
return collect(results) # Convert channel to array
end
@time results = historical_low(code, "2022-01-01", 5.0, 0.1)
this gives:
80.924692 seconds (3.92 M allocations: 1.925 GiB, 1.28% gc time, 4950 lock conflicts)
48-element Vector{String}:
Can it be further optimized?
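One more pattern I have not benchmarked: skip both the lock and the channel by letting each task return its own vector and merging once at the end. The keep predicate below is a dummy stand-in for the fetch-and-filter body:

using Base.Threads
# Each task builds a private vector; results are merged once via fetch,
# so nothing is locked inside the hot loop.
function gather(code::Vector{String}, keep; batch_size::Int = 50)
    batches = collect(Iterators.partition(code, batch_size))
    tasks = [Threads.@spawn String[i for i in batch if keep(i)] for batch in batches]
    return reduce(vcat, fetch.(tasks); init = String[])
end
gather(["AA", "B", "CCC"], i -> length(i) <= 2) # -> ["AA", "B"]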
I also tried pmap() for distributed computing:
Distributed Computing
using Distributed, DataFrames, YFinance, Statistics, Dates
addprocs(5)
procs()
@everywhere begin
using DataFrames, YFinance, Statistics, Dates # Ensure all workers have dependencies
function fetch_low_stock(i, startdt, l, ratio)
try
p = get_prices(
i, startdt = startdt, enddt = today(),
interval = "1d", divsplits = true, exchange_local_time = true
) |> DataFrame
if isempty(p) || isempty(p.low)
return nothing
end
last_low, min_low, std_low = p.low[end], minimum(p.low), std(p.low)
return (l < last_low < min_low + ratio * std_low) ? i : nothing
catch e
@warn "Error processing $i:$e"
return nothing
end
end
end
function historical_low_pmap(
code::Vector{String},
startdt::String,
l::Float64,
    ratio::Float64
)
batch_size = 1000 # Adjust batch size based on available memory
code_batches = Iterators.partition(code, batch_size) |> collect
results = vcat(pmap(
batch -> filter(!isnothing, fetch_low_stock.(
batch, startdt, l, ratio)
), code_batches)...
)
    return results
end
but it was even worse (more than 15 minutes), so I gave up on it.
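Since the bottleneck is network latency rather than CPU, another candidate I have not tried is Base.asyncmap, which overlaps the HTTP round-trips with lightweight tasks instead of worker processes. A sketch, assuming get_prices is safe to call from concurrent tasks (ntasks = 50 is an arbitrary starting point):

using DataFrames, YFinance, Statistics, Dates
function historical_low_async(code, startdt, l, ratio; ntasks = 50)
    # ntasks caps the number of in-flight requests
    hits = asyncmap(code; ntasks = ntasks) do i
        try
            p = get_prices(
                i, startdt = startdt, enddt = today(), interval = "1d",
                divsplits = true, exchange_local_time = true
            ) |> DataFrame
            (l < p.low[end] < minimum(p.low) + ratio * std(p.low)) ? i : nothing
        catch e
            @error "Error processing $i: $e"
            nothing
        end
    end
    return String[i for i in hits if i !== nothing]
end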