Finding low frequency numbers in a large collection

Hey all :slight_smile:


See edit here for clarification

I want to find all numbers in a file that have a frequency <=100. The numbers are in the UInt32 range so I could allocate a Vector{UInt8}(typemax(UInt32)) to count them (when above 255 I just stop incrementing). This would be ~ 4.2GB. The file I read from is not simply a list of integers I first have to parse them. Since reading goes faster than parsing I could speed it up by using one thread that reads and 4 threads that parse chunks of the buffer.

I don’t care much about occasionally missing frequency counts as long as I get close (and do not corrupt memory leading to odd counts). Hence I wondered if there is an unsafe way to increase these counts. I saw @tkf with TSXPlayground but not entirely sure how to use that.

So something like this, allowing counts to be missed but not corrupted:

function do_stuff(arr::Vector{UInt8})
    for i in 1:10_000_000
        count =  arr[1] 
        if count < typemax(eltype(arr))
            arr[1] += 1
        end
    end
end

function main()
    arr = zeros(UInt8, 10)

    # Spawn as in the original code do_stuff() pulls
    # from a channel
    @sync for i in 1:Threads.nthreads()
        Threads.@spawn do_stuff(arr)
    end

    println([Int(x) for x in arr])
end

main()

(This worked fine still, but this could corrupt I guess? In reality not all Threads will modify the same index but this would be possible)

Any tips? :smiley:

Give each thread its own array and then add them together at the end.

1 Like

The reason for the β€œunsafe” approach is memory usage… This of course would be ideal but each thread would need a 4.2GB vector, so just for 4 threads already goes over 20GB.

Other ideas with low memory usage are also welcome but cannot think of another efficient way. Min-sketch counts would be nice, but this is a bad idea for low-frequency due to overestimation

You might check out

The point of that code, using atomic, was to show how slow it is not to enhance performance when threaded… see the benchmark output:

Atomic operation is 10x slower… benchmark increments a single location in an array 1000 times

Quite some discussion like this and this, that atomic adds are very slow

There are a bunch of sketch variants, eg https://arxiv.org/pdf/1502.04885.pdf

Thanks. I read that paper before. I hoped that there would be a more direct way than implementing the sketch from scratch in Julia

Do you know ahead of time what the largest number is? Do you know if the numbers are sparse? Knowing these things may allow you to have multiple copies for each thread and sum up the numbers independently.

I think there are probably better ways of doing this that are a bit more memory friendly. For example, if you were to sort your list before counting, you can just iterate through with only keeping one value for a count until the number changes, only adding the number in the sorted list to a list when the count is over 100. This method is also easier to scale, as you can start each thread from a different part in the array.

1 Like

Hey!

I can know the numbers beforehand (just not their frequency). Ideally I would find a solution that can handle up to the max of UInt64. For a test set, I have the distribution of numbers that looks evenly sampled from the range:

The counts for the numbers are heavily skewed, in fact, 75% of the numbers are <=100. This is because these numbers originate from DNA data and hence are not random:


(The x-scale is log10)


I’m not sure if I get your second proposal. Could you elaborate as it sounds nice :slight_smile:

if you were to sort your list before counting
…as you can start each thread from a different part in the array

I do not really have a list beforehand as I have to read the numbers from a file (or actually parse them as it is text format with some mess in between). I could pass bytes to different threads, parse the numbers, and then maybe do what you propose?

Sorry for the late reply. I think my approach would only work if you can read in the file all at once. If you are interested in the serial version:

function freq_counts(numbers, threshold)
    numbers = sort(numbers)

    is_above_threshold = BitVector(undef, numbers[end])
    fill!(is_above_threshold, false)

    current_count = 0
    current_number = numbers[begin]
    @inbounds for i in numbers
        if i == current_number
            current_count += 1
        else
            is_above_threshold[i] = (current_count >= threshold)
            current_number = i
            current_count = 0
        end
    end

    # Correct final count
    if current_count >= threshold
        is_above_threshold[current_number] = true
    end

    return (1:numbers[end])[is_above_threshold]
end

I have tried to make this parallel:

