Issues with shared memory parallel k-means implementation

performance
parallel

#1

Hello.

I’m trying to implement the k-means algorithm using the @threads macro. I’ve also implemented a sequential program for comparison, but it always runs faster than the parallel version. The changes to the parallel algorithm are minimal and I can’t see where the problem lies. Even if I decrease the number of threads, the problem persists.

I tried profiling and measuring allocations, but nothing useful came up.

Sequential version: https://gist.github.com/novoselrok/0a250406774358104f2b9c6a8bb65b82
Parallel version: https://gist.github.com/novoselrok/d9f7c1c7ff722c8701a18988409a0181
Here is some test data: https://gist.github.com/novoselrok/d5b21991394c6850f5bf61fe540490b6

You can run the algorithms with the following command:
julia main.jl test_10000_100_4.txt 4 100
or from the REPL:
main("test_10000_100_4.txt", "4", "100")

I appreciate any comments. I’m still learning a lot about Julia, so thank you :slight_smile:


#2

So, you’re missing 2 things here, neither of which is necessarily obvious. First the easy one:

  • You need to launch Julia with the environment variable JULIA_NUM_THREADS set to the number of threads you want to use. The default is just 1. Try setting this to the number of threads available on your system before launching Julia, and check that it stuck with Threads.nthreads().
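To check that the thread count actually took effect, something like the following can be run at the top of the script or pasted into the REPL. This is only a small sketch: the launch command in the comment assumes a Unix-like shell, and the warning text is my own wording.

```julia
# Launch Julia with the variable set, for example (Unix-like shell assumed):
#   JULIA_NUM_THREADS=4 julia main.jl test_10000_100_4.txt 4 100

using Base.Threads

# What the environment asked for ("unset" if the variable is missing).
requested = get(ENV, "JULIA_NUM_THREADS", "unset")
println("JULIA_NUM_THREADS  = ", requested)

# What Julia is actually running with.
println("Threads.nthreads() = ", nthreads())

if nthreads() == 1
    @warn "Only one thread is available, so @threads loops will not run in parallel."
end
```

If the second number is 1, the variable was not set in the environment Julia was started from.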

And next, the less obvious one:

  • There is a nasty closure-related bug in Julia that almost always comes up when using Threads.@threads: https://github.com/JuliaLang/julia/issues/15276. Suffice it to say, the “easy” solution is to take the entire expression that you had within the macro and move it to a function. Then, just call that function with the appropriate arguments in your @threads call. An example of this is provided below.
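As a self-contained illustration of the workaround in the bullet above, here is a minimal sketch that can be run on its own. It is not the code from the gists: `assign_label!`, `assign_all!`, and the tiny data set are hypothetical names and values made up for the example, and it only computes labels (no per-thread cluster accumulation).

```julia
using Base.Threads

# Hypothetical loop body, moved into its own function so that the
# @threads loop below only contains a single function call.
function assign_label!(labels, centers, points, i)
    best, best_d = 1, Inf
    for (j, c) in enumerate(centers)
        d = sum(abs2, points[i] .- c)   # squared Euclidean distance
        if d < best_d
            best, best_d = j, d
        end
    end
    labels[i] = best                    # each iteration writes only its own slot
    return nothing
end

# Hypothetical driver: the threaded loop just forwards its arguments.
function assign_all!(labels, centers, points)
    @threads for i in eachindex(points)
        assign_label!(labels, centers, points, i)
    end
    return labels
end

# Tiny made-up data set: two points near each of two centers.
points  = [[0.0, 0.0], [0.1, 0.2], [5.0, 5.0], [4.9, 5.2]]
centers = [[0.0, 0.0], [5.0, 5.0]]
labels  = zeros(Int, length(points))
assign_all!(labels, centers, points)
println(labels)
```

Because every iteration writes to a different `labels[i]`, this part needs no locking; accumulating points into shared clusters is what requires the per-thread copies.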

Doing both of these things, I get about 0.7 seconds for sequential and ~0.2 seconds for 8 threads (the less-than-linear scaling is likely due to memory bandwidth or something like that).

Relevant changes:

function do_thread(clusters_per_thread, clusters, points, labels, i)
    @inbounds begin
        thread_id = Threads.threadid()
        point = points[i]
        _, min_index = findmin([distance(cluster, point) for cluster in clusters])
        labels[i] = min_index
        add_point(clusters_per_thread[thread_id][min_index], point)
    end
end
...
    @inbounds for iter in 1:max_iter
        clusters_per_thread = [[Cluster() for _ in 1:k] for _ in 1:Threads.nthreads()]

        Threads.@threads for i in 1:num_points
            do_thread(clusters_per_thread, clusters, points, labels, i)
            #thread_id = Threads.threadid()
            #point = points[i]
            #_, min_index = findmin([distance(cluster, point) for cluster in clusters])
            #labels[i] = min_index
            #add_point(clusters_per_thread[thread_id][min_index], point)
        end