How to parallelize the computation of a minimum

Consider the following code:

function hamming4(bits1::Integer, bits2::Integer)
    return count_ones(bits1 ⊻ bits2)
end

function random_strings2(n, N)
    mask = UInt128(1) << n - 1
    return [rand(UInt128) & mask for i in 1:N]
end

function find_min(strings, n, N)
    minsofar = n
    for i in 1:N
        for j in i+1:N
            dist = hamming4(strings[i], strings[j])
            if dist < minsofar
                minsofar = dist
            end
        end
    end
    return minsofar
end

function ave_min(n, N)
    ITER = 500
    strings = random_strings2(n, N)
    new_min = find_min(strings, n, N)
    avesofar = new_min
    # print("New min ", new_min, ". New ave ", avesofar, "\n")
    total = avesofar
    for i in 1:ITER-1
        strings = random_strings2(n, N)
        new_min = find_min(strings, n, N)
        avesofar = avesofar*(i/(i+1)) + new_min/(i+1)
        print("Iteration ", i, ". New min ", new_min, ". New ave ", avesofar, "\n")
    end
    return avesofar
end

N = 2^15
n = 89
    
print("Overall average ", ave_min(n, N), "\n")

I would like to parallelize this code to speed it up. To do this I tried to add Threads.@threads for i in 1:N in the find_min function but this doesn’t seem to work at all. I assume it’s because the minsofar variable is causing dependencies.

How can I parallelize the find_min function?

Here’s one idea: you can keep track of what each thread has seen, and then combine at the end:

minsofar = fill(n, Threads.nthreads())
Threads.@threads for i in 1:N 
...
    @inbounds if dist < minsofar[Threads.threadid()]
        minsofar[Threads.threadid()] = dist
...
return minimum(minsofar)
2 Likes

This does work! Thank you.

I notice that it is using about 430% CPU instead of closer to 800% (according to top) and none of the cores is 100% busy. Is this just because it costs too much to start the threads maybe?

Yea I don’t know, I get about 3x from threading, on a 6-core machine. Besides @inbounds it also seems to be quicker to write directly min(new, old). Here’s the whole function:

function find_min(strings, n, N)
    minsofar = fill(n, Threads.nthreads())
    Threads.@threads for i in 1:N
    tid = Threads.threadid() 
        @inbounds for j in i+1:N
            dist = hamming4(strings[i], strings[j])
            minsofar[tid] = min(dist, minsofar[tid])
        end
    end
    minimum(minsofar)
end
1 Like

Would you mind pasting the whole for loop for your fastest version please? I am new to Julia.

Did you find @inbounds helped as well? Or even https://github.com/chriselrod/LoopVectorization.jl ?

Hmm… that version with min(dist, minsofar[tid]) is in fact a lot slower for me. With ITER = 10 I get 27 vs 10 seconds. It does indeed seem to be using more of the CPU but not in a good way.

You can do it with @spawn

function splitter(n, k)
    xz = [0; reverse([Int(ceil(n*(1 - sqrt(i/k)))) for i in 1:k-1]); n]
    return [xz[i]+1:xz[i+1] for i in 1:k]
end

function thread_find_min(strings, n, N, k)
    ranges = splitter(N, k)
    tasks = Vector{Task}(undef, k - 1)

    for i in 1:k - 1
        tasks[i] = Threads.@spawn chunk_find_min(strings, n, N, ranges[i])
    end

    minsofar = chunk_find_min(strings, n, N, ranges[end])

    return min(minsofar, minimum(fetch.(tasks)))
end

function chunk_find_min(strings, n, N, r)
    minsofar = n
    for i in r
        for j in i+1:N
            dist = hamming4(strings[i], strings[j])
            if dist < minsofar
                minsofar = dist
            end
        end
    end
    return minsofar
end

function ave_min2(n, N, iters = 10)
    k = Threads.nthreads()
    ITER = iters
    strings = random_strings2(n, N)
    new_min = thread_find_min(strings, n, N, k)
    avesofar = new_min
    # print("New min ", new_min, ". New ave ", avesofar, "\n")
    total = avesofar
    for i in 1:ITER-1
        strings = random_strings2(n, N)
        new_min = thread_find_min(strings, n, N, k)
        avesofar = avesofar*(i/(i+1)) + new_min/(i+1)
        print("Iteration ", i, ". New min ", new_min, ". New ave ", avesofar, "\n")
    end
    return avesofar
end

# previous version for completeness
function ave_min(n, N, iters = 10)
    ITER = iters
    strings = random_strings2(n, N)
    new_min = find_min(strings, n, N)
    avesofar = new_min
    # print("New min ", new_min, ". New ave ", avesofar, "\n")
    total = avesofar
    for i in 1:ITER-1
        strings = random_strings2(n, N)
        new_min = find_min(strings, n, N)
        avesofar = avesofar*(i/(i+1)) + new_min/(i+1)
        print("Iteration ", i, ". New min ", new_min, ". New ave ", avesofar, "\n")
    end
    return avesofar
end

On my laptop, with Threads.nthreads() == 4

@time print("Overall average ", ave_min(n, N, 10), "\n") 
# 5.597879 seconds (413 allocations: 5.014 MiB)

@time print("Overall average ", ave_min2(n, N, 10), "\n") 
# 1.705441 seconds (22.07 k allocations: 6.130 MiB)
2 Likes

Do you have 6 physical cores?

2 Likes

What is the function splitter doing?

It splits 1:N range to smaller ranges which are handled by individual threads

julia> splitter(10_000, 4)
4-element Array{UnitRange{Int64},1}:
 1:1340    
 1341:2929 
 2930:5000 
 5001:10000

It splits unevenly in order to respect the nature of the problem. When you are approaching N inner loop is getting smaller and smaller.

1 Like