using ThreadsX
function freq_counts_parallel(numbers, threshold)
    numbers = ThreadsX.sort(numbers)

    is_above_threshold = BitVector(undef, numbers[end])
    fill!(is_above_threshold, false)

    n_threads = Threads.nthreads()
    block_size = cld(length(numbers), n_threads)
    Threads.@threads for t in 1:n_threads

        start_idx = (t-1)*block_size+1
        end_idx = min(t*block_size, length(numbers))

        current_count = 0
        current_number = numbers[start_idx]
        start_num = current_number
        # Skip to next number on each thread
        @inbounds if t > 1
            while current_number == start_num && start_idx <= end_idx
                start_idx += 1
                current_number = numbers[start_idx]
            end
            if start_idx >= end_idx
                continue
            end
        end
        @inbounds for i in start_idx:length(numbers)
            num = numbers[i]
            if num == current_number
                current_count += 1
            else
                is_above_threshold[num] = (current_count >= threshold)
                current_number = num
                current_count = 0
                if i >= end_idx
                    break
                end
            end
        end

        # Correct final count
        if current_count >= threshold
            is_above_threshold[current_number] = true
        end
    end

    return (1:numbers[end])[is_above_threshold]
end

And benchmarking with 8 threads gives only a 2x speed-up, but I imagine for a much a much larger list this would improve:

julia> N = 10^6;
julia> numbers = rand(1:2^10, N) .+ rand((100, 200, 300), N);
julia> @benchmark freq_counts($numbers, $(1050))
BenchmarkTools.Trial: 894 samples with 1 evaluation.
 Range (min … max):  4.065 ms … 9.571 ms  β”Š GC (min … max): 0.00% … 52.04%
 Time  (median):     5.101 ms             β”Š GC (median):    0.00%
 Time  (mean Β± Οƒ):   5.572 ms Β± 1.125 ms  β”Š GC (mean Β± Οƒ):  9.32% Β± 14.51%

           β–‚β–ˆβ–ˆβ–†β–ƒβ–‚
  β–‚β–β–β–β–β–β–β–‚β–ƒβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–‡β–ˆβ–‡β–ƒβ–ƒβ–‚β–‚β–‚β–‚β–β–‚β–β–β–β–β–β–β–β–β–β–β–β–β–‚β–‚β–‚β–‚β–‚β–‚β–‚β–ƒβ–ƒβ–ƒβ–…β–„β–„β–ƒβ–ƒβ–„β–ƒβ–‚β–‚ β–ƒ
  4.06 ms        Histogram: frequency by time       8.54 ms <

 Memory estimate: 7.64 MiB, allocs estimate: 6.

julia> @benchmark freq_counts_parallel($numbers, $(1050))
BenchmarkTools.Trial: 1617 samples with 1 evaluation.
 Range (min … max):  2.194 ms … 7.451 ms  β”Š GC (min … max):  0.00% … 48.57%
 Time  (median):     2.474 ms             β”Š GC (median):     0.00%
 Time  (mean Β± Οƒ):   3.077 ms Β± 1.384 ms  β”Š GC (mean Β± Οƒ):  19.38% Β± 21.66%

   β–‚β–‡β–ˆβ–‡β–„β–                                             ▃▄▃▁   
  β–†β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–„β–†β–„β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–β–„β–β–β–β–β–β–β–β–β–„β–…β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ β–ˆ
  2.19 ms     Histogram: log(frequency) by time     6.39 ms <

 Memory estimate: 7.85 MiB, allocs estimate: 5188.

I should note that you may want to change the BitVector approach if you have a large set of numbers, but a very high threshold. It may be better just to keep a list for each thread and just append to the list. Then you can concat all the lists from each thread together to give your final list of numbers.

1 Like

Seeing the distribution of the numbers, I have a suggestion (which might not be relevant depending on the details of the application): find the complement set and consider anything outside it to be with <100 occurances.
Finding the items apearing >= 100 times has some nifty algorithms and the resulting list will be much smaller and faster to search and use (keeping it sorted perhaps).

Example of a nifty algorithm, is adding a number/incrementing counter with prob 1/32 (e.g. when 5 random bits == 0b00000). This would with high probability capture any number appearing >= 100 times and even give an estimate of occurance count. After this process, drop all numbers with counts <3 (less than 3, but I’m willing to spread the love too) and sort the rest into a do-not-include list.

The attraction with such an algorithm is the much lower memory access rate (decent random bits can be calculated on CPU) and thus high performance.

Needless to say, this is highly parallelizable.

1 Like

Thanks for the reply! Sounds like a smart idea. I read this 8 times but I think I need a bit (or 8 bits) of help to understand your nifty algorithm though.

Would be awesome as it’s breaking my head for quite some time now haha. Thought of using a custom implementation of mergesort where I merge vectors and at the same time keep track of the frequency. As soon as the frequency >X I will drop the index and overwrite it. But yours sounds much more interesting to try :smiley:

