I want to find all numbers in a file that have a frequency <= 100. The numbers are in the UInt32 range, so I could allocate a Vector{UInt8}(typemax(UInt32)) to count them (when a count reaches 255 I just stop incrementing). This would be ~4.2GB. The file I read from is not simply a list of integers; I first have to parse the numbers out. Since reading goes faster than parsing, I could speed things up by using one thread that reads and 4 threads that parse chunks of the buffer.
I don't care much about occasionally missing frequency counts as long as I get close (and do not corrupt memory, leading to odd counts). Hence I wondered if there is an unsafe way to increase these counts. I saw TSXPlayground by @tkf, but I am not entirely sure how to use it.
So something like this, allowing counts to be missed but not corrupted:
function do_stuff(arr::Vector{UInt8})
    for i in 1:10_000_000
        count = arr[1]                      # unsynchronized read
        if count < typemax(eltype(arr))     # saturate at 255
            arr[1] += 1                     # unsynchronized increment: updates may be lost
        end
    end
end
function main()
    arr = zeros(UInt8, 10)
    # Spawn as in the original code where do_stuff() pulls
    # from a channel
    @sync for i in 1:Threads.nthreads()
        Threads.@spawn do_stuff(arr)
    end
    println([Int(x) for x in arr])
end
main()
(This still worked fine, but I guess it could corrupt memory? In reality not all threads will modify the same index, but it would be possible.)
The reason for the "unsafe" approach is memory usage… Giving every thread its own counts would of course be ideal, but each thread would need a 4.2GB vector, so 4 worker threads plus a vector to merge into already goes over 20GB.
Other ideas with low memory usage are also welcome, but I cannot think of another efficient way. A count-min sketch would be nice, but it is a bad fit here: for low-frequency items it overestimates the counts.
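For context, the read/parse split I have in mind looks roughly like this: one task reads raw chunks into a Channel and a few tasks parse them and bump the shared counts. This is only a sketch; parse_numbers is a stand-in for my real (messier) parsing, and numbers split across chunk boundaries are glossed over:

# stand-in parser: whitespace-separated decimal integers (the real format is messier)
parse_numbers(chunk::Vector{UInt8}) =
    (parse(UInt32, t) for t in split(String(chunk)))

function count_file(path::AbstractString)
    counts = zeros(UInt8, typemax(UInt32))              # ~4.2GB shared count table
    chunks = Channel{Vector{UInt8}}(8)                  # reader -> parser hand-off
    @sync begin
        Threads.@spawn begin                            # one reader task
            open(path) do io
                while !eof(io)
                    put!(chunks, read(io, 16 * 1024^2)) # 16MiB raw chunks
                end
            end
            close(chunks)
        end
        for _ in 1:4                                    # four parser tasks
            Threads.@spawn for chunk in chunks
                for n in parse_numbers(chunk)
                    i = Int(n) + 1                      # numbers assumed < typemax(UInt32)
                    c = counts[i]                       # racy read ...
                    if c < typemax(UInt8)
                        counts[i] = c + 0x01            # ... racy write: increments may be lost
                    end
                end
            end
        end
    end
    return counts
end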
Do you know ahead of time what the largest number is? Do you know if the numbers are sparse? Knowing these things may allow you to keep a separate copy of the counts for each thread and sum the copies at the end.
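A minimal sketch of that idea, assuming the values fit in 1:B for a known, modest bound B (the names here are just placeholders):

function threaded_counts(numbers::AbstractVector{<:Integer}, B::Integer)
    nt = Threads.nthreads()
    locals = [zeros(UInt32, B) for _ in 1:nt]   # one private count vector per thread
    block = cld(length(numbers), nt)
    Threads.@threads for t in 1:nt
        counts = locals[t]
        lo = (t - 1) * block + 1
        hi = min(t * block, length(numbers))
        for i in lo:hi
            counts[numbers[i]] += 1             # values assumed to lie in 1:B
        end
    end
    return reduce(+, locals)                    # element-wise sum of the per-thread counts
end

Of course this multiplies the memory by the number of threads, which is exactly why the largest value and the sparsity matter.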
I think there are probably better ways of doing this that are a bit more memory friendly. For example, if you sort your list before counting, you can just iterate through it keeping a single running count until the number changes, and only add a number to the output when its count is over 100. This method is also easier to scale, as each thread can start from a different part of the array.
I can know the numbers beforehand (just not their frequency). Ideally I would find a solution that can handle up to the max of UInt64. For a test set, the distribution of the numbers looks evenly sampled from the range.
The counts of the numbers are heavily skewed; in fact, 75% of the numbers have counts <= 100. This is because the numbers originate from DNA data and hence are not random.
I'm not sure I get your second proposal. Could you elaborate? It sounds nice:
if you were to sort your list before counting
…as you can start each thread from a different part in the array
I do not really have a list beforehand, as I have to read the numbers from a file (or actually parse them, as it is a text format with some mess in between). I could pass bytes to different threads, parse the numbers, and then maybe do what you propose?
Sorry for the late reply. I think my approach would only work if you can read in the file all at once. If you are interested in the serial version:
function freq_counts(numbers, threshold)
    numbers = sort(numbers)
    is_above_threshold = BitVector(undef, numbers[end])
    fill!(is_above_threshold, false)
    current_count = 0
    current_number = numbers[begin]
    @inbounds for i in numbers
        if i == current_number
            current_count += 1
        else
            # The run of current_number just ended; record its verdict
            is_above_threshold[current_number] = (current_count >= threshold)
            current_number = i
            current_count = 1   # i is the first occurrence of the new number
        end
    end
    # Correct final count
    if current_count >= threshold
        is_above_threshold[current_number] = true
    end
    return (1:numbers[end])[is_above_threshold]
end
I have tried to make this parallel:
using ThreadsX
function freq_counts_parallel(numbers, threshold)
    numbers = ThreadsX.sort(numbers)
    is_above_threshold = BitVector(undef, numbers[end])
    fill!(is_above_threshold, false)
    n = length(numbers)
    n_threads = Threads.nthreads()
    block_size = cld(n, n_threads)
    Threads.@threads for t in 1:n_threads
        start_idx = (t - 1) * block_size + 1
        end_idx = min(t * block_size, n)
        start_idx > n && continue
        # Skip a run that started in the previous block; that thread finishes it
        @inbounds while t > 1 && start_idx <= end_idx && numbers[start_idx] == numbers[start_idx - 1]
            start_idx += 1
        end
        start_idx > end_idx && continue
        current_number = numbers[start_idx]
        current_count = 1
        i = start_idx + 1
        # Runs that start inside this block are counted to their end,
        # even when they extend into the next block
        @inbounds while i <= n && (i <= end_idx || numbers[i] == current_number)
            if numbers[i] == current_number
                current_count += 1
            else
                is_above_threshold[current_number] = (current_count >= threshold)
                current_number = numbers[i]
                current_count = 1
            end
            i += 1
        end
        # Record the final run of this block
        if current_count >= threshold
            @inbounds is_above_threshold[current_number] = true
        end
    end
    return (1:numbers[end])[is_above_threshold]
end
And benchmarking with 8 threads gives only a 2x speed-up, but I imagine this would improve for a much larger list:
julia> N = 10^6;
julia> numbers = rand(1:2^10, N) .+ rand((100, 200, 300), N);
julia> @benchmark freq_counts($numbers, $(1050))
BenchmarkTools.Trial: 894 samples with 1 evaluation.
 Range (min … max):  4.065 ms … 9.571 ms  ┊ GC (min … max): 0.00% … 52.04%
 Time  (median):     5.101 ms             ┊ GC (median):    0.00%
 Time  (mean ± σ):   5.572 ms ± 1.125 ms  ┊ GC (mean ± σ):  9.32% ± 14.51%

 [histogram omitted]
  4.06 ms        Histogram: frequency by time        8.54 ms <

 Memory estimate: 7.64 MiB, allocs estimate: 6.
julia> @benchmark freq_counts_parallel($numbers, $(1050))
BenchmarkTools.Trial: 1617 samples with 1 evaluation.
 Range (min … max):  2.194 ms … 7.451 ms  ┊ GC (min … max): 0.00% … 48.57%
 Time  (median):     2.474 ms             ┊ GC (median):    0.00%
 Time  (mean ± σ):   3.077 ms ± 1.384 ms  ┊ GC (mean ± σ):  19.38% ± 21.66%

 [histogram omitted]
  2.19 ms     Histogram: log(frequency) by time     6.39 ms <

 Memory estimate: 7.85 MiB, allocs estimate: 5188.
I should note that you may want to change the BitVector approach if your numbers span a large range but few of them pass the threshold. It may be better to keep a list for each thread and just append to it; then you can concatenate the lists from all threads to get your final list of numbers, roughly as sketched below.
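A sketch of that variant, reusing the sort-and-split idea from freq_counts_parallel (untested and unbenchmarked; each thread only finishes runs that start inside its own block):

function freq_counts_lists(numbers, threshold)
    numbers = sort(numbers)                      # or ThreadsX.sort(numbers) as above
    n = length(numbers)
    n_threads = Threads.nthreads()
    block_size = cld(n, n_threads)
    found = [eltype(numbers)[] for _ in 1:n_threads]   # one result list per thread
    Threads.@threads for t in 1:n_threads
        lo = (t - 1) * block_size + 1
        hi = min(t * block_size, n)
        lo > n && continue
        # skip a run that started in the previous block
        while t > 1 && lo <= hi && numbers[lo] == numbers[lo - 1]
            lo += 1
        end
        lo > hi && continue
        out = found[t]
        current = numbers[lo]
        cnt = 1
        i = lo + 1
        # finish every run that starts inside this block, even past `hi`
        while i <= n && (i <= hi || numbers[i] == current)
            if numbers[i] == current
                cnt += 1
            else
                cnt >= threshold && push!(out, current)
                current = numbers[i]
                cnt = 1
            end
            i += 1
        end
        cnt >= threshold && push!(out, current)
    end
    return sort!(reduce(vcat, found))            # concatenate the per-thread lists
end

This also avoids having several threads write into the same BitVector.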
Seeing the distribution of the numbers, I have a suggestion (which might not be relevant depending on the details of the application): find the complement set, i.e. the numbers appearing >= 100 times, and consider anything outside it to have < 100 occurrences.
There are some nifty algorithms for finding the items appearing >= 100 times, and the resulting list will be much smaller and faster to search and use (perhaps keeping it sorted).
An example of a nifty algorithm: add a number / increment its counter only with probability 1/32 (e.g. when 5 random bits == 0b00000). This would, with high probability, capture any number appearing >= 100 times and even give an estimate of its occurrence count. After this process, drop all numbers with counts < 3 (less than 3, but I'm willing to spread the love too) and sort the rest into a do-not-include list.
The attraction with such an algorithm is the much lower memory access rate (decent random bits can be calculated on CPU) and thus high performance.
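Just to illustrate the idea, a rough sketch (a plain rand() stands in for the random bits; ~100 true occurrences sampled at 1/32 give an expected count of about 3):

function sampled_frequent(numbers, min_count = 3)
    counts = Dict{eltype(numbers),Int}()
    for n in numbers
        if rand() < 1/32                     # keep each occurrence with probability 1/32
            counts[n] = get(counts, n, 0) + 1
        end
    end
    # anything kept here goes into the do-not-include list
    return sort!([n for (n, c) in counts if c >= min_count])
end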
Thanks for the reply! Sounds like a smart idea. I read this 8 times but I think I need a bit (or 8 bits) of help to understand your nifty algorithm though.
Would be awesome, as this has been breaking my head for quite some time now haha. I thought of using a custom implementation of mergesort where I merge vectors and at the same time keep track of the frequency; as soon as the frequency > X I would drop the index and overwrite it. But yours sounds much more interesting to try.
I started working on something like what you proposed (see reply above). I think it could also work iteratively with something like this, where I can provide numbers and existing frequencies. I have to think about how to not keep allocating the outputs though. Maybe two big output arrays that I switch between when merging.
I think this new idea would only work when all numbers are already in the vector, right? That would be quite a lot of RAM when the input gets big.
I think for this you assumed the chunks are sufficiently large to reflect the actual frequency? Or did you have another idea? Because without assuming that, a number can of course stay below 3 (giving back the love ;p) in every chunk but reach it over all chunks.
I could share the non-passing numbers between chunks again and recheck them every time, but that would make it much more complicated I guess.
Yes, you are right; as with everything parallel, nothing is too simple. I meant that, given the probabilistic nature of the algorithm, adding some race-condition losses from all threads accessing the same data structure would not be a problem.
In this case, threads can be used to better saturate the bandwidth between CPU and memory.
As for missing items, it is possible to run the algorithm with several different random functions and thus get better odds of catching all frequent items.
BTW, the example code above is just illustrative. In a real implementation, you would want to use random bits (not floats) and perhaps explicitly code the whole loop.
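Putting those two remarks together, a sketch of the multi-pass variant (the 1-in-32 test now uses 5 random bits; the pass count and cut-off are arbitrary placeholders):

function sampled_frequent_multi(numbers; npasses = 3, min_count = 3)
    frequent = Set{eltype(numbers)}()
    for _ in 1:npasses
        counts = Dict{eltype(numbers),Int}()
        for n in numbers
            if rand(UInt32) & 0x1f == 0      # 5 random bits == 0b00000, i.e. probability 1/32
                counts[n] = get(counts, n, 0) + 1
            end
        end
        # keep anything that looked frequent in this pass
        union!(frequent, (n for (n, c) in counts if c >= min_count))
    end
    return sort!(collect(frequent))
end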
Yeah, I did write that in a loop and used a sort to avoid allocating the map (and make this all in-place):
function rl_filter!(a::Vector{Int64}, c::Int)
    sort!(a)
    @inbounds prev = a[1]
    rl = 0
    j = 1
    @inbounds for ai in a
        if ai == prev
            rl += 1
        else
            if rl >= c
                a[j] = prev
                j += 1
            end
            prev = ai
            rl = 1
        end
    end
    # Last stretch check
    if rl >= c
        @inbounds a[j] = prev
        j += 1
    end
    resize!(a, j - 1)
    sizehint!(a, j - 1)
end
function test()
    dat = vcat(rand(1:10, 1900), rand(11:20, 100))
    filter!(x -> rand() < 1/32, dat)
    rl_filter!(dat, 3)
    println(dat)
end
I think the main "problem" I will have is that my files contain billions, and later trillions, of numbers. So this is super cool for filtering, but keeping track of what didn't pass (to incorporate it in the next chunk) will cause quite some overhead.
Maybe this is all just negligible with large chunks though; in the worst case I would think something is uncommon while it's just slightly common (otherwise it would've been spotted in at least one chunk).
It feels like there is still a lot of room to optimize, but you have to specify the problem more clearly for readers to engage with it.
The most important parameters are:
N - number of items in files
B - bound on largest number (2^32 or something)
K - number of occurrences which make an item uninteresting (100?)
And the frequency profile; this is the tricky bit. The best way would be to choose a random subset of, say, ~100,000 items and look at their frequency counts. Something like the following:
function sampleprofile(v, S = 1_000_000; seed = 1)
    N = length(v)
    mask = nextpow(2, S) - 1
    d = Dict{eltype(v),Int}()
    for i in 1:N
        # select roughly 1 out of every (mask+1) distinct values via their hash
        if ((seed + hash(v[i])) & mask) == 0
            d[v[i]] = get(d, v[i], 0) + 1
        end
    end
    return d
end
Now sampleprofile(v) will return the dictionary of counts for roughly 1 in 1_000_000 of the values (use the S parameter to set the dilution).
This dictionary can be used to generate the more interesting:
using StatsBase: countmap

d = sampleprofile(v)
datashape = sort(countmap(values(d)))
If you could provide the datashape of your data, it would be helpful.
Finally, there is also the amount of error you are willing to accept (in missing or extraneous values in your target output).
Yeah, this question indeed went from a question about adding numbers to the actual underlying problem that I have to solve. Thanks a lot for thinking with me!
Number of items in the files: ~25 billion, but this has to scale
Bound on the largest number: technically UInt64.
Number of occurrences that make an item uninteresting: 100.
Your function sampleprofile always seems to return an empty dictionary for me (it never passes the if statement).
If sampleprofile returns an empty dictionary, reduce the S parameter (it defaults to 1_000_000). Reduce it until you get a decent-sized dictionary, and then proceed with analysing the dictionary with sort(countmap(...)).
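For example, something along these lines (assuming v already holds a chunk of the parsed numbers; sorting the collected pairs by their first element is just one way to order the result):

using StatsBase: countmap

d = sampleprofile(v, 10_000)                                 # ~1-in-16384 dilution instead of ~1-in-1M
datashape = sort(collect(countmap(values(d))), by = first)   # count => how many sampled values have that count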