Strange error with coroutines

#1

I get an error when filling an array from two different Channels. Surprisingly, the error disappears when I add a debug println statement. I tried to set up a smaller example, but with only one Channel the error could not be reproduced.

"""
    deliver(c::Channel, pile::Vector{Int}, r::UnitRange{Int})

Put the cards of `pile` at the indices in `r` onto channel `c`, one at a time.
"""
function deliver(c::Channel, pile::Vector{Int}, r::UnitRange{Int})
    for i in r
        put!(c, pile[i])
    end
end

"""
    collectlr!(dest::Vector{Int}, c1::Channel, c2::Channel)

Collect cards randomly from `c1` or `c2` into `dest`. If one of them is exhausted, return the other.
"""
function collectlr!(dest::Vector{Int}, c1::Channel, c2::Channel)
    k = 0
    for i in 1:lastindex(dest)
        if !isready(c1)
            return(c2, i)
        else
            rlr = rand()
            if rlr < 0.5
                dest[i] = take!(c1)
                #println("c1: ", dest[i])
            elseif !isready(c2)
                return(c1, i)
            else
                dest[i] = take!(c2)
                #println("c2: ", dest[i])
            end
        end
        k = i
    end
    return(c1, k)
end

function mix!(dest::Vector{Int}, source::Vector{Int})
    p = lastindex(source)
    hp = div(p, 2)
    c1 = Channel(c -> deliver(c, source, 1:hp))
    c2 = Channel(c -> deliver(c, source, hp + 1:p))
    (c, i) = collectlr!(dest, c1, c2)
    for j in i:lastindex(dest)
        dest[j] = take!(c)
    end
    dest
end

Execution:

julia> p1 = collect(1:8); p2 = zeros(Int, 8); mix!(p2, p1)
ERROR: InvalidStateException("Channel is closed.", :closed)
Stacktrace:
 [1] try_yieldto(::typeof(Base.ensure_rescheduled), ::Base.RefValue{Task}) at ./event.jl:196
 [2] wait() at ./event.jl:255
 [3] take_unbuffered(::Channel{Any}) at ./channels.jl:338
 [4] take! at ./channels.jl:315 [inlined]
 [5] mix!(::Array{Int64,1}, ::Array{Int64,1}) at ./REPL[4]:8
 [6] top-level scope at none:0

With the println statements uncommented, I get:

julia> p1 = collect(1:8); p2 = zeros(Int, 8); mix!(p2, p1)
c2: 5
c2: 6
c2: 7
c1: 1
c2: 8
8-element Array{Int64,1}:
 5
 6
 7
 1
 8
 2
 3
 4

Thanks for hints!


#2

I think you need yield() calls (e.g. after the “concurrent” take!). Base.println uses the same libuv event loop as the Channels, so it does the yield for you. (You can use Core.print to diagnose tasks without interfering.)
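
A rough sketch of what that could look like (not code from the original post). The helper name `take_yield!` is hypothetical. It calls yield() after each take! so that the producer task can run until it blocks in its next put! (or finishes) before isready is checked again. The sketch assumes that `dest` and `source` have the same length and that the producers are started with plain `Channel(f)`, i.e. on the same thread. It also returns `lastindex(dest) + 1` once `dest` is full, an adjustment beyond the yield() hint, so that the final loop in mix! never takes from a drained channel.

```julia
# Producer: put the values pile[r] on channel c, one at a time.
function deliver(c::Channel, pile::Vector{Int}, r::UnitRange{Int})
    for i in r
        put!(c, pile[i])
    end
end

# Hypothetical helper (not from the thread): take a value, then yield so the
# producer task can run until it blocks in its next put! or finishes.
function take_yield!(c::Channel)
    v = take!(c)
    yield()
    return v
end

function collectlr!(dest::Vector{Int}, c1::Channel, c2::Channel)
    for i in 1:lastindex(dest)
        if !isready(c1)
            return (c2, i)          # c1 exhausted: the rest comes from c2
        elseif rand() < 0.5
            dest[i] = take_yield!(c1)
        elseif !isready(c2)
            return (c1, i)          # c2 exhausted: the rest comes from c1
        else
            dest[i] = take_yield!(c2)
        end
    end
    # Assumption: dest is now full, so report an index past the end.
    return (c1, lastindex(dest) + 1)
end

function mix!(dest::Vector{Int}, source::Vector{Int})
    p = lastindex(source)
    hp = div(p, 2)
    c1 = Channel(c -> deliver(c, source, 1:hp))
    c2 = Channel(c -> deliver(c, source, hp + 1:p))
    (c, i) = collectlr!(dest, c1, c2)
    for j in i:lastindex(dest)
        dest[j] = take!(c)          # a blocking take! needs no isready check
    end
    return dest
end
```

With this, `mix!(zeros(Int, 8), collect(1:8))` should return a random interleaving of 1:4 and 5:8 in which each half keeps its internal order.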


#3

Yes, works! Thanks for the hint and the info about the event loop!

I tried another version using Iterators.Stateful. This seems to be more efficient in this case.
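
The Iterators.Stateful version itself is not shown in this thread. As a guess at the general idea only, a task-free variant might look like the sketch below. The name `mix_stateful!` is hypothetical, and it assumes `dest` is at least as long as `source`. Each half of `source` is wrapped in an Iterators.Stateful, which remembers its position, so no channels or yield() calls are needed.

```julia
# Hypothetical task-free variant: interleave the two halves of `source`
# into `dest` at random, keeping the order within each half.
function mix_stateful!(dest::Vector{Int}, source::Vector{Int})
    p = lastindex(source)
    hp = div(p, 2)
    s1 = Iterators.Stateful(view(source, 1:hp))
    s2 = Iterators.Stateful(view(source, hp + 1:p))
    for i in eachindex(dest)
        if isempty(s1) && isempty(s2)
            break                       # both halves are used up
        elseif isempty(s2) || (!isempty(s1) && rand() < 0.5)
            dest[i] = popfirst!(s1)     # next card from the first half
        else
            dest[i] = popfirst!(s2)     # next card from the second half
        end
    end
    return dest
end
```

Usage would be the same as before, e.g. `mix_stateful!(zeros(Int, 8), collect(1:8))`.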
