Need memory and/or parallel optimization for pairwise distance metrics

I have a ~1000x800,000 sparse matrix, and I would like to calculate pairwise distance metrics for the columns. The vast majority of these will have distance 1, so I thought that I might store them as similarity (1 - distance) in a sparse matrix to make the memory footprint feasible.

Unfortunately, the pairwise() function from Distances.jl gives me an out-of-memory error (it’s generating a dense matrix with lots of 1s), and the serial version (where I calculate them one-at-a-time and add them to a sparse matrix) is painfully slow.

using SparseArrays
using Distances
using Combinatorics

function sparse_distance(mat, df=jaccard)
  amat = spzeros(size(mat, 2), size(mat, 2))

  for (i,j) in Combinatorics.combinations(1:size(mat,2), 2)
    j % 100_000 == 0 && @info i,j
    d = df(mat[:,i], mat[:,j])
    d == 1. && continue
    amat[i,j] = 1. - d
  end
  amat
end

Normally is situations like this, I would do one of the following, but I don’t think they’ll work

  1. Use threads to make things a bit more parallel - this won’t work because updating the sparse matrix isn’t threadsafe
  2. Use a generator to take some advantage of SIMD - can’t figure out a way to do this that doesn’t either store every entry or calculate the distance twice (eg [(jaccard(mat[:,i], mat[:,j]), i, j) for (i,j) in Combinatorics(1:size(mat,1), 2) if jaccard(mat[:,i], mat[:,j]) < 1]

Maybe I need to just accept the slowness and / or partition the calculations and store them in files or something, but if anyone has any fancy tricks, I would love to hear them :slight_smile:

The first thing to do here is @views d = df(mat[:,i], mat[:,j]). Otherwise you’re copying a Vector out every single time.

4 Likes

This seems to work:

using SparseArrays
using Distances
using Combinatorics
using ChunkSplitters
using Base.Threads

@views function sparse_distance0(mat, df=jaccard)
    amat_total = spzeros(size(mat, 2), size(mat, 2))
    lk = ReentrantLock()
    @sync for ic in chunks(1:size(mat,2); n=Threads.nthreads()) 
        @spawn begin
            amat_local = spzeros(size(mat, 2), size(mat, 2))
            for i in ic
                for j in axes(mat, 2)
                    i >= j && continue
                    j % 100_000 == 0 && @info i, j
                    d = df(mat[:, i], mat[:, j])
                    d == 1. && continue
                    amat_local[i, j] = 1. - d
                end
            end
            @lock lk amat_total += amat_local
        end
    end
    amat_total
end

I get:

julia> s0 = sparse_distance(mat); @b sparse_distance(mat) # original with @views
7.961 s (999084 allocs: 71.928 MiB, 0.03% gc time, without a warmup)

julia> println(sparse_distance0(mat) ≈ s0); @b sparse_distance0(mat)
true
1.645 s (328 allocs: 42.364 MiB, 0.08% gc time, without a warmup)

julia> Threads.nthreads()
4

ps: I don’t think you get much more from SIMD there, because the computation of the distance is probably using that.

ps2: I tested that with a rand(1000,1000) matrix.

4 Likes

Neat! Inferring here, are the following correct?

  1. the chunks function splits the column indices into nthreads groups, eg 1:100, 101:200 (if I had 1000 columns and 10 threads)
  2. then the loop calculates a block that’s N/threads x N, where N is the total number of columns. Each of these can be done in parallel because of @spawn
  3. The @lock lk ... allows updating the sparse matrix as the loops complete, by allowing only one of these to happen at a time?

I just tried this with 10k columns, and it took a few minutes, which is definitely a huge improvement, but I’m a bit wary of scaling that up to the full size all in-memory. I think I will modify this to do smaller chunks and then save them in CSVs as i,j,v.

1 Like

Do you need all distances or only nearest neighbors? Because e.g. NearestNeighbors.jl could be used to reduce the computational complexity a lot in that case.

2 Likes

Wow, I didn’t expect going to files to also provide a huge speedup

@views function sparse_distance_files(mat, dist_func=jaccard; outdir="dmats")
    lk = ReentrantLock()
    @sync for ic in chunks(1:size(mat,2); n=1000) 
        filename = joinpath(outdir, "chunk_$ic.csv")
        #isfile(filename) && continue
        @spawn begin
            amat_local = spzeros(size(mat, 2), size(mat, 2))
            for i in ic
                for j in axes(mat, 2)
                    i >= j && continue
                    d = dist_func(mat[:, i], mat[:, j])
                    d == 1. && continue
                    amat_local[i, j] = 1. - d
                end
            end
            df = [(; i, j, v=amat_local[i,j]) for i in ic for j in axes(mat,2) if amat_local[i,j] > 0]
            @lock lk CSV.write(filename, df)
        end
    end
    return nothing
end
julia> @btime sparse_distance0(cmat[:,1:5000]); # @Imiq's version
  2.675 s (1329 allocations: 352.15 MiB)

julia> @btime sparse_distance_files(cmat[:,1:5000]);
  872.644 ms (95241 allocations: 4.22 GiB)
1 Like

Hmm, that’s a good thought. I’m not sure TBH, but good to know about this!

You also may get some additional speedup by using a different chunking strategy: Multithreading · ChunkSplitters.jl

I don’t think you’ll have memory problems with that. But you can check how it scales with the number of columns before. The loop itself does not scale.

Assigning one element at a time is an incredibly expensive way to construct a sparse matrix in CSC format (what Julia uses).

It should be much more efficient to accumulate a vector of (i, j, 1-d) triplets (or three separate vectors I, J, V) — probably one for each parallel chunk and then concatenated afterwards — and then pass this to the sparse constructor at the end to get a sparse CSC matrix.

5 Likes

Just a few thoughts:

  • exact equality between floats is usually a bad idea. I would not be surprised if the condition d == 1. never is true. It’s probably better to allow some small tolerace or use isapprox (which uses sqrt(eps(1)) as default)
  • the 1. - d leads to very bad accuracy if d is close to 1. So of you care about very small similaritirs, then you should rethink your current approach and probably come up with a more accurate way of computing 1 - dist_func(x, y) to avoid the cancellation.
  • in Julia you generally can and should just write 1 and the conversion to float is handled by the compiler. It is usually worse to write 1. instead because that enforces Float64 and thus reduces the generality because even if someone calls the function with Float32 or Rational then the usage of a Float64 constant would convert the whole calculation to Float64. Floats are said to be contagious. So using integer constants is preferred.
  • your file based implementation does not require the lock bevause writes to different files don’t interfere
2 Likes

Following Steven advice in constructing the sparse matrix, we go down to:

julia> println(sparse_distance0(mat) ≈ s0); @b sparse_distance0(mat)
true
65.798 ms (349 allocs: 70.435 MiB, 9.62% gc time)
Code
using SparseArrays
using Distances
using Combinatorics
using ChunkSplitters
using Base.Threads

@views function sparse_distance(mat, df=jaccard)
    amat = spzeros(size(mat, 2), size(mat, 2))
    for (i, j) in Combinatorics.combinations(1:size(mat, 2), 2)
        j % 100_000 == 0 && @info i, j
        d = df(mat[:, i], mat[:, j])
        d == 1. && continue
        amat[i, j] = 1. - d
    end
    amat
end

@views function sparse_distance0(mat, df=jaccard)
    lk = ReentrantLock()
    x_all = Int[]
    y_all = Int[]
    z_all = Float64[]
    @sync for ic in chunks(1:size(mat,2); n=Threads.nthreads()) 
        @spawn begin
            x = Int[]
            y = Int[]
            z = Float64[]
            for i in ic
                for j in axes(mat, 2)
                    i >= j && continue
                    j % 100_000 == 0 && @info i, j
                    d = df(mat[:, i], mat[:, j])
                    d == 1 && continue
                    push!(x, i)
                    push!(y, j)
                    push!(z, 1 - d)
                end
            end
            lock(lk) do 
                append!(x_all, x)
                append!(y_all, y)
                append!(z_all, z)
            end
        end
    end
    return sparse(x_all, y_all, z_all, size(mat, 2), size(mat, 2))
end

And if we increase the number of chunks to 10 * Threads.nthreads() to improve load balancing, we get:

julia> println(sparse_distance0(mat) ≈ s0); @b sparse_distance0(mat)
true
41.972 ms (2085 allocs: 75.692 MiB, 2.84% gc time)
5 Likes

I think the code in Distances.jl for sparse arrays is not very optimized for simd. The appropriate thing would be to “multi-thread” over simd-lanes, ispc / gpu style. Unfortunately there is no good way to write that in julia, afaiu. But you could write it in ispc and ccall it.

As a low-effort thing: For this kind of computation, you definitely want to enable SMT / hyperthreading, the inner loop in Distances.jl will be bound on either branch-misses (probably) or CMOV latency (if the compiler manages to heroically transform the code).

The critical code is here Distances.jl/ext/DistancesSparseArraysExt.jl at c93b809e15080d7e6df1dde6cfe8f0a7f2a1582a · JuliaStats/Distances.jl · GitHub

Once you’re memory bound, you can fix that by changing the loop order: Use your favorite cache-oblivious curve, e.g. hilbert or Z-order. (analogous to chunking/blocking in matmul / gemm)

But probably you can declare victory, call it fast enough, and go home?

(exception: If you happen to specifically target zen5, then you might get better hand-written assembly than ispc-style via vp2intersectq, cf Using the most unhinged AVX-512 instruction to make the fastest phrase search algo | Gabriel’s Blog)

Good thoughts! I checked the precision things, and they don’t seem to matter in this case - the smallest differences are 0.001, so Float64 precision is plenty accurate (and I frequently get exact 1.0s), at least with Jaccard distance I’m using here.

re: generality, you’re probably right, but this isn’t for broader consumption. And good to know about file writes! Does the OS just handle parallel attempts to grab the hard-drive?

Well, maybe you could :laughing:. I appreciate that you think this humble biologist knows enough CS to have a favorite cache-oblivious curve :stuck_out_tongue: (or to even know what that means).

Very good - combining this with my writing to files instead,

@views function sparse_distance(mat, dist_func=jaccard)
    @sync for ic in chunks(1:size(mat,2); n=round(Int, size(mat,2)/1000)) 
        filename = "dmats/chunk_$ic.csv"
        isfile(filename) && continue
        @spawn begin
            is = Int[]; js = Int[]; vs = Float64[]
            for i in ic
                for j in axes(mat, 2)
                    i >= j && continue
                    j % 100_000 == 0 && @info i,j
                    d = dist_func(mat[:, i], mat[:, j])
                    d == 1. && continue
                    
                    push!(is, i)
                    push!(js, j)
                    push!(vs, 1. - d)
                end
            end
            CSV.write(filename, (; i=is, j=js, v=vs))
        end
    end
    return nothing
end

Just for fun, I benchmarked the difference between building 3 vectors, building a vector of Tuples, a vector of NamedTuples (since I’m writing to a CSV, this can be used directly by CSV.write()), or a DataFrame. I was surprised the performance on push!ing to DataFrames is so bad… I expected it to be analogous the the vectors or NT vector.

julia> @btime build_vecs(); @btime build_tvec(); @btime build_ntvec(); @btime build_df();
  3.817 ms (112 allocations: 39.30 MiB)
  4.510 ms (113 allocations: 50.73 MiB)
  3.189 ms (57 allocations: 39.30 MiB)
  174.382 ms (6479030 allocations: 164.84 MiB) 
functions
function build_vecs(n=1000)
    is = Int[]
    js = Int[]
    vs = Float64[]
    for i in 1:n
        for j in 1:n
            i < j || continue
            v = rand()
            push!(is, i)
            push!(js, j)
            push!(vs, v)
        end
    end
    return DataFrame(i=is, j=js, v=vs)
end

function build_tvec(n=1000)
    ts = Tuple{Int64, Int64, Float64}[]
    for i in 1:n
        for j in 1:n
            i < j || continue
            v = rand()
            push!(ts, (i,j,v))
        end
    end
    return DataFrame(ts, [:i, :j, :v])
end

function build_ntvec(n=1000)
    nts = @NamedTuple{i::Int64, j::Int64, v::Float64}[]
    for i in 1:n
        for j in 1:n
            i < j || continue
            v = rand()
            push!(nts, (; i,j,v))
        end
    end
    return DataFrame(nts)
end

function build_df(n=1000)
    df = DataFrame(i = Int[], j = Int[], v = Float64[])
    for i in 1:n
        for j in 1:n
            i < j || continue
            v = rand()
            push!(df, (;i,j,v))
        end
    end
    return df
end
1 Like

I think I need to figure out how to spawn these things on my SLURM cluster intead, I keep hitting OOM on my laptop even though I’m trying to write to files

In some sense yes. How else could have 2 different programs read files from disk similtaneously? They surely don’t share locks :slight_smile:

I just googled this and it seems that Jaccard distances boils down to counting the coordinates that are identical between 2 vectors. Assuming this is understanding is correct, what is the element type of your large sparse matrix respectively what are the values? I assume they are very restricted (e.g. not random floats but more like a couple of small ints). I wonder whether a sparse matrix is then the most efficient way to encode this data. Compounding is the fact that, AFAIK, views of sparse arrays not very well supported/efficient. Do need this sparse input matrix for other things as well or could it also be e.g. a vector of sparse vectors?

Also regarding OOMs: your result is a 800kx800k matrix where each element consumes at least 8byte (actually 3x8byte for sparse). This gives 2.56TB for a dense matrix. So to get into reasonable memory territory for a laptop you’d need a sparsity of ~0.1% which would come out to ~7.7GB memory for a sparse matrix). Is this a realistic sparsity in your case?

