Is it possible to make this function faster?

Hi all:

I have a function foo that iterates through a ComplexF64 vector A. For each element A[i], it computes all distances |A[i] - A[j]| in the complex plane and then stores in B[i] the ratio of the minimum distance by the n-th minimum one. Typical sizes are: L ~ 1000, 3000 and n=20.

I already went through the performance tips section and made some changes, but maybe someone can make it faster.

function foo(A::Vector{<:Number}, n::Int)
    L = length(A)
    B = zeros(Float64, L)
    # Vector to store the distances between points.
    d = fill(1e5, n+1)
    @inbounds for i=1:L
        fill!(d, 1e5)
        # Index of the maximum element of d.
        ix_max = 1
        for j=1:L
            # Squared distance between points i and j.
            dij = abs2(A[i] - A[j])
            # If dij is smaller than any of the distances stored in d, 
            # replace maximum(d) by dij.
            if dij < d[ix_max]
                d[ix_max] = dij
                ix_max = argmax(d)
            end
        end
        sort!(d)
        d1 = sqrt(d[2])
        dn = sqrt(d[n+1])
        B[i] = d1/dn
    end
    return B
end

I know the computation time scales as O(L^2), so the bottleneck is in the loop of j, but I don’t know how to make it a bit more performant.

I could use threads, but I am not doing it because the function already lives inside another one that loops over the columns of a matrix, which are then passed as the vectors A to my function. This outermost loop already uses Threads.@threads.

Any idea? :grinning:

