Issues with shared memory parallel k-means implementation

You’re missing two things here, neither of which is obvious. First, the easy one:

  • You need to launch Julia with the environment variable JULIA_NUM_THREADS set to the number of threads you want to use; the default is just 1. Set it to the number of threads available on your system before launching Julia, and check that it took effect with Threads.nthreads().
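As a quick sanity check, something like the following can be run right after startup. This is only a sketch: the launch commands in the comments assume a Unix-like shell, the --threads flag assumes Julia 1.5 or later, and the value 8 is just an example.

```julia
# Launch Julia with the thread count set, e.g. from a Unix-like shell:
#   JULIA_NUM_THREADS=8 julia
# On Julia 1.5 and later, the command-line flag has the same effect:
#   julia --threads 8

n = Threads.nthreads()
println("Julia is running with ", n, " thread(s)")

# With a single thread, Threads.@threads still works but runs serially.
if n == 1
    println("Only one thread: set JULIA_NUM_THREADS before launching Julia")
end
```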

And next, the less obvious one:

  • There is a nasty closure-related performance bug in Julia that almost always comes up when using Threads.@threads: https://github.com/JuliaLang/julia/issues/15276. (@threads wraps the loop body in a closure, and captured variables that get reassigned end up boxed, which defeats type inference.) Suffice it to say, the “easy” solution is to take the entire expression that you had inside the macro and move it into a function. Then just call that function with the appropriate arguments inside your @threads loop. An example of this is provided below.
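To show the shape of that workaround in isolation, here is a minimal, self-contained sketch. It is not your code: the names assign_label! and assign_all! are made up for illustration, the points are plain Float64 values, and the distance is just an absolute difference. Each iteration writes only to its own labels[i], so no per-thread storage is needed in this toy version.

```julia
# Hypothetical toy stand-ins: 1-D points and centers, absolute difference as distance.

# All per-iteration work lives in a function, so the body of the
# @threads loop is a single call with explicitly passed arguments.
function assign_label!(labels, centers, points, i)
    @inbounds begin
        point = points[i]
        # Index of the nearest center becomes this point's label.
        _, min_index = findmin([abs(c - point) for c in centers])
        labels[i] = min_index
    end
    return nothing
end

function assign_all!(labels, centers, points)
    Threads.@threads for i in 1:length(points)
        assign_label!(labels, centers, points, i)
    end
    return labels
end

points = [0.1, 0.2, 0.9, 1.1, 5.0, 5.2]
centers = [0.0, 1.0, 5.0]
labels = zeros(Int, length(points))
assign_all!(labels, centers, points)
println(labels)  # prints [1, 1, 2, 2, 3, 3]
```

The result is the same for any thread count, since every iteration touches a distinct element of labels.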

Doing both of these things, I get about 0.7 seconds for the sequential version and ~0.2 seconds with 8 threads (the sub-linear scaling is likely due to memory bandwidth or something like that).

Relevant changes:

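# Body of the threaded loop, moved into its own function to avoid the closure issue.
function do_thread(clusters_per_thread, clusters, points, labels, i)
    @inbounds begin
        # Indexing by threadid() assumes the iteration stays on one thread. On Julia 1.8
        # and later, `Threads.@threads :static` is needed to guarantee that.
        thread_id = Threads.threadid()
        point = points[i]
        # Find the nearest cluster and record it as this point's label.
        _, min_index = findmin([distance(cluster, point) for cluster in clusters])
        labels[i] = min_index
        # Accumulate into this thread's private copy so threads never share a Cluster.
        add_point(clusters_per_thread[thread_id][min_index], point)
    end
end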
...
    @inbounds for iter in 1:max_iter
        clusters_per_thread = [[Cluster() for _ in 1:k] for _ in 1:Threads.nthreads()]

        Threads.@threads for i in 1:num_points
            # The loop body is now a single function call.
            do_thread(clusters_per_thread, clusters, points, labels, i)
            # Original loop body, moved into do_thread:
            #thread_id = Threads.threadid()
            #point = points[i]
            #_, min_index = findmin([distance(cluster, point) for cluster in clusters])
            #labels[i] = min_index
            #add_point(clusters_per_thread[thread_id][min_index], point)
        end