Reduce allocations & lock conflicts and speed up multithreaded code

Hello there, I could use some help with performance tuning:

Initial function
using Base.Threads, DataFrames, YFinance, Statistics, Dates

function historical_low(
    code::Vector{String}, 
    startdt::String,
    l::Float64, 
    ratio::Float64
)
    lk = ReentrantLock()
    low_σ = String[]  # Shared result vector, grown under the lock

    @threads for i in code
        try
            p = get_prices(
                i, startdt = startdt, enddt = today(), interval = "1d", 
                divsplits = true, exchange_local_time = true
            ) |> DataFrame

            # Compute each quantity once to avoid redundant passes over the column
            today_low = p.low[end]
            min_low = minimum(p.low)
            std_low = std(p.low)

            # Stock price check
            if l < today_low < min_low + ratio * std_low
                lock(lk) do
                    push!(low_σ, i)  
                end
            end
        catch e
            @error "Error processing $i: $e"
        end
    end
    return unique(low_σ)  # Remove duplicates before returning
end

code = get_all_symbols("NYSE")
@time results = historical_low(code, "2022-01-01", 5.0, 0.1)

which gives

309.934138 seconds (5.00 M allocations: 1.982 GiB, 0.23% gc time, 2007 lock conflicts, 0.52% compilation time)
48-element Vector{String}:

After asking GPT, I got an optimized version:

Optimized
function historical_low(
    code::Vector{String},
    startdt::String,
    l::Float64,     # l: minimum stock price
    ratio::Float64
)
    results = Channel{String}(Inf)  # Thread-safe, unbounded result storage

    batch_size::Int = 50
    # Split the symbols into batches
    batches = Iterators.partition(code, batch_size) |> collect

    # Launch one task per batch
    futures = [
        @spawn begin
            local_results = String[]

            for i in batch
                try
                    p = get_prices(
                        i, startdt = startdt, enddt = today(), interval = "1d",
                        divsplits = true, exchange_local_time = true
                    ) |> DataFrame

                    # p.low already gives the column directly;
                    # Tables.columntable(p)[:low] would also require `using Tables`
                    lows = p.low

                    today_low = lows[end]
                    min_low = minimum(lows)
                    std_low = std(lows)

                    if l < today_low < min_low + ratio * std_low
                        push!(local_results, i)
                    end
                catch e
                    @error "Error processing $i: $e"
                end
            end

            # Push the batch's hits to the shared channel
            for stock in local_results
                put!(results, stock)
            end
        end for batch in batches
    ]

    # Wait for all tasks to complete
    wait.(futures)
    close(results)  # Mark channel as finished

    # Collect all results efficiently
    return collect(results)  # Convert channel to array
end
@time results = historical_low(code, "2022-01-01", 5.0, 0.1)

this gives:

80.924692 seconds (3.92 M allocations: 1.925 GiB, 1.28% gc time, 4950 lock conflicts)
48-element Vector{String}:

Interestingly, the lock conflicts went up (4950 vs. 2007) even though the total time dropped, presumably because every put! on the Channel takes its internal lock. Can it be further optimized?
I also tried pmap() for distributed computing:

Distributed Computing
using Distributed, DataFrames, YFinance, Statistics, Dates

addprocs(5)
procs()
@everywhere begin

    using DataFrames, YFinance, Statistics, Dates  # Ensure all workers have dependencies

    function fetch_low_stock(i, startdt, l, ratio)
        try
            p = get_prices(
                i, startdt = startdt, enddt = today(),
                interval = "1d", divsplits = true, exchange_local_time = true
            ) |> DataFrame

            if isempty(p) || isempty(p.low)
                return nothing
            end

            last_low, min_low, std_low = p.low[end], minimum(p.low), std(p.low)

            return (l < last_low < min_low + ratio * std_low) ? i : nothing

        catch e
            @warn "Error processing $i: $e"
            return nothing
        end
    end
end

function historical_low_pmap(
    code::Vector{String},
    startdt::String,
    l::Float64,
    ratio::Float64
)
    batch_size = 1000  # Adjust batch size based on available memory
    code_batches = Iterators.partition(code, batch_size) |> collect
    results = vcat(pmap(
        batch -> filter(!isnothing, fetch_low_stock.(
            batch, startdt, l, ratio)
        ), code_batches)...
    )
    return results
end

but it was even worse (more than 15 minutes), so I gave up on it; with only 5 workers, the per-call serialization and process overhead presumably outweigh any gain.
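Since the calls mostly wait on HTTP responses anyway, maybe plain task concurrency on a single process is a better fit? A minimal sketch, reusing fetch_low_stock from the @everywhere block above (the ntasks value is an arbitrary guess):

results = filter(!isnothing, asyncmap(
    i -> fetch_low_stock(i, "2022-01-01", 5.0, 0.1),
    code; ntasks = 50  # up to 50 requests in flight at once
))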

You should profile your code to figure out what takes time. If the work inside each task is what takes the most time, then optimise that.
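For example, a minimal sketch of a profiling run (warm up first so compilation doesn't dominate the samples; the slice sizes are arbitrary):

using Profile

historical_low(code[1:20], "2022-01-01", 5.0, 0.1)   # warm-up call to compile everything
Profile.clear()
@profile historical_low(code[1:200], "2022-01-01", 5.0, 0.1)
Profile.print(format = :flat, sortedby = :count)     # hottest frames first

If most samples sit inside get_prices waiting on the network, the locking strategy is a rounding error and the batching/concurrency of the requests is what to tune.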

If your issue is contention around the channel (but I doubt that), then you can lock the channel once before pushing in a loop, like so:

# Send results to channel
@lock results begin
    for stock in local_results
        put!(results, stock)
    end
end
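
Alternatively, you can avoid the shared channel entirely: have each task return its local vector, then fetch and concatenate at the end. A minimal sketch under the same assumptions as the batched version above (the name historical_low_fetch and the batch size are just for illustration):

using Base.Threads, DataFrames, YFinance, Statistics, Dates

function historical_low_fetch(code, startdt, l, ratio; batch_size = 50)
    tasks = map(collect(Iterators.partition(code, batch_size))) do batch
        @spawn begin
            hits = String[]  # task-local buffer, so no lock is needed
            for i in batch
                try
                    p = get_prices(
                        i, startdt = startdt, enddt = today(), interval = "1d",
                        divsplits = true, exchange_local_time = true
                    ) |> DataFrame
                    lows = p.low
                    if l < lows[end] < minimum(lows) + ratio * std(lows)
                        push!(hits, i)
                    end
                catch e
                    @error "Error processing $i: $e"
                end
            end
            hits  # the task's return value, retrieved via fetch below
        end
    end
    # Fetch each task's vector and flatten: no shared state, no channel
    return reduce(vcat, fetch.(tasks); init = String[])
end

Since every task writes only to its own hits vector, @time should report no lock conflicts from the result collection itself (any remaining ones would come from the libraries underneath).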