3 Likes

You may try to run the calculations of one column per round (that will already take ~5Gb of memory if the matrix is not sparse), and save one file per column. Something like:

@views function sparse_distance0(mat, df=jaccard)
    lk = ReentrantLock()
    for i in axes(mat, 2)
        x, y, z = Int32[], Int32[], Float32[]
        for jc in chunks(axes(mat, 2); n=Threads.nthreads())
            for j in jc
                i >= j && continue
                j % 100_000 == 0 && @info i, j
                d = df(mat[:, i], mat[:, j])
                d == 1 && continue
                lock(lk) do
                    push!(x, i)
                    push!(y, j)
                    push!(z, 1 - d)
                end
            end
        end
        # write file
#        CSV.write("file_$i.csv", ... 
    end
end

This should keep the memory footprint low (although with a lot GC time).

2 Likes

Yeah, essentially. In my actual use case, it’s a sparse boolean matrix (presence/absence of microbial genes in stool samples). The raw values are relative abundances (fractions between 0 and 1 that sum to 1), and ultimately I’ll want to use other metrics, but for now I’m just doing mat = input_mat .> 0 and using Jaccard.

The matrix is definitely sparse - when I’ve looked at the first 1000 or so columns, it’s about 5% non-zero. I wonder if it would be worth doing Float32 to reduce things a bit more

An easy win could be to encode the dissimilarity of vectors in an UInt16 (just the count of indices that differ) and use Int32 to store the coordinates. That slashes the size of the resulting matrix by over 50% (24 byte per entry down to 10).

1 Like

Then you can run more than one column per iteration, but not many of them anyway.

Something like this might work:

@views function sparse_distance0(mat, df=jaccard)
    lk = ReentrantLock()
    for i in axes(mat, 2)
        # create file for this i
        xi, yi, zi = Int32[], Int32[], Float32[]
        @sync for jc in chunks(axes(mat, 2); n=Threads.nthreads())
            @spawn begin
                x, y, z = Int32[], Int32[], Float32[]
                for j in jc
                    i >= j && continue
                    j % 100_000 == 0 && @info i, j
                    d = df(mat[:, i], mat[:, j])
                    d == 1 && continue
                        push!(x, i)
                        push!(y, j)
                        push!(z, 1 - d)
                end
                lock(lk) do
                    append!(xi, x)
                    append!(yi, y)
                    append!(zi, z)
                end
            end
        end
        # write xi, yi, zi to file$i.csv
    end
end
1 Like