Hello.
I’m trying to implement the k-means algorithm using the @threads macro. I’ve also implemented a sequential program for comparison, but it always run faster than the parallel version. The changes to the parallel algorithm are minimal and I can’t see where the problem lies. Even if I decrease the number of threads the problem still persists.
I tried profiling and measuring allocations, but nothing useful came up.
Sequential version: kmeans_seq.jl · GitHub
Parallel version: https://gist.github.com/novoselrok/d9f7c1c7ff722c8701a18988409a0181
Here is some test data: https://gist.github.com/novoselrok/d5b21991394c6850f5bf61fe540490b6
You can run the algorithms with the following command:
julia main.jl test_10000_100_4.txt 4 100
or from the REPL:
main(test_10000_100_4.txt", "4", "100")
I appreciate any comments, I’m still learning a lot about Julia, so thank you
So, you’re missing 2 things here, neither of which is necessarily obvious. First the easy one:
- You need to launch Julia with the environment variable
JULIA_NUM_THREADS
set to the number of threads you want to use. The default is just 1. Try setting this to the number of threads available on your system before launching Julia, and check that it stuck with Threads.nthreads()
.
And next, the less obvious one:
- There is a nasty closure-related bug in Julia that almost always comes up when using
Threads.@threads
: https://github.com/JuliaLang/julia/issues/15276. Suffice to say, the “easy” solution is to take the entire expression that you had within the macro, and move it to a function. Then, just call that function with the appropriate arguments in your @threads
call. An example of this is provided below.
Doing the both of these things, I get about 0.7 seconds for sequential, and ~0.2 seconds for 8 threads (non-linear scaling likely due to memory bandwidth or something like that).
Relevant changes:
function do_thread(clusters_per_thread, clusters, points, labels, i)
@inbounds begin
thread_id = Threads.threadid()
point = points[i]
_, min_index = findmin([distance(cluster, point) for cluster in clusters])
labels[i] = min_index
add_point(clusters_per_thread[thread_id][min_index], point)
end
end
...
@inbounds for iter in 1:max_iter
clusters_per_thread = [[Cluster() for _ in 1:k] for _ in 1:Threads.nthreads()]
Threads.@threads for i in 1:num_points
do_thread(clusters_per_thread, clusters, points, labels, i)
#thread_id = Threads.threadid()
#point = points[i]
#_, min_index = findmin([distance(cluster, point) for cluster in clusters])
#labels[i] = min_index
#add_point(clusters_per_thread[thread_id][min_index], point)
end
3 Likes