CPU multithreading: why can't my code estimate PI (throwing darts)?

Well, there are a number of problems present here. Let's start at a high level and work towards the more technical details. In the end, I will present a simple solution using OhMyThreads.jl.

General strategy

for _ = 1:N
    Threads.@spawn begin
        hits_in_circle = hits_in_circle + throw_dart()
    end
end

Ignoring all the other details, this code spawns a Task for every dart you want to throw - in total a whopping 2^24 Tasks. This is not a good idea, because Tasks come with some overhead, so the amount of work a Task does needs to amortize that overhead. A single throw_dart is insanely cheap (4.7 ns on my machine), while creating and scheduling a Task costs on the order of microseconds IIRC. On top of that, every Task allocates some context, which might cause the memory issues you are seeing.
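For reference, throw_dart itself is not shown in the snippet, but based on the OhMyThreads example at the end of this post it presumably looks something like this (hypothetical reconstruction):

```julia
# Hypothetical reconstruction: draws a uniform random point in the unit
# square and returns true if it lands inside the quarter unit circle.
# (Bool promotes to Int in addition, so summing these counts the hits.)
throw_dart() = rand()^2 + rand()^2 < 1.0
```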

What you should do instead: Split your workload into decently sized chunks and spawn a Task for every chunk. This is facilitated by packages like ChunkSplitters.jl. In your case it is rather easy to just divide the range up (note this code is still incorrect!):

function estimate_pi(N)
    nthreads = Threads.nthreads()
    chunksize = ceil(Int, N/nthreads) # round value up
    println("Running estimate PI with MULTI threading on CPU. nr of threads: ", nthreads)
    hits_in_circle = 0
    for _ = 1:nthreads
        Threads.@spawn begin
            for _ in 1:chunksize
                hits_in_circle = hits_in_circle + throw_dart()
            end
        end
    end
    pi_est = 4 * hits_in_circle / (nthreads*chunksize)
    println("PI is ", pi_est)
end

Synchronization

This code does not work, because you just tell Julia to create and run these tasks but not to wait for them to finish. This can be fixed easily by putting @sync in front of the outer loop. This time no out-of-memory error should occur; however, the results are likely still wrong.
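Applied to the loop above, the pattern looks like this (sketch):

```julia
# @sync waits until every task spawned lexically inside its block is done.
@sync for _ = 1:Threads.nthreads()
    Threads.@spawn begin
        # ... per-chunk work ...
    end
end
```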

Race condition

Even with @sync the result is likely too small. This is because you have multiple threads reading and writing to/from the same memory location concurrently here:

hits_in_circle = hits_in_circle + throw_dart()

This line really consists of three individual steps:

  1. Read the value of hits_in_circle
  2. Add the result from throw_dart
  3. Write result back to memory

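A minimal sketch that makes these lost updates visible (assuming Julia was started with more than one thread, e.g. julia -t 4):

```julia
using Base.Threads

function racy_count(n)
    counter = Ref(0)          # shared, unsynchronized memory location
    @sync for _ in 1:nthreads()
        @spawn for _ in 1:n
            counter[] += 1    # read-modify-write: not atomic!
        end
    end
    return counter[]
end

# With multiple threads this typically prints less than nthreads() * 10^6,
# because increments get lost between the read and the write-back.
println(racy_count(10^6))
```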
If you have multiple threads doing this concurrently, then you risk losing some hits if two threads read at the same time. There are a couple of ways around this, like using so-called atomic instructions, locks, or some other synchronization mechanism. But all of these cost you performance. So it is best to restructure your code to make the race condition disappear. This is very easy in this case: every Task can just accumulate into a local variable and then return this total. Then you just need to sum up the individual results, and there is no chance to ever have a data race :slight_smile:
This could look like:

function estimate_pi(N)
    nthreads = Threads.nthreads()
    chunksize = ceil(Int, N/nthreads) # round value up
    println("Running estimate PI with MULTI threading on CPU. nr of threads: ", nthreads)
    tasks = Task[] # concretely typed instead of Vector{Any}
    for _ = 1:nthreads
        task = Threads.@spawn begin
            hits_in_circle = 0
            for _ in 1:chunksize
                hits_in_circle = hits_in_circle + throw_dart()
            end
            hits_in_circle # return the value
        end
        push!(tasks, task)
    end
    total = mapreduce(fetch, +, tasks) # fetch results from the individual tasks and add up
    # since we use fetch no @sync is needed
    pi_est = 4 * total / (nthreads*chunksize)
    println("PI is ", pi_est)
end
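For completeness, the manual chunking above can also be delegated to ChunkSplitters.jl, as mentioned earlier. A sketch (assuming index_chunks from ChunkSplitters v3; older versions call it chunks), which also handles N not being evenly divisible by the number of tasks:

```julia
using ChunkSplitters: index_chunks
using Base.Threads: @spawn, nthreads

function estimate_pi_chunked(N)
    # one task per chunk; chunk sizes differ by at most 1
    tasks = map(index_chunks(1:N; n = nthreads())) do chunk
        @spawn begin
            hits = 0
            for _ in chunk
                hits += throw_dart() # task-local accumulation, no data race
            end
            hits
        end
    end
    total = mapreduce(fetch, +, tasks)
    return 4 * total / N
end
```

Note that dividing by N directly also avoids the nthreads*chunksize vs. N mismatch from the hand-rolled version.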

Easy solution with OhMyThreads.jl

Okay, now that I have explained the pitfalls, I will show you how easy this can be :slight_smile: I will just copy the example from OhMyThreads' README:

using OhMyThreads: tmapreduce
using Base.Threads: nthreads

function estimate_pi(N; ntasks=nthreads())
    M = tmapreduce(+, 1:N; ntasks) do i
        rand()^2 + rand()^2 < 1.0
    end
    pi = 4 * M / N
    return pi
end