I started working on something like you proposed (see reply above). I think it could also work iteratively when using something like this where I can provide numbers and existing frequencies. Have to think about how to not keep allocating the outputs though. Maybe two big output arrays that I switch between when merging.

I think this new idea would only work when all numbers are already in the vector right. That would be quite a lot of RAM when the input gets big

I was thinking of something along these lines:

julia> dat = vcat(rand(1:10,1900),rand(11:20,100));

julia> filter(((k,v),)-> v >= 3, countmap(filter(x->rand()<1/32, dat)))
Dict{Int64, Int64} with 10 entries:
  5  => 6
  4  => 5
  6  => 5
  7  => 7
  2  => 9
  10 => 5
  9  => 10
  8  => 11
  3  => 3
  1  => 5

The prepared dat has 1:10 appearing frequently and the 11:20 rare. And the latter statement finds the ones appearing frequently.

My suggestion is to optimize the algorithm and code without multithreading and then continue to multithread.

1 Like

Needless to say, this is highly parallelizable.

I think for this you assumed chunks are sufficiently large to reflect the actual frequency? Or you had another idea? Cause without assuming that, a number can of course miss <3 (giving back love ;p) per chunk but satisfy it over all chunks

I could share the non-passing numbers between chunks again and recheck this every time, but that would make it much more complicated I guess

Yes, you are right, as with everything parallel, nothing is too simple. I meant, that considering the probabilistic nature of algorithm, adding some race condition problems from all threads accessing the same data structure would not be a problem.
In this case, threads can be used to better saturate the bandwidth between CPU and memory.
As for missing items, it is possible to run the algorithm with several diferent random functions and thus get better odds of catching all frequent items.
BTW the example code above is more illustrative. In real implementation, you would want to use random bits (not floats) and perhaps explicitly code the whole loop.

Yeah I did write that in a loop and used a sort to not allocate the map (and make this all in-place)

function rl_filter!(a::Vector{Int64}, c::Int)
    sort!(a) 
    @inbounds prev = a[1]
    rl = 0
    j = 1
    @inbounds for ai in a
        if ai == prev 
            rl +=1 
        else 
            if rl >= c 
                a[j] = prev 
                j +=1 
            end 
            prev = ai 
            rl = 1 
        end
    end
    # Last stretch check
    if rl >= c 
        @inbounds a[j] = prev
        j +=1
    end
   resize!(a,j-1)
   sizehint!(a, j-1)
end


function test() 
    dat = vcat(rand(1:10,1900),rand(11:20,100)) 
    filter!(x->rand()<1/32, dat)
    rl_filter!(dat, 3)
    println(dat)
end

I think the main β€œproblem” I will have is that my files contain billions, and later trillions of numbers. So this is super cool to filter, but keeping track of that what didn’t pass (to incorporate that in the next chunk) will cause quite some overhead.

Maybe this is all just negligible with large chunks though, in the worst case I think something is uncommon while it’s just slightly common (otherwise it would’ve been spotted in at least one chunk)

It feels there is still a lot of room to optimize. But you have to specifiy the problem more cleanly for readers to engage with it.
The most important parameters are:
N - number of items in files
B - bound on largest number (2^32 or something)
K - number of occurances which make item uninteresting (100?)
And the freqeuncies profile, this is the tricky bit. The best way would be to choose a random subset of, say, a ~100,000 items and see their frequency counts. Something as follows:

function sampleprofile(v, S = 1_000_000; seed = 1)
    N = length(v)
    mask = nextpow(2, S)-1
    d = Dict{eltype(v),Int}()
    for i in 1:N
        if (seed + hash(v[i]) & mask)==0
            d[v[i]] = get(d, v[i], 0)+1
        end
    end
    return d
end

Now sampleprofile(v) will return the dictionary of counts of 1 in about 1_000_000 values (use parameter to set dilution).
This dictionary can be used to generate the more interesting:

d = sampleprofile(v)
datashape = sort(countmap(values(d)))

If you could provide the datashape of your data, it would be helpful.
Finally, there is also the amount of error you are willing to accept (in missing or extraneous values to your target output).

Yeah, this question indeed went from a question about adding numbers to the actual underlying problem that I have to solve :sweat_smile: Thanks a lot for thinking with me

Number of items in the files: ~25 billion, but this has to scale
bound on the largest number: technically UInt64 ,
number of occurrences that make the item uninteresting: 100

Your function, sampleprofile always seem to return an empty dictionary for me (it never passed the if statement)

If sampleprofile returns empty dictionary, reduce S parameter (defaults to 1_000_000). Reduce it until you get a decent sized dictionary and then proceed with analysing dictionary with sort(countmap(...))