Hello. I am trying to implement myself a (write-preferring) read-write lock in Julia, following the explanation in this article: Readers–writer lock - Wikipedia
Here is the ReadWriteLocks
module I wrote,
module ReadWriteLocks
export ReadWriteLock, read_lock, write_lock
mutable struct ReadWriteLock
readers::Int
writers::Int
writing::Bool
lock::ReentrantLock
cond::Threads.Condition
function ReadWriteLock()
new(0, 0, false, ReentrantLock(), Threads.Condition())
end
end
function read_lock(f, l::ReadWriteLock)
lock(l.lock) do
while l.writers > 0 || l.writing
wait(l.cond::Threads.Condition)
end
l.readers += 1
end
f()
lock(l.lock) do
l.readers -= 1
if l.readers == 0
notify(l.cond::Threads.Condition)
end
end
nothing
end
function write_lock(f, l::ReadWriteLock)
lock(l.lock) do
l.writers += 1
while l.readers > 0 || l.writing
wait(l.cond::Threads.Condition)
end
l.writers -= 1
l.writing = true
end
f()
lock(l.lock) do
l.writing = false
notify(l.cond::Threads.Condition, all=true)
end
nothing
end
end
and here is a code that uses the above defined module:
using .ReadWriteLocks
@show Threads.nthreads()
function read_task()
t = rand(0.001:0.003)
sleep(t)
end
function write_task()
tid = Threads.threadid()
t = rand(0.001:0.001:0.05) # Generate a variable workload
sleep(t)
return tid
end
n = 100
a = fill(0, n)
l = ReadWriteLock()
Threads.@threads for i in 1:n
read_lock(l) do
read_task()
end
write_lock(l) do
a[i] = write_task()
end
end
@show a
However, if I run the above code, I get the following error:
TaskFailedException
Stacktrace:
[1] wait
@ ./task.jl:334 [inlined]
[2] threading_run(func::Function)
@ Base.Threads ./threadingconstructs.jl:38
[3] top-level scope
@ ./threadingconstructs.jl:97
nested task error: ConcurrencyViolationError("lock must be held")
Stacktrace:
[1] concurrency_violation()
@ Base ./condition.jl:8
[2] assert_havelock(l::ReentrantLock, tid::Nothing)
@ Base ./condition.jl:29
[3] assert_havelock
@ ./lock.jl:36 [inlined]
[4] assert_havelock
@ ./condition.jl:72 [inlined]
[5] _wait2(c::Base.GenericCondition{ReentrantLock}, waiter::Task)
@ Base ./condition.jl:83
[6] wait(c::Base.GenericCondition{ReentrantLock})
@ Base ./condition.jl:120
[7] (::Main.ReadWriteLocks.var"#5#7"{ReadWriteLock, Int64})()
@ Main.ReadWriteLocks ~/.julia/dev/Example/src/ReadWriteLocks.jl:43
In this article Announcing composable multi-threaded parallelism in Julia it says
… a
Condition
object (used to signal tasks when events occur) can only be used by the thread that created it. Attempts to wait for or notify conditions from other threads will raise errors. Separate thread-safe condition variables have been added, and are available asThreads.Condition
. This needs to be a separate type because thread-safe use of condition variables requires acquiring a lock.
If I understand the above explanation correctly, having declared cond::Threads.Condition
explicitly, I expect that I wouldn’t get the above error. I should be misunderstanding something, but I couldn’t help out my self. Can anyone help me?
Thank you!