For a recent multi-threaded simulation, I needed a thread-safe way to use pre-allocated arrays, and the simple “nthreads()-length vector of arrays” approach wasn’t working because of yield points in the @spawn’ed tasks. (The problem noted by @tkf here.)
I came up with a fairly simple type to add some locks around the pre-allocated arrays, and it solved my problem and the simulation results look correct. However, I’m not very familiar with multi-threading, so I’m curious if there are any obvious drawbacks or potential issues with this approach:
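using Base: Semaphore, acquire, release  # these names live in Base and may need importing

struct SafeBuffer{T,N}
    held::Semaphore                  # limits how many tasks may hold a buffer at once
    locks::NTuple{N,ReentrantLock}   # one lock per buffer
    buffers::Vector{T}
end
function acquirebuffer(buffer)
    acquire(buffer.held)
    @inbounds for ci in eachindex(buffer.locks)
        if trylock(buffer.locks[ci])
            return buffer.locks[ci], buffer.buffers[ci]
        end
    end
    # Falls through only if every trylock failed; note that withbuffer
    # destructures the result and would error on `nothing`.
    return nothing
end
function releasebuffer(lock, buffer)
    unlock(lock)           # free the buffer first ...
    release(buffer.held)   # ... then let a waiting task proceed
end
function withbuffer(f, buffer)
    l, buf = acquirebuffer(buffer)
    try
        f(buf)
    finally
        releasebuffer(l, buffer)
    end
end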
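The post does not show how a SafeBuffer is built, so here is a minimal sketch of one possible constructor plus a small usage check. It assumes the semaphore is sized to the number of buffers; the constructor `SafeBuffer(factory, n)` and the buffer contents are hypothetical and not from the original post. The definitions are repeated so the block runs on its own.

```julia
using Base: Semaphore, acquire, release
using Base.Threads: @spawn

struct SafeBuffer{T,N}
    held::Semaphore
    locks::NTuple{N,ReentrantLock}
    buffers::Vector{T}
end

# Hypothetical constructor: one lock per buffer, semaphore sized to the pool.
function SafeBuffer(factory, n::Integer)
    N = Int(n)
    buffers = [factory() for _ in 1:N]
    locks = ntuple(_ -> ReentrantLock(), N)
    return SafeBuffer{eltype(buffers),N}(Semaphore(N), locks, buffers)
end

function acquirebuffer(buffer)
    acquire(buffer.held)
    for ci in eachindex(buffer.locks)
        if trylock(buffer.locks[ci])
            return buffer.locks[ci], buffer.buffers[ci]
        end
    end
    return nothing
end

function releasebuffer(lock, buffer)
    unlock(lock)
    release(buffer.held)
end

function withbuffer(f, buffer)
    l, buf = acquirebuffer(buffer)
    try
        return f(buf)
    finally
        releasebuffer(l, buffer)
    end
end

# Ten tasks share two buffers; each task yields while holding its buffer.
pool = SafeBuffer(() -> zeros(8), 2)
tasks = map(1:10) do i
    @spawn withbuffer(pool) do buf
        fill!(buf, i)
        yield()       # a yield point, as in the original problem
        sum(buf)      # equals 8i only if no other task wrote to buf
    end
end
sums = fetch.(tasks)
```

If another task had written into a claimed buffer during the `yield()`, the corresponding sum would differ from `8i`.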
Not to reply to a question with a question, but aren’t you describing a need to do a thread safe multi producer - multi consumer pattern? Would that not be better suited to Channels of fixed size?
I’ve never been able to understand concurrent producer/consumer patterns with Channels, so it isn’t clear to me what that would look like. In particular, I’m not sure how you would reuse arrays with that approach.
The async docs provide a really good example halfway down.
As you will see - you can create a channel of fixed type and size (which would be your array replacement) and write / read to them asynchronously.
If you need to lock access to an array / vector of fixed size and then read / write by index, the above won’t suit, since the Channel approach is analogous to pipes and you only get access to the top / bottom element. It may also require a bit of a rewrite. My assumption here is that you need to fill something up with multiple threads and then potentially empty or consume it as it’s filled, with the fixed size for performance.
Obviously a bit of a guess (tricky without more info) and still not answering your original question…
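For the question of how arrays could be reused with a Channel, one possible reading is to treat the Channel itself as the pool: fill it with pre-allocated arrays, `take!` one to claim it, and `put!` it back when done. This is only a sketch under that assumption; `channelpool` and `withchannelbuffer` are hypothetical names, not part of any package mentioned here.

```julia
using Base.Threads: @spawn

# Hypothetical helper: a Channel pre-filled with n buffers acts as the pool.
function channelpool(factory, n::Integer)
    buffers = [factory() for _ in 1:n]
    pool = Channel{eltype(buffers)}(n)
    foreach(b -> put!(pool, b), buffers)
    return pool
end

function withchannelbuffer(f, pool::Channel)
    buf = take!(pool)        # blocks until some buffer is free
    try
        return f(buf)
    finally
        put!(pool, buf)      # hand the buffer back, even if f throws
    end
end

# Ten tasks share two buffers.
pool = channelpool(() -> zeros(8), 2)
tasks = map(1:10) do i
    @spawn withchannelbuffer(pool) do buf
        fill!(buf, i)
        yield()
        sum(buf)
    end
end
sums = fetch.(tasks)
```

Every claim and return goes through the Channel's internal lock, so this trades some throughput for simplicity.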
This pattern is very handy and performs reasonably well in many situations. But, since the whole purpose of Channel is to serialize the access, it’s not really the best approach. If your input collection is supported by SplittablesBase.jl, FLoops.jl’s @init can be useful.
But if you need a buffer pool that works outside a producer-consumer pattern or a parallel loop, I can see that something like SafeBuffer in the OP could be handy. One way to simplify the synchronization is:
struct BufferPool{T}
    buffers::Vector{T}
    condition::Threads.Condition
end
function bufferpool(factory, n::Integer)
    n > 0 || error("need at least one buffer; got n = $n")
    return BufferPool([factory() for _ in 1:n], Threads.Condition())
end
function withbuffer(f, pool::BufferPool)
    b = lock(pool.condition) do
        while isempty(pool.buffers)
            wait(pool.condition)
        end
        pop!(pool.buffers)
    end
    try
        f(b)
    finally
        lock(pool.condition) do
            push!(pool.buffers, b)
            notify(pool.condition)
        end
    end
end
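A usage sketch for the BufferPool idea, with the definitions repeated so the block runs on its own. The task count, buffer size, and the check inside each task are illustrative assumptions, not part of the original post.

```julia
using Base.Threads: @spawn

struct BufferPool{T}
    buffers::Vector{T}
    condition::Threads.Condition
end

function bufferpool(factory, n::Integer)
    n > 0 || error("need at least one buffer; got n = $n")
    return BufferPool([factory() for _ in 1:n], Threads.Condition())
end

function withbuffer(f, pool::BufferPool)
    b = lock(pool.condition) do
        while isempty(pool.buffers)
            wait(pool.condition)     # releases the lock while waiting
        end
        pop!(pool.buffers)
    end
    try
        return f(b)
    finally
        lock(pool.condition) do
            push!(pool.buffers, b)
            notify(pool.condition)   # wake tasks waiting for a buffer
        end
    end
end

# Twenty tasks share three buffers; each yields while holding its buffer.
pool = bufferpool(() -> zeros(Int, 4), 3)
tasks = map(1:20) do i
    @spawn withbuffer(pool) do buf
        fill!(buf, i)
        yield()              # a yield point, as in the original problem
        all(==(i), buf)      # true only if no other task touched buf
    end
end
untouched = all(fetch.(tasks))
```

Once all tasks have finished, every buffer should be back in `pool.buffers`.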
Do you have something along the lines of my SafeBuffer or BufferPool implemented in one of your packages? (Or are you aware of such a thing in a published package?)
Can you comment on any benefits (performance or otherwise) to BufferPool over SafeBuffer?
No, I don’t think I’ve used something like this in released packages so far.
SafeBuffer vs BufferPool is just my reaction to “hmm… there are many locks… can I use fewer?” Since Base.Semaphore uses a Threads.Condition anyway, I think the performance can be better for BufferPool since it uses strictly fewer locks. But I don’t know if that’s noticeable. Also, ideally, it’d be better if we had a lock-free (-ish) stack for something like a buffer pool (which I’ve been wanting to play with).
You can implement a lock-free and thread-safe buffer in Actors. This is based on channels and buffer access is serialized through the actor’s message channel (link). A simple implementation would be:
module SafeBuffer
using Actors
export newBuffer
struct Buffer{T}
    lk::Link
end
(b::Buffer)() = call(b.lk)
# actor behavior
buf(x) = copy(x)
buf(x, f, args...) = f(x, args...)
# interface function
newBuffer(x::Vector{T}) where T = Buffer{Vector{T}}(Actors.spawn(buf, x))
Actors.call(b::Buffer, args...) = call(b.lk, args...)
end
With that you can do:
julia> include("safebuffer.jl")
Main.SafeBuffer
julia> using Actors, .SafeBuffer, .Threads
julia> sf = newBuffer([1,2,3,4,5])
Main.SafeBuffer.Buffer{Vector{Int64}}(Link{Channel{Any}}(Channel{Any}(32), 1, :default))
julia> sf()
5-element Vector{Int64}:
1
2
3
4
5
julia> @threads for _ in 1:495
           call(sf, push!, threadid())
       end
julia> length(sf())
500
julia> map(x->length(filter(==(x), sf())), 1:nthreads())
16-element Vector{Int64}:
32
32
32
32
⋮
31
31
31
30
julia> @threads for _ in 1:499
           call(sf, pop!)
       end
julia> sf()
1-element Vector{Int64}:
1
Note: the Guards actors library provides a more comfortable implementation.
How does that enable the use and reuse of separate arrays? The use case being described here is multiple calls along the lines of:
@spawn withbuffer(pool) do buf
    fill_array_somehow!(buf)
    reduce_array_func(buf)
end
where there are a number of pre-allocated arrays, one of which is “claimed” and used by mutating-bang functions with a guarantee that the contents of the array buf won’t be changed by another spawned task. It looks like there is only one array in your solution?
You would have an actor for each array and access them concurrently as you want. There will be no races. If multiple tasks have the actor’s link, they can send it messages and therefore change the guarded buffer.
For that you would have to ensure that only one task at a time has access to a buffer. With Actors you can write a pool-actor managing the access to a buffer pool. If a task calls acquire, the pool-actor would give it access to a buffer, if it calls release, the actor takes it back. In your example the withbuffer task finishes after calling release. The reference to the buffer then is destroyed and it would be safe for the pool-actor to hand over variables or references.
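The acquire/release protocol described here can be sketched with a plain task and Base Channels. This is deliberately not the Actors.jl API: the message types `Acquire` and `Release`, the `ManagedPool` wrapper, and `managedpool` are all hypothetical names chosen for illustration. A single manager task owns the free list, so no lock appears in user code.

```julia
using Base.Threads: @spawn

# Messages understood by the manager task (all names here are hypothetical).
struct Acquire{T}
    reply::Channel{T}    # the manager answers on this channel
end
struct Release{T}
    buffer::T
end

struct ManagedPool{T}
    inbox::Channel{Any}
end

function managedpool(factory, n::Integer)
    free = [factory() for _ in 1:n]
    T = eltype(free)
    inbox = Channel{Any}(32)
    @spawn begin
        waiting = Acquire{T}[]       # requests that arrived while the pool was empty
        for msg in inbox             # loop ends once the inbox is closed
            if msg isa Acquire
                if isempty(free)
                    push!(waiting, msg)
                else
                    put!(msg.reply, pop!(free))
                end
            elseif msg isa Release
                if isempty(waiting)
                    push!(free, msg.buffer)
                else
                    put!(popfirst!(waiting).reply, msg.buffer)
                end
            end
        end
    end
    return ManagedPool{T}(inbox)
end

function withbuffer(f, pool::ManagedPool{T}) where {T}
    reply = Channel{T}(1)            # capacity 1, so the manager never blocks on put!
    put!(pool.inbox, Acquire{T}(reply))
    buf = take!(reply)               # wait until the manager grants a buffer
    try
        return f(buf)
    finally
        put!(pool.inbox, Release{T}(buf))
    end
end

pool = managedpool(() -> zeros(Int, 4), 3)
tasks = map(1:20) do i
    @spawn withbuffer(pool) do buf
        fill!(buf, i)
        yield()
        all(==(i), buf)              # true only if no other task touched buf
    end
end
untouched = all(fetch.(tasks))
close(pool.inbox)                    # stops the manager task
```

Because only the manager task touches the free list, at most one task holds a reference to a given buffer between its acquire and release messages.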
Since Julia does not support migration of tasks across OS threads (yet), waking up one task may not maximize the thread usage. But yeah, you can manage waiter tasks more cleverly, e.g., with per-thread queues.
IIUC, Actors.jl looks like an abstraction based on Channel. Since Channel does use locks, it is not lock-free. Usually, a lock-free data structure means, very roughly speaking, “use atomics instead of locks and the like for non-blocking communication”, and in loose conversation the term often includes wait-free and obstruction-free as well. Of course, since you can trivially write a spin lock using atomics, the key point is how the atomics are used. See, e.g., Non-blocking algorithm - Wikipedia for proper definitions. I have seen people use lock-free to simply mean “not using a lock”, but that’s not what the term means to experts.
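To illustrate the spin-lock remark, here is a minimal sketch of a lock built purely from an atomic compare-and-swap. It uses no `ReentrantLock`, yet a task that fails the swap still has to wait for the holder, so it is not lock-free in the technical sense. The names `SpinFlag`, `spinlock!`, and `spinunlock!` are hypothetical.

```julia
using Base.Threads: Atomic, atomic_cas!, @spawn

# 0 means free, 1 means held.
struct SpinFlag
    state::Atomic{Int}
end
SpinFlag() = SpinFlag(Atomic{Int}(0))

function spinlock!(l::SpinFlag)
    # atomic_cas! returns the old value; the swap succeeded only if it was 0.
    while atomic_cas!(l.state, 0, 1) != 0
        yield()    # let other tasks run; this is still a blocking wait
    end
    return nothing
end

spinunlock!(l::SpinFlag) = (l.state[] = 0; nothing)

# Eight tasks increment a shared, non-atomic counter under the spin lock.
counter = Ref(0)
flag = SpinFlag()
tasks = map(1:8) do _
    @spawn for _ in 1:1000
        spinlock!(flag)
        counter[] += 1
        spinunlock!(flag)
    end
end
foreach(wait, tasks)
```

If the holder were suspended inside the critical section, every other task would spin indefinitely, which is exactly the property a lock-free algorithm rules out.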
It makes a difference whether a language construct or a standard library function is implemented using a lock (where lock ordering can be carefully controlled) or a user or third-party package uses locks directly, and whether, in solving a concurrency problem, you share memory using locks or share data by communicating (as Go people put it).
We probably agree that locks should be avoided or are over-used. One problem with them is that they are so easy to use, and just as easy to misuse in a way that botches everything in a concurrent program.
A Channel is a higher-level language construct to serialize concurrent access to a shared resource or service. But it is not an easy alternative to locks since it always needs some function (task, goroutine …) to serve the resource. Actors.jl abstracts away the channel and basically focusses on the serving.
BTW if in a future Julia version – maybe with atomics – there will be a lock-free implementation of Channel, I will be happy to use that. But a stateful receiver (a take!) on a channel will always block until there is data to receive.
Absolutely! This is why my first suggestion was to use Channel. I agree “share memory by communicating” is the best starting point for concurrent programming.