Is it possible to make this function faster?

Hi all:

I have a function foo that iterates through a ComplexF64 vector A. For each element A[i], it computes all distances |A[i] - A[j]| in the complex plane and then stores in B[i] the ratio of the minimum distance to the n-th minimum one. Typical sizes are L ~ 1000–3000 and n = 20.

I already went through the performance tips section and made some changes, but maybe someone can make it faster.

function foo(A::Vector{<:Number}, n::Int)
    L = length(A)
    B = zeros(Float64, L)
    # Vector to store the n+1 smallest squared distances (1e5 is just a large sentinel).
    d = fill(1e5, n+1)
    @inbounds for i=1:L
        fill!(d, 1e5)
        # Index of the maximum element of d.
        ix_max = 1
        for j=1:L
            # Squared distance between points i and j.
            dij = abs2(A[i] - A[j])
            # If dij is smaller than the largest distance stored in d,
            # replace that maximum by dij and find the index of the new maximum.
            if dij < d[ix_max]
                d[ix_max] = dij
                ix_max = argmax(d)
            end
        end
        sort!(d)
        d1 = sqrt(d[2])
        dn = sqrt(d[n+1])
        B[i] = d1/dn
    end
    return B
end

I know the computation time scales as O(L^2), so the bottleneck is the loop over j, but I don’t know how to make it more performant.

I could use threads, but I am not doing it because the function already lives inside another one that loops over the columns of a matrix, which are then passed as the vectors A to my function. This outermost loop already uses Threads.@threads.

Any idea? :grinning:

Won’t that have to, in general, iterate over all of `d` (so your code is more like `O(n^2 * L)`)?

You are only changing one element in d so you shouldn’t need to do a complete argmax, right?
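For instance, a rough sketch of that idea (untested; foo_heap is a made-up name) could keep the n+1 smallest squared distances in a max-heap from DataStructures.jl, so the current threshold is available in O(1) and each replacement costs O(log n) instead of a full scan:

using DataStructures

function foo_heap(A::Vector{<:Number}, n::Int)
    L = length(A)
    B = zeros(Float64, L)
    for i = 1:L
        # Sentinel values, as in foo; the heap root is always the largest kept distance.
        h = BinaryMaxHeap(fill(1e5, n + 1))
        for j = 1:L
            dij = abs2(A[i] - A[j])
            if dij < first(h)      # compare against the current maximum in O(1)
                pop!(h)            # drop the largest kept distance...
                push!(h, dij)      # ...and keep dij instead
            end
        end
        d = sort!([pop!(h) for _ = 1:(n + 1)])  # the kept squared distances, ascending
        B[i] = sqrt(d[2]) / sqrt(d[n + 1])
    end
    return B
end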

Thanks for your reply, Kristoffer. d has to be a different vector for each index i of the vector A, so I don’t think I can just fill it once. Maybe I didn’t explain the algorithm correctly, so I will try again:
For each element A[i] I compute the distances from this element to every other element A[j] in the vector. Then I select the n+1 smallest ones (taking into account that the smallest distance is |A[i] - A[i]| = 0) and store them in the vector d. Finally I perform a small computation with the elements of d, sqrt(d[2]/d[n+1]), and store the result in B[i].

A sketch of this is

function foo(A::Vector{<:Number}, n::Int)
    L = length(A)
    B = zeros(Float64, L)
    for i=1:L
        d = abs2.(A[i] .- A)   # all squared distances to A[i]
        sort!(d)
        B[i] = sqrt(d[2]/d[n+1])
    end
    return B
end

but here d stores all possible distances, which is slower than my version.

Because there is no ordering of the elements of A, I must go through every pair of elements A[i], A[j]; that’s why I say that the algorithm complexity is of order O(L^2).

Sorry, I meant O(L^2 * n) (argmax is O(n) inside an O(L^2) loop).

Totally right, I didn’t understand. However, the function only computes argmax if dij < d[ix_max], so maybe that is just an upper bound reached only if we are really unlucky. If you want some context, A holds the eigenvalues of random non-Hermitian matrices. I know they are ordered by their real part, but I’m not sure whether I can use this to skip some iterations.
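A rough sketch of how that ordering might be exploited (untested; foo_sorted is just an illustrative name, and it assumes A is sorted by its real part): scan outward from i and stop as soon as the squared real-part gap alone exceeds the largest kept distance, since |A[i] - A[j]|^2 >= (real(A[i]) - real(A[j]))^2.

function foo_sorted(A::Vector{<:Number}, n::Int)
    L = length(A)
    B = zeros(Float64, L)
    d = fill(Inf, n)        # n smallest nonzero squared distances (self-distance excluded)
    @inbounds for i = 1:L
        fill!(d, Inf)
        ix_max = 1
        for dir in (1, -1)  # scan to the right of i, then to the left
            j = i + dir
            while 1 <= j <= L
                gap = real(A[i]) - real(A[j])
                # No farther point in this direction can beat the current threshold.
                gap * gap > d[ix_max] && break
                dij = abs2(A[i] - A[j])
                if dij < d[ix_max]
                    d[ix_max] = dij
                    ix_max = argmax(d)
                end
                j += dir
            end
        end
        sort!(d)
        B[i] = sqrt(d[1] / d[n])   # indices shift by one vs. foo because 0 is not stored
    end
    return B
end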

If I understand this correctly, you could speed up the algorithm if you keep d (of size n+1) sorted and update it by inserting each new value at its sorted position (e.g. by utilizing searchsortedfirst). Then you can skip the costly sort! step in each iteration, because d already contains the smallest n+1 values in sorted order.
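The core of that insertion step could look like this (a tiny self-contained illustration; the numbers are made up):

d = [0.0, 0.1, 0.4, 0.9]   # the currently kept smallest squared distances, sorted
dij = 0.25                 # a new candidate distance
if dij < d[end]
    insert!(d, searchsortedfirst(d, dij), dij)  # put dij at its sorted position
    pop!(d)                                     # drop the largest to keep the length fixed
end
@assert d == [0.0, 0.1, 0.25, 0.4]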

The searchsorted suggestion is in the right direction, I think. Also consider looking into partialsort/partialsortperm to order only the smallest n+1 entries.
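One way to read that (a sketch, not benchmarked; foo_partial is a made-up name): compute all squared distances for a given i, but only order the smallest n+1 of them.

function foo_partial(A::Vector{<:Number}, n::Int)
    L = length(A)
    B = zeros(Float64, L)
    d = zeros(Float64, L)
    for i = 1:L
        @. d = abs2(A[i] - A)        # all squared distances from A[i]
        partialsort!(d, 1:(n + 1))   # sorts only positions 1 through n+1
        B[i] = sqrt(d[2] / d[n + 1])
    end
    return B
end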
Another thought is that since distances are symmetric, you only need to iterate on

for i in 1:L, j in (i+1):L
end

rather than the full L^2.

Finally, since this is a Euclidean-distance problem in which you’re concerned with the “closest point” and the “(n+1)-th closest point”, organizing your data into a quadtree up front may provide a boost by letting you ignore most entries entirely. Check out RegionTrees.jl for that. Certainly for the closest-point search this would be vital, but with just a bit of preprocessing, I think a good heuristic for which bins to look in for the (n+1)-th case would also be possible.
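Not a quadtree, but in the same spirit: a sketch (untested; foo_tree is a made-up name) using the k-d tree and k-nearest-neighbour search from NearestNeighbors.jl, which hands back the n+1 smallest distances per point directly.

using NearestNeighbors, StaticArrays

function foo_tree(A::Vector{<:Number}, n::Int)
    pts = [SVector(real(a), imag(a)) for a in A]   # points in the plane
    tree = KDTree(pts)
    # k = n + 1 neighbours include the point itself at distance 0.
    _, dists = knn(tree, pts, n + 1, true)         # sortres = true: distances ascending
    return [d[2] / d[n + 1] for d in dists]        # knn returns plain (not squared) distances
end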

Your sketch above should be 85X faster:

function foo2(A::Vector{<:Number}, n::Int)
    L = length(A)
    B = zeros(L)
    d = zeros(L)
    for i=1:L
        @. d = abs2(A[i] - A)
        sort!(d)
        B[i] = sqrt(d[2]/d[n+1])
    end
    return B
end

Why not use it? Am I missing something?

If there is some order in your data, it would be really useful if you provided code to generate representative input data.

Hi all, thanks for your answers. partialsort is a good idea; unfortunately it turns out a bit slower than my original function. Below is a benchmark with what I understood to be the intended use of partialsort!, together with my sketch function (which is much slower than the other two).

using BenchmarkTools, LinearAlgebra

function foo(A::Vector{<:Number}, n::Int)
    L = length(A)
    B = zeros(Float64, L)
    d = fill(1e5, n+1)
    @inbounds for i=1:L
        fill!(d, 1e5)
        ix_max = 1
        for j=1:L
            dij = abs2(A[i] - A[j])
            if dij < d[ix_max]
                d[ix_max] = dij
                ix_max = argmax(d)
            end
        end
        sort!(d)
        d1 = sqrt(d[2])
        dn = sqrt(d[n+1])
        B[i] = d1/dn*sqrt(n/pi)
    end
    return B
end

function foo2(A::Vector{<:Number}, n::Int)

    L = length(A)
    B = zeros(Float64, L)
    d = fill(1e5, n+1)
    @inbounds for i=1:L
        fill!(d, 1e5)
        for j=1:L
            dij = abs2(A[i] - A[j])
            if dij < d[end]
                d[end] = dij
                partialsort!(d, n+1)
            end
        end
        d1 = sqrt(d[2])
        dn = sqrt(d[n+1])
        B[i] = d1/dn*sqrt(n/pi)
    end
    
    return B
end

function foo3(A::Vector{<:Number}, n::Int)
    L = length(A)
    B = zeros(L)
    d = zeros(L)
    for i=1:L
        @. d = abs2(A[i] - A)
        sort!(d)
        B[i] = sqrt(d[2]*n/(d[n+1]*pi))
    end
    return B
end

function main(L)
    A = randn(ComplexF64, L)
    n = 20
    
    # Check that results are equal.
    S1 = foo(A, n)
    S2 = foo2(A, n)
    S3 = foo3(A, n)
    display(norm(S1-S2))
    display(norm(S1-S3))
    
    # Benchmark.
    @btime foo($A, $n)
    @btime foo2($A, $n)
    @btime foo3($A, $n)
    return
end

main(3000)

Output:

0.0
4.914452931421627e-15

  43.340 ms (3 allocations: 23.77 KiB)
  55.410 ms (3 allocations: 23.77 KiB)
  393.345 ms (4 allocations: 47.03 KiB)

I also used TimerOutputs.jl (a really useful package, thanks Kristoffer) to check that the final sort is not the problem:

using TimerOutputs

function foo_timeit(A::Vector{<:Number}, n::Int)
    to = TimerOutput()

    L = length(A)
    B = zeros(Float64, L)
    d = fill(1e5, n+1)
    @timeit to "for: i" @inbounds for i=1:L
        fill!(d, 1e5)
        ix_max = 1
        @timeit to "for: j" for j=1:L
            dij = abs2(A[i] - A[j])
            if dij < d[ix_max]
                @timeit to "place dij" d[ix_max] = dij
                @timeit to "argmax" ix_max = argmax(d)
            end
        end
        @timeit to "sort" sort!(d)
        d1 = sqrt(d[2])
        dn = sqrt(d[n+1])
        B[i] = d1/dn
    end
    
    println(to)
    return B
end

foo_timeit(randn(ComplexF64, 3000), 20);

Output:

────────────────────────────────────────────────────────────────────────
                                 Time                   Allocations      
                         ──────────────────────   ───────────────────────
    Tot / % measured:         156ms / 100%            28.8KiB / 11.5%    

 Section         ncalls     time   %tot     avg     alloc   %tot      avg
 ────────────────────────────────────────────────────────────────────────
 for: i               1    156ms   100%   156ms   3.31KiB  100%   3.31KiB
   for: j         3.00k    154ms  98.9%  51.4μs   1.66KiB  50.0%    0.57B
     argmax        383k   49.5ms  31.7%   129ns     0.00B  0.00%    0.00B
     place dij     383k   20.3ms  13.0%  53.0ns     0.00B  0.00%    0.00B
   sort           3.00k   1.10ms  0.71%   368ns     0.00B  0.00%    0.00B
 ────────────────────────────────────────────────────────────────────────

@kristoffer.carlsson, with respect to the data, it is very heterogeneous. It comes from general random complex matrices, random symmetric complex matrices and dissipative forms of quantum systems. A general way to get some input data for my function would be:

using LinearAlgebra

M = randn(ComplexF64, L, L)
A = eigvals(M)

With respect to iterating only over for i in 1:L, j in (i+1):L, it might be a good idea, but I would have to find a nice way to store intermediate results that is not too memory consuming. Anyway, I didn’t know about RegionTrees.jl; it’s a very good idea to preprocess the input data in this way! I’ll try to implement it and report back if I get anywhere.
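For what it’s worth, one low-memory way the symmetric iteration could store its intermediate results is a short sorted list per point, with each pairwise distance inserted into both lists (a rough, untested sketch; foo_sym is a made-up name; it needs about L*(n+1) floats, roughly half a megabyte for L = 3000, n = 20):

function foo_sym(A::Vector{<:Number}, n::Int)
    L = length(A)
    # One sorted list of kept squared distances per point (self-distance excluded).
    D = [fill(Inf, n + 1) for _ = 1:L]
    for i = 1:L, j = (i+1):L
        dij = abs2(A[i] - A[j])
        for v in (D[i], D[j])
            if dij < v[end]
                insert!(v, searchsortedfirst(v, dij), dij)
                pop!(v)
            end
        end
    end
    # Indices shift by one vs. foo because the zero self-distance is not stored;
    # the sqrt(n/pi) factor matches the normalization used in the benchmark above.
    return [sqrt(D[i][1] / D[i][n]) * sqrt(n / pi) for i = 1:L]
end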

Sorry for the long answer, and thank you again to everyone who replied. :grinning:

function foo4(A::Vector{<:Number}, n::Int)
    L = length(A)
    B = zeros(Float64, L)
    d = fill(1e5, n + 1)
    @inbounds for i = 1:L
        fill!(d, 1e5)
        for j = 1:L
            dij = abs2(A[i] - A[j])
            if dij < d[n + 1]
                insert!(d, searchsortedfirst(d, dij), dij)
                resize!(d, n + 1)
            end
        end
        d1 = sqrt(d[2])
        dn = sqrt(d[n + 1])
        B[i] = d1 / dn * sqrt(n / pi)
    end
    return B
end

on my machine:

julia> @btime foo($A, $n);
  39.238 ms (3 allocations: 23.77 KiB)

julia> @btime foo4($A, $n);
  30.960 ms (4 allocations: 24.13 KiB)

Thanks @laborg! That really works! :grinning: :grinning:

(Sorry for the late answer)
