Why does `lock` NOT work for a full channel?

I am wondering why the lock function does NOT work for a full channel. For example:

function showme(n, m)
    c = Channel(m)
    t1 = @async begin
        sleep(3)
        for _ in 1:n
            put!(c, 0)
            sleep(0.1)
        end
    end
    t2 = @async begin
        lock(c)
        for _ in 1:n
            put!(c, 1)
            sleep(0.1)
        end
        unlock(c)
    end
    d = []
    sleep(3)
    for _ in 1:2n
        push!(d, take!(c))
    end
    show(d)
end
julia> showme(5,5)
Any[1, 1, 1, 1, 1, 0, 0, 0, 0, 0]
julia> showme(5,4)
Any[1, 1, 1, 1, 0, 1, 0, 0, 0, 0]
# lock does NOT work when the channel is full!

Actually, I think my answer was incorrect; some slight variations reproduce the same underlying problem, even with a separate lock:

function showme(n, m)
    c = Channel(m)
    l = ReentrantLock()
    t1 = @async begin
        for x in 1:n
            put!(c, 0)
            sleep(0.1)
        end
    end
    t2 = @async begin
        lock(l)
        for x in 1:n
            put!(c,1)
            sleep(0.001)
        end
        unlock(l)
    end

    d = []
    sleep(1)
    for _ in 1:2n
        push!(d, take!(c))
    end
    show(d)
end

julia> showme(5,5)
Any[0, 1, 1, 1, 1, 1, 0, 0, 0, 0]
julia> showme(5,4)
Any[0, 1, 1, 1, 1, 0, 1, 0, 0, 0]

After digging in, it appears that lock(c::Channel) calls lock(c.cond_take.lock), which takes the channel's underlying ReentrantLock. Your put! call also ends up nesting lock calls (see https://github.com/JuliaLang/julia/blob/master/base/channels.jl, around line 313). Observe also:

julia> c = Channel(4)
Channel{Any}(4) (empty)

julia> islocked(c.cond_put.lock)
false

julia> lock(c)

julia> lock(c)

julia> islocked(c.cond_put.lock)
true

julia> unlock(c)

julia> islocked(c.cond_put.lock)
true

julia> unlock(c)

julia> islocked(c.cond_put.lock)
false

My intuition is that these nested locks (which I don’t quite understand) are interacting with your asynchronous code in unanticipated ways. Perhaps your lock call in t2 is nesting within a lock that is blocked by a put! in t1?
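One way to probe that intuition is to check whether a task still holds the channel lock while its put! is blocked on a full channel. The sketch below is only an experiment, not a statement about how Base is meant to be used: the function name is made up for illustration, and it reads the internal field c.cond_put.lock exactly as the REPL session above does. As far as I can tell, waiting on the channel's condition releases the lock (including the outer lock(c)), which would explain why another task can slip in.

```julia
# Hypothetical helper: inspect the channel lock while a put! is blocked.
function lock_released_while_blocked()
    c = Channel{Int}(1)
    put!(c, 0)                          # fill the buffer so the next put! must wait
    t = @async begin
        lock(c)
        try
            put!(c, 1)                  # blocks here: the channel is full
        finally
            unlock(c)
        end
    end
    sleep(0.1)                          # give t time to reach the blocked put!
    held = islocked(c.cond_put.lock)    # internal field, same one inspected above
    first = take!(c)                    # free a slot so t can finish
    wait(t)
    second = take!(c)
    return held, first, second
end

held, first, second = lock_released_while_blocked()
println("lock held while put! is blocked: ", held)
```

If the lock really stayed held during the blocked put!, the take! in this sketch could never run, so the fact that it returns at all is already informative.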

It looks like you can lock the calls within t1 to get the behaviour you want:

function showme(n, m)
    c = Channel(m)
    t1 = @async begin
        for _ in 1:n
            lock(c)
            put!(c, 0)
            unlock(c)
            sleep(0.1)
        end
    end
    t2 = @async begin
        lock(c)
        for _ in 1:n
            put!(c,1)
            sleep(0.001)
        end
        unlock(c)
    end

    d = []
    sleep(1)
    for _ in 1:2n
        push!(d, take!(c))
    end
    show(d)
end
julia> showme(5,5)
Any[0, 1, 1, 1, 1, 1, 0, 0, 0, 0]
julia> showme(5,4)
Any[0, 1, 1, 1, 1, 1, 0, 0, 0, 0]

Thanks. There is one special case: when m = 1, the expected locking behaviour breaks!

The following results come from pyrex41's modified code above:

julia> showme(5,0)
Any[0, 1, 1, 1, 1, 1, 0, 0, 0, 0]
julia> showme(5,1)
Any[0, 1, 0, 1, 1, 1, 1, 0, 0, 0]
julia> showme(5,2)
Any[0, 1, 1, 1, 1, 1, 0, 0, 0, 0]
julia> showme(5,3)
Any[0, 1, 1, 1, 1, 1, 0, 0, 0, 0]
julia> showme(5,4)
Any[0, 1, 1, 1, 1, 1, 0, 0, 0, 0]
julia> showme(8,1)
Any[0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0]

That lock isn’t doing what you think it’s doing. A lock does not prevent another task from writing to the channel. It only prevents two tasks from executing code simultaneously while holding the lock.

In your case, after 3 seconds t2 has written 4 items to the channel and is waiting to write a 5th, and t1 is also waiting to write its first item to the channel (there is no lock to prevent it from doing so). So there is a race between the two tasks to write the 5th item, and in your case t1 wins the race, hence the result you see.

To get the behaviour you want you can use locks in both tasks.
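A minimal sketch of that idea, assuming a separate ReentrantLock that every producer takes for its whole batch. The names showme_batched, batch_lock and producer are made up for illustration; the consumer deliberately never takes the lock, so it can keep draining the channel while a producer is blocked on a full buffer.

```julia
# Hypothetical variant of showme: each producer holds one shared lock for its batch.
function showme_batched(n, m)
    c = Channel{Int}(m)
    batch_lock = ReentrantLock()        # separate from the channel's own internal lock
    producer(v) = @async begin
        lock(batch_lock) do             # every producer must take the same lock
            for _ in 1:n
                put!(c, v)              # may block on a full channel; batch_lock stays held
                sleep(0.01)
            end
        end
    end
    t1 = producer(0)
    t2 = producer(1)
    d = Int[]
    for _ in 1:2n
        push!(d, take!(c))              # the consumer never touches batch_lock
    end
    wait(t1); wait(t2)
    return d
end

show(showme_batched(5, 4))
```

Which producer goes first depends on scheduling, but the two batches should not interleave, whatever the buffer size m.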


Now I understand that ‘lock’ does NOT mean ‘blocking’. What I want is that when one task operates on a channel, other tasks are blocked from operating on that channel until the ‘block’ is released.

The following is the ‘blocking’ version of the code:

function showme(n,m)
    lk = Channel{Bool}(1)
    c = Channel(m)
    t1 = @async begin
        sleep(rand()/10) # random delay decides which task 'blocks' the channel first
        put!(lk, true)
        for _ in 1:n
            put!(c, 0)
            sleep(0.1)
        end
        take!(lk)
    end
    t2 = @async begin
        sleep(rand()/10)
        put!(lk, true)
        for _ in 1:n
            put!(c, 1)
            sleep(0.1)
        end
        take!(lk)
    end
    d = []
    sleep(1)
    for _ in 1:2n
        push!(d, take!(c))
    end
    show(d)
end

Test with different values of ‘m’:

julia> showme(5,0)
Any[0, 0, 0, 0, 0, 1, 1, 1, 1, 1]
julia> showme(5,0)
Any[1, 1, 1, 1, 1, 0, 0, 0, 0, 0]
julia> showme(5,1)
Any[0, 0, 0, 0, 0, 1, 1, 1, 1, 1]
julia> showme(5,1)
Any[0, 0, 0, 0, 0, 1, 1, 1, 1, 1]
julia> showme(5,1)
Any[1, 1, 1, 1, 1, 0, 0, 0, 0, 0]

How can I use ‘lock’ to get what I want? Thanks

Changing the ‘blocking’ version above to a ‘lock’ version gives the result I want. But I don’t understand why the non-blocking ‘lock’ effectively ‘blocks’ the operation. The simplified code follows:

function testlock()
    lk = Channel{Bool}()
    t1 = @async begin
        lock(lk)
        sleep(2)
        unlock(lk)
        println("lock - 2 sec: done")
    end
    t2 = @async begin
        sleep(0.001) # make sure the t1 task run first
        lock(lk)
        sleep(1)
        unlock(lk)
        println("lock - 1 sec: done")
    end
end

test:

julia> testlock()
Task (runnable) @0x00007fa5b4deb1f0

julia> lock - 2 sec: done
lock - 1 sec: done

What confuses me is: why does lock(lk) ‘lock’ the operations that follow it until ‘unlock(lk)’? It seems that ‘lock’ is blocking the task, even though ‘lock’ is a non-blocking operation!

Another lock version, with ReentrantLock.
The simplified code follows:

function lockme()
    lk = ReentrantLock()
    t1 = @async begin
        lock(lk)
        sleep(2)
        unlock(lk)
        println("lock - 2 sec: done")
    end
    t2 = @async begin
        lock(lk)
        sleep(1)
        unlock(lk)
        println("lock - 1 sec: done")
    end
    sleep(0.001) # make sure the t1 or t2 task run first
    function fun()
        println("lock - 0 sec: done")
    end
    lock(fun, lk)
end

test:

julia> lockme()
lock - 2 sec: done
lock - 1 sec: done
lock - 0 sec: done

My understanding of the above: the operations between lock and unlock will be executed within the task without any interruption from other tasks.

If you want a thread to respect a lock, you need to lock from that thread, so that if the object is already locked, the thread waits until it is able to obtain the lock.
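To make the waiting part concrete, here is a small sketch contrasting lock with trylock on a ReentrantLock that another task already holds. The function name lock_vs_trylock is made up for illustration, and the sleep durations are arbitrary. trylock returns right away without acquiring anything, while lock suspends the calling task until the holder unlocks.

```julia
# Hypothetical demo: lock waits for the holder, trylock does not.
function lock_vs_trylock()
    lk = ReentrantLock()
    holder = @async begin
        lock(lk)
        try
            sleep(0.5)                  # hold the lock for a while
        finally
            unlock(lk)
        end
    end
    sleep(0.1)                          # let holder acquire the lock first
    got = trylock(lk)                   # returns immediately; false while holder owns lk
    got && unlock(lk)                   # only relevant if the lock was unexpectedly free
    t0 = time()
    lock(lk)                            # suspends this task until holder calls unlock
    waited = time() - t0
    unlock(lk)
    wait(holder)
    return got, waited
end

got, waited = lock_vs_trylock()
println("trylock succeeded: ", got, ", lock waited about ", round(waited, digits=2), " s")
```

So lock is a blocking call whenever some other task holds the lock; it only returns immediately when the lock is free or already held by the calling task.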

Can you give me some feedback on my understanding:

My understanding of the above: the operations between lock and unlock will be executed within the task without any interruption from other tasks.

Thanks

Yes, assuming all other tasks also have their work gated between lock and unlock, as I said.
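A small sketch of that caveat, with made-up names (cooperative_demo, events) and arbitrary sleep durations: a task that never takes the lock is not held back by it, while a task that does take the lock has to wait for the holder to finish.

```julia
# Hypothetical demo: only tasks that take the lock are gated by it.
function cooperative_demo()
    lk = ReentrantLock()
    events = String[]
    holder = @async begin
        lock(lk) do
            push!(events, "holder: start")
            sleep(1.0)                  # other tasks get to run while holder sleeps
            push!(events, "holder: end")
        end
    end
    ungated = @async begin
        sleep(0.2)
        push!(events, "ungated")        # no lock taken, so nothing stops this
    end
    gated = @async begin
        sleep(0.2)
        lock(lk) do                     # waits until holder releases lk
            push!(events, "gated")
        end
    end
    foreach(wait, (holder, ungated, gated))
    return events
end

foreach(println, cooperative_demo())
```

The ungated task runs in the middle of the holder's critical section, so "without any interruption" only holds with respect to tasks that lock the same lock.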