Won’t that have to, in general, iterate over all of d (so your code is more like `O(n^2 * L)?

You are only changing one element in d so you shouldn’t need to do a complete argmax, right?

Thanks for your reply, Kristoffer. d should be a different vector for each index i of the vector A, so I don’t think I can just fill it once. Maybe I didn’t explain the algorithm correctly, I will try again:
For each element A[i] I compute all distances of this element to every other element A[j] in the vector. Then I select the n+1 smallest ones (taking into account that the smallest distance is |A[i]-A[i]| = 0), which I store in the vector d. Finally I perform a small computation with the elements of d, sqrt(d[2]/d[n+1]), and store the result of this computation into B[i].

A sketch of this is

function foo(A::Vector{<:Number}, n::Int)
    L = length(A)
    B = zeros(Float64, L)
    for i=1:L
        d = norm(A[i] - A)
        sort!(d)
        B[i] = sqrt(d[2]/d[n+1])
    end
    return B
end

but here d stores all possible distances, which is slower than my version.

Because there is no ordering of the elements of A I must go through every pair of elements A[i]-A[j], that’s why I say that the algorithm complexity is of order O(L^2).

Sorry I meant O(L^2*n), (argmax is O(n) inside a O(L^2) loop)

Totally right, I didn’t understand. However the function only computes argmax iff dij < d[max], so maybe it is just an upper bound if we are really unlucky. If you want some context, A are the eigenvalues of several random non-hermitian matrices. I know they are ordered by their real part, but I’m not sure whether I can use this to skip some iterations.

If I understand this correctly you could speed up the algorithm if you only operate on the necessary d of size n+1 instead of 1e5 and update d by inserting a new value at a sorted position (e.g. by utilizing searchsorted). Then you can skip the costly sort! step in each iteration because d already contains the largest n+1 sorted values.

The searchsorted suggestion is in the right direction I think. Also consider looking into partialsort/partialsortperm to sort the top n+1 entries only.
Another thought is that since distances are symmetric, you only need to iterate on

for i in 1:L, j in (i+1):L
end

rather than the full L^2.

Finally, since this is a euclidean distance based thing in which you’re concerned with the β€œclosest point” and the β€œ(n+1)th closest point”, organizing your data into a quadtree initially may provide a boost by being able to completely ignore most entries. Check out RegionTrees.jl for that. certainly for the closest point seach this would be vital, but with just a bit of preprocessing, I think a good heuristic for which bins to look in for the n+1th case would also be possible.

Your sketch above should by 85X faster:

function foo2(A::Vector{<:Number}, n::Int)
    L = length(A)
    B = zeros(L)
    d = zeros(L)
    for i=1:L
        @. d = abs2(A[i] - A)
        sort!(d)
        B[i] = sqrt(d[2]/d[n+1])
    end
    return B
end

Why not use it? Am I missing something?

If you have some order in your data it is really useful if you provide code to generate representative input data.

Hi all, thanks for your answers. partialsort is a good idea, unfortunately it is a bit slower than my original function. I provide a benchmark with what I understood was the use of partialsort and my sketch function (which is really slower than the other ones)

function foo(A::Vector{<:Number}, n::Int)
    L = length(A)
    B = zeros(Float64, L)
    d = fill(1e5, n+1)
    @inbounds for i=1:L
        fill!(d, 1e5)
        ix_max = 1
        for j=1:L
            dij = abs2(A[i] - A[j])
            if dij < d[ix_max]
                d[ix_max] = dij
                ix_max = argmax(d)
            end
        end
        sort!(d)
        d1 = sqrt(d[2])
        dn = sqrt(d[n+1])
        B[i] = d1/dn*sqrt(n/pi)
    end
    return B
end

function foo2(A::Vector{<:Number}, n::Int)

    L = length(A)
    B = zeros(Float64, L)
    d = fill(1e5, n+1)
    @inbounds for i=1:L
        fill!(d, 1e5)
        for j=1:L
            dij = abs2(A[i] - A[j])
            if dij < d[end]
                d[end] = dij
                partialsort!(d, n+1)
            end
        end
        d1 = sqrt(d[2])
        dn = sqrt(d[n+1])
        B[i] = d1/dn*sqrt(n/pi)
    end
    
    return B
end

function foo3(A::Vector{<:Number}, n::Int)
    L = length(A)
    B = zeros(L)
    d = zeros(L)
    for i=1:L
        @. d = abs2(A[i] - A)
        sort!(d)
        B[i] = sqrt(d[2]*n/(d[n+1]*pi))
    end
    return B
end

function main(L)
    A = randn(ComplexF64, L)
    n = 20
    
    # Check that results are equal.
    S1 = foo(A, n)
    S2 = foo2(A, n)
    S3 = foo3(A, n)
    display(norm(S1-S2))
    display(norm(S1-S3))
    
    # Benchmark.
    @btime foo($A, $n)
    @btime foo2($A, $n)
    @btime foo3($A, $n)
    return
end

main(3000)

Output:

0.0
4.914452931421627e-15

  43.340 ms (3 allocations: 23.77 KiB)
  55.410 ms (3 allocations: 23.77 KiB)
  393.345 ms (4 allocations: 47.03 KiB)

I also used TimerOutputs.jl (really useful one, thanks, Kristoffer) to check that the final sorting is not the problem

function foo_timeit(A::Vector{<:Number}, n::Int)
    to = TimerOutput()

    L = length(A)
    B = zeros(Float64, L)
    d = fill(1e5, n+1)
    @timeit to "for: i" @inbounds for i=1:L
        fill!(d, 1e5)
        ix_max = 1
        @timeit to "for: j" for j=1:L
            dij = abs2(A[i] - A[j])
            if dij < d[ix_max]
                @timeit to "place dij" d[ix_max] = dij
                @timeit to "argmax" ix_max = argmax(d)
            end
        end
        @timeit to "sort" sort!(d)
        d1 = sqrt(d[2])
        dn = sqrt(d[n+1])
        B[i] = d1/dn
    end
    
    println(to)
    return B
end

foo_timeit(randn(ComplexF64, 3000), 20);

Output:

────────────────────────────────────────────────────────────────────────
                                 Time                   Allocations      
                         ──────────────────────   ───────────────────────
    Tot / % measured:         156ms / 100%            28.8KiB / 11.5%    

 Section         ncalls     time   %tot     avg     alloc   %tot      avg
 ────────────────────────────────────────────────────────────────────────
 for: i               1    156ms   100%   156ms   3.31KiB  100%   3.31KiB
   for: j         3.00k    154ms  98.9%  51.4ΞΌs   1.66KiB  50.0%    0.57B
     argmax        383k   49.5ms  31.7%   129ns     0.00B  0.00%    0.00B
     place dij     383k   20.3ms  13.0%  53.0ns     0.00B  0.00%    0.00B
   sort           3.00k   1.10ms  0.71%   368ns     0.00B  0.00%    0.00B
 ────────────────────────────────────────────────────────────────────────

@kristoffer.carlsson, with respect to the data it is very heterogeneous. It comes from general random complex matrices, random symmetric complex matrices and dissipative forms of quantum systems. A general form to get some input data for my function would be:

M = randn(ComplexF64, L, L)
A = eigvals(M)

With respect to inspecting only over for i in 1:L, j in (i+1):L it might be a good idea, but I would have to find a nice way to store intermediate results that is not too memory consuming. Anyways, I didn’t know about RegionTrees.jl, it’s a very good idea to preprocess input data in this form! I’ll try to implement it and give you feedback if I get anything.

Sorry for the long answer. Thank you all again for those that answered. :grinning:

function foo4(A::Vector{<:Number}, n::Int)
    L = length(A)
    B = zeros(Float64, L)
    d = fill(1e5, n + 1)
    @inbounds for i = 1:L
        fill!(d, 1e5)
        for j = 1:L
            dij = abs2(A[i] - A[j])
            if dij < d[n + 1]
                insert!(d, searchsortedfirst(d, dij), dij)
                resize!(d, n + 1)
            end
        end
        d1 = sqrt(d[2])
        dn = sqrt(d[n + 1])
        B[i] = d1 / dn * sqrt(n / pi)
    end
    return B
end

on my machine:

julia> @btime foo($A, $n);
  39.238 ms (3 allocations: 23.77 KiB)

julia> @btime foo4($A, $n);
  30.960 ms (4 allocations: 24.13 KiB)

Thanks @laborg ! That really works! :grinning: :grinning:

(Sorry for the late answer)