Thread-safe implementation for simple counter/mean

I have a callable which performs an iterative calculation and is otherwise thread-safe. I would like to keep a statistic of the mean number of iterations it used, and update that statistic in a thread-safe way.

Stylized code follows:

mutable struct OnlineMean
    count::Int
    mean::Float64
    OnlineMean() = new(0, 0.0)
end

function update!(om::OnlineMean, x)
    om.count += 1
    om.mean += (x - om.mean) / om.count
    nothing
end

struct MyCallable
    … # other fields
    mean_iterations::OnlineMean
end

# this function called from various tasks
function (mc::MyCallable)(z)
    y, iterations = do_the_work_and_count(mc, z) # assume this is thread-safe
    update!(mc.mean_iterations, iterations) # QUESTION: make this thread-safe
    y
end

Is this as simple as putting a SpinLock in OnlineMean and using it in update!? Or should I somehow (how?) use Channels to pass in updates, or atomic operations…

Any of the above?

Spinlock is simple, and since the critical section is small, probably not too much at risk for starvation.

Channel has a lock internally, so it’s the same but with more memory overhead (and more complicated to control, since you need to maintain a single writer task).

Atomics are probably optimal in terms of overhead, but they’ll be a bit tricky here since you need to update two things that can’t be written atomically at the same time, as both are 8 bytes large. Consider getting some inspiration from the Linux kernel documentation on sequence counters and sequential locks; the technique there can probably be adapted to your use case, depending on how frequently you read and write the data.

I’d use a ReentrantLock here by wrapping OnlineMean in Lockable. It’s probably good enough. Spinlocks are tricky to use and I doubt they provide much benefit over a ReentrantLock, but they do cause issues if you change the code so it can yield while the lock is taken.

I can’t think of a way to do this atomically, unless you store the sum and the count instead of the count and the mean.

I thought about the problem and I think I could just do that, as I limit iterations to be <500, and practically they are around 20–100. So with a UInt64, I can still record $2^{64-9} = 2^{55} \approx 3.6 \times 10^{16}$ observations without overflow.
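The overflow bound can be checked directly. This is only a sketch of the arithmetic: the cap of 500 is taken from the post above, and the names max_iterations and max_observations are made up for illustration. Since 500 < 2^9, the number of observations a UInt64 sum can absorb is at least 2^55.

```julia
# Hypothetical names; 500 is the iteration cap mentioned above.
max_iterations = UInt64(500)

# Worst case: every observation adds max_iterations to the sum.
max_observations = div(typemax(UInt64), max_iterations)

# 500 < 2^9, so the bound is at least 2^(64 - 9) = 2^55.
println(max_observations >= UInt64(2)^55)
```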

Can you please give me a hint on how to do this atomically, eg implementing update!(::OnlineMean, ::Int) above? I read the Julia Atomic Manifesto but I am still not sure how to use this feature. Relying on 1.12 features is OK for me.

Below is an example of using atomics. However, updates such as @atomic om.x += 1 are implemented with a loop in LLVM (since neither armv8 nor x86 has dedicated instructions for this), so on my Mac, under contention, using atomics is much slower than using a lock.

Here is the atomic version:

mutable struct OnlineMean
    count::Threads.Atomic{UInt64}
    sum::Threads.Atomic{UInt64}

    OnlineMean() = new(Threads.Atomic{UInt64}(0), Threads.Atomic{UInt64}(0))
end

mean(om::OnlineMean) = om.sum[] / om.count[]

# This is thread-safe. Multiple tasks can increment om.count
# before om.sum, but that will not corrupt the data in `om`,
# as both fields will be updated correctly eventually.
function update!(om::OnlineMean, x::Integer)
    ux = UInt64(x)
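    # Monotonic ordering would be enough here, since we only care about
    # each addition being atomic, not about its ordering w.r.t. other
    # operations. Threads.atomic_add! does not let us choose the ordering
    # and uses a stronger one, which is still correct.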
    Threads.atomic_add!(om.count, UInt64(1))
    Threads.atomic_add!(om.sum, ux)
    om
end

function foo()
    om = OnlineMean()

    # This loop is thread-safe
    Threads.@threads for _ in 1:10000
        for _ in 1:1000
            obs = rand(UInt(0):UInt(100))
            update!(om, obs)
        end
    end

    # NOTE: Computing the mean IS NOT threadsafe concurrently
    # with calling `update!`, since the om.count and om.sum fields
    # may be updated at different times! However, once the loop
    # above is complete, it's in the right state
    return mean(om)
end

And here with a lock:

mutable struct OnlineMean2
    count::UInt64
    sum::UInt64

    OnlineMean2() = new(0, 0)
end

mean(om::OnlineMean2) = om.sum / om.count

function update!(om::Lockable{OnlineMean2}, x::Integer)
    ux = UInt64(x)
    @lock om begin
        om[].count += 1
        om[].sum += ux
    end
    om
end

function foo2()
    om = Lockable(OnlineMean2())

    Threads.@threads for _ in 1:10000
        for _ in 1:1000
            obs = rand(UInt(0):UInt(100))
            update!(om, obs)
        end
    end

    @lock om mean(om[])
end

Finally, the fastest result for this case is to have each thread update separate counters and then merge them at the end. On my computer this is 10x faster than either. But of course, the speed there depends on how expensive instantiation of these OnlineMean objects is. In this example they’re very cheap, but that might not be the case for your real-world code.

function update!(om::OnlineMean2, x::Integer)
    om.count += 1
    om.sum += UInt64(x)
    om
end

function foo3()
    means = [OnlineMean2() for thread in 1:10000]
    Threads.@threads for i in eachindex(means)
        om = means[i]
        for _ in 1:1000
            obs = rand(UInt(0):UInt(100))
            update!(om, obs)
        end
    end
    om = means[1]
    for other_om in @view means[2:end]
        om.count += other_om.count
        om.sum += other_om.sum
    end
    mean(om)
end

A few more observations:

First, I’m really surprised the lock version is faster than the atomic version. I guess it goes to show both how good our ReentrantLocks are, and also how slow atomic read-modify-write is.

Also, I was wrong about codegen. On my armv8 and x86 there are indeed instructions for this.

Also, atomicrmw codegen from the @atomic macro was updated recently (codegen: add a pass for late conversion of known modify ops to call atomicrmw - Pull Request #57010 - JuliaLang/julia - GitHub) to be more efficient. I think this lands in 1.13. In 1.12 and before, you need to use Threads.Atomic instead of the more general and versatile @atomic macro.

Finally, and very annoyingly, codegen introspection lies about atomics (Codegen introspection misleading for `atomic_max!` · Issue #59645 · JuliaLang/julia · GitHub), which makes this a little tricky to dig into.

So, my takeaway:

  • ReentrantLock remains king, and Lockable is the way to use it.
  • Atomics are cool but they’re tricky to get right and should only be used when benchmarking verifies it makes a difference.
  • Structuring your code so that it requires as little thread coordination as possible, and so that as much work as possible is done in a single task without inter-task communication or synchronization, is best.

There’s a potential numerical problem here in that the order in which you add the floats together will matter. What you may want to consider is storing some of the numbers in a buffer where you sort them. The main idea is that you want to add numbers of similar magnitude together first.

There’s some additional explanation here:

Thanks for all those detailed benchmarks! I still don’t understand ReentrantLock fully. Suppose in our OnlineMean2 example, Task 1 acquires a lock and starts the operation, and then Task 2 also does this before Task 1 is done. Does ReentrantLock allow this? Would this not cause data corruption? (Sorry for the silly question)

I would prefer that, and would use task local values, eg from OhMyThreads.jl. But I am unsure how to collect those at the end.
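One way to collect per-task values at the end, without any extra package, is to have each task return its own partial result and gather them with fetch. The following is only a sketch under assumptions: fake_iterations is a made-up stand-in for the real do_the_work_and_count, mean_iterations and ntasks are hypothetical names, and the inputs are assumed to be an indexable collection with a known length.

```julia
# Hypothetical stand-in for the real work; returns an iteration count.
fake_iterations(z) = 20 + z % 81

function mean_iterations(inputs; ntasks = Threads.nthreads())
    chunksize = max(1, cld(length(inputs), ntasks))
    chunks = Iterators.partition(inputs, chunksize)
    # Each task owns its own local count and total, so the hot loop
    # needs no synchronization at all.
    tasks = map(chunks) do chunk
        Threads.@spawn begin
            n = 0
            total = 0
            for z in chunk
                total += fake_iterations(z)
                n += 1
            end
            (n, total)
        end
    end
    # Collect the partial results; fetch waits for each task to finish.
    partials = fetch.(tasks)
    n = sum(first, partials)
    total = sum(last, partials)
    total / n
end
```

Because the partial results are the return values of the tasks, it does not matter which thread each task ran on or whether it migrated.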

Just in case you missed this: OnlineStats.jl does what you want, and seems to have at least some threaded functionality: Big Data · OnlineStats Documentation

If you change the OnlineMean to

mutable struct OnlineMean2
    @atomic count::Int
    @atomic sum::Float64
end
# Atomic fields must also be read with @atomic
getMean(om::OnlineMean2) = (@atomic om.sum) / (@atomic om.count)

then you can simply use atomics, no need for locks for concurrent writers (readers may still conflict with writers, though)

The reason you get away with this is that your atomic updates to the fields commute (up to floating point rounding). Your original algorithm where you store count and mean does not have this property: Updates require count and mean to be jointly atomic, not just separately atomic.
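As a minimal sketch of what the separately atomic version could look like (the type name AtomicSumMean and the function getmean are made up here to avoid clashing with the definitions above; this needs Julia 1.7 or later for @atomic fields):

```julia
mutable struct AtomicSumMean
    @atomic count::Int
    @atomic sum::Float64
    AtomicSumMean() = new(0, 0.0)
end

function update!(om::AtomicSumMean, x::Real)
    # Two independent atomic read-modify-write operations. Another task
    # may run between them, but since additions commute, both fields
    # end up correct once all updates have finished.
    @atomic om.count += 1
    @atomic om.sum += Float64(x)
    om
end

# Only reliable once no writers are active: count and sum are read
# separately, so a concurrent update! can be half-applied.
getmean(om::AtomicSumMean) = (@atomic om.sum) / (@atomic om.count)
```

For integer observations the Float64 sum stays exact as long as it is below 2^53, so in that regime the result does not depend on the order of the updates.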

You can also do

mutable struct AtomicOnlineMean
@atomic count_mean::Tuple{Int, Float64}
end

and do atomic updates. This is lock-free because you’re atomically updating 16 bytes and all modern architectures can do that. Yay!
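A sketch of how the joint update might be written, using modifyfield! with a pure function that maps the old (count, mean) pair to the new one. The helper name next_state and the inner constructor are assumptions added for illustration, and whether this compiles down to a lock-free 16-byte compare-and-swap depends on the platform and Julia version.

```julia
mutable struct AtomicOnlineMean
    @atomic count_mean::Tuple{Int, Float64}
    AtomicOnlineMean() = new((0, 0.0))
end

# Pure function: old (count, mean) and a new observation -> new (count, mean).
# It may be called more than once per update if other tasks interfere.
function next_state(cm::Tuple{Int, Float64}, x::Real)
    count, mean = cm
    count += 1
    (count, mean + (x - mean) / count)
end

function update!(om::AtomicOnlineMean, x::Real)
    # Atomically replaces the whole tuple with next_state(old, x).
    modifyfield!(om, :count_mean, next_state, x, :sequentially_consistent)
    om
end

# A single atomic load returns a consistent (count, mean) pair.
getmean(om::AtomicOnlineMean) = (@atomic om.count_mean)[2]
```

Unlike the two-field version, a reader here never sees a count that belongs to a different mean, because both live in one atomically replaced value.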

If you had to carry more values than fit into 16 bytes, then your situation starts to get more complicated.

Atomics on such larger values use a spinlock internally, with all the issues that implies (e.g. if your system is under load, the thread holding the spinlock can get preempted in the critical section, causing all other threads to hammer the spinlock, causing more system load, which increases the chance of lock-holders getting preempted in the critical section. I am an enemy of naive spinlocks, but don’t want to digress too much).

Sorting them still leads to an $O(n)$ error bound, though it reduces the constant coefficient (on the bound, but not necessarily on the actual error, as pointed out by Higham), whereas pairwise summation changes the bound to $O(\log n)$ (and is also much faster than sorting and summing, assuming you use a large base case), and Kahan summation changes it to $O(1)$. Though in all cases you might still get a large relative error if the sum is ill-conditioned ($\sum |x_i| \gg |\sum x_i|$).

If you are storing the numbers in a buffer, you might as well call sum, which does pairwise summation. Or just use Kahan summation, since the extra arithmetic cost is probably dwarfed by the cost of acquiring a lock. This is assuming roundoff error is a concern at all (which it may not be… you may have plenty of precision for naive summation in your problem).
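For reference, Kahan (compensated) summation only needs one extra Float64 of state and a few extra floating-point operations per element. A minimal sketch, with kahan_sum as a made-up name:

```julia
function kahan_sum(xs)
    total = 0.0
    comp = 0.0            # compensation: low-order bits lost so far
    for x in xs
        y = x - comp      # apply the correction to the next summand
        t = total + y     # low-order bits of y may be lost here
        comp = (t - total) - y   # recover what was lost
        total = t
    end
    total
end

xs = fill(0.1, 10^6)
naive = foldl(+, xs)      # strict left-to-right summation
println(abs(kahan_sum(xs) - 1e5) <= abs(naive - 1e5))
```

In an online setting the same two variables (total and comp) would be stored in the accumulator and updated under whatever synchronization is already in place.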

If your concern is not the magnitude of the error but rather that the answer be deterministic (independent of the thread scheduling), of course, then sorting is a viable option.

Before this topic goes off on a tangent about summing floats, I just want to note that it is all integers,

so I can keep an integer sum just as well.

fastest will probably be a local sum per thread and periodically aggregate, right?

Having read A Concurrency Cost Hierarchy | Performance Matters, I fully expect so.

Possibly. I am now quite confused, as the advice I have seen in the Julia community is not to think about threads, but tasks, as tasks may just be reassigned to a different thread (unless I take explicit steps against this).

I don’t know how to “periodically aggregate” in this framework. I could if I could access task-local storage for all tasks, but I don’t see an API for that.

yes, I should have said “task” not “thread.” sloppy word choice

I don’t know the details of your framework, but one random idea off the top of my head might be to do the sum under a lock, where the task holds the lock most of the time (so it doesn’t have to constantly acquire it for each summand) and then releases it every 1000 operations (i.e. whenever n % 1000 == 0) to wait for aggregation.
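A related variant of periodic aggregation, not exactly the lock-holding scheme described above: each task accumulates into private local variables and only takes the lock to flush them into a shared accumulator every flush_every updates. This is a sketch under assumptions; SumCount, flush!, run_batched, flush_every and fake_iterations are all hypothetical names, and fake_iterations stands in for the real work.

```julia
# Hypothetical stand-in for the real work; returns an iteration count.
fake_iterations(z) = 20 + z % 81

mutable struct SumCount
    count::Int
    sum::Int
end

# Merge a local batch into the shared accumulator under the lock.
function flush!(shared::SumCount, lk::ReentrantLock, n::Int, total::Int)
    lock(lk) do
        shared.count += n
        shared.sum += total
    end
    nothing
end

function run_batched(inputs; ntasks = Threads.nthreads(), flush_every = 1000)
    shared = SumCount(0, 0)
    lk = ReentrantLock()
    chunks = Iterators.partition(inputs, max(1, cld(length(inputs), ntasks)))
    @sync for chunk in chunks
        Threads.@spawn begin
            n = 0
            total = 0
            for z in chunk
                total += fake_iterations(z)
                n += 1
                if n == flush_every       # periodic aggregation
                    flush!(shared, lk, n, total)
                    n = 0
                    total = 0
                end
            end
            flush!(shared, lk, n, total)  # flush the remainder
        end
    end
    shared.sum / shared.count             # all tasks are done after @sync
end
```

The lock is then taken roughly once per flush_every observations per task, and the shared accumulator is never more than one batch per task behind.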

I guess the major workload happens at this line, rather than to repeatedly update the mean value.

So if I were to implement it, I would just use a Channel to coordinate a vector of tasks.

As suggested in Post #7, multithreading in Julia is somewhat tricky and admits diverse methods. It’s rare for a user to be able to predict which one will be faster. Altogether, I think Channel + repeated task spawning is good. And you can monitor the progress of the update of the mean (which is exactly the spirit of online). (By comparison, the methods using locks above probably don’t achieve this.)
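A rough sketch of the Channel idea, under assumptions: worker tasks put! their iteration counts into a buffered Channel, and a single consumer task owns the running statistics, so the statistics themselves need no lock. The names channel_mean and fake_iterations and the buffer size of 1024 are made up for illustration.

```julia
# Hypothetical stand-in for the real work; returns an iteration count.
fake_iterations(z) = 20 + z % 81

function channel_mean(inputs)
    ch = Channel{Int}(1024)   # buffered, so producers rarely block

    # The only task that touches n and total.
    consumer = Threads.@spawn begin
        n = 0
        total = 0
        for iterations in ch  # ends once ch is closed and drained
            n += 1
            total += iterations
        end
        (n, total)
    end

    # One producer task per input; each sends its iteration count.
    @sync for z in inputs
        Threads.@spawn put!(ch, fake_iterations(z))
    end
    close(ch)                 # all producers are done

    n, total = fetch(consumer)
    total / n
end
```

The Channel still synchronizes internally on every put! and take!, so this trades the explicit lock for the one inside the Channel.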

When using locks (including ReentrantLock), if task 1 takes the lock, and task 2 attempts to take the lock before task 1 has released it, then task 2 will stall until task 1 has released the lock.

For ReentrantLock specifically, task 2 will enter a short loop where it will repeatedly try to take the lock (i.e. it will spin). After some number of attempts, task 2 will push itself into a queue managed by the lock, and then go to sleep. Once task 1 is done, the lock will wake up the first task waiting in the queue, if any. This hybrid approach ensures low latency if the lock is quickly released, but also avoids keeping the thread busy in a loop if the lock isn’t quickly released.

In this case, since update! is so fast, probably most of the time, tasks waiting for the lock will be able to acquire the lock while spinning and won’t go to sleep. That’s probably why it’s fast.
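To make the blocking behaviour concrete, here is a sketch that keeps the original count/mean layout and protects it with a plain ReentrantLock field. It uses only lock(f, l), so it should also run on Julia versions without Lockable. The type name LockedOnlineMean is hypothetical.

```julia
mutable struct LockedOnlineMean
    lock::ReentrantLock
    count::Int
    mean::Float64
    LockedOnlineMean() = new(ReentrantLock(), 0, 0.0)
end

function update!(om::LockedOnlineMean, x::Real)
    # Only one task at a time can be inside this block. A second task
    # calling update! waits here until the first one has released the lock.
    lock(om.lock) do
        om.count += 1
        om.mean += (x - om.mean) / om.count
    end
    nothing
end

# Reading also takes the lock, so count and mean are never seen half-updated.
getmean(om::LockedOnlineMean) = lock(() -> om.mean, om.lock)

om = LockedOnlineMean()
@sync for t in 1:8
    Threads.@spawn for i in 1:1000
        update!(om, i % 10)
    end
end
println(om.count == 8000)
```

Since count and mean are only ever modified together inside the lock, the joint update that the original algorithm requires is preserved.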

What is the benefit of using Lockable? I am looking into using it, but it is a Julia 1.11 and up feature.