Iterator says `Channel is closed`

A custom iterator throws the error in the title every time I run the code below. It does so whether run from the command line or under VSCode, and with one thread or many. It is odd for several reasons:

  1. The line throwing the error is immediately preceded by a test for the channel being open.
  2. There is only one producer and one consumer on the channel.
  3. The problem does not occur when run under the debugger (at least for some versions of the code).
  4. A slight change in seemingly irrelevant code eliminates the problem (shown further below).

Can anyone explain to me what is going on, or how I can avoid the problem? Does this look like a julia bug?

I think the problem arises after successful iteration of all possibilities; given the different behavior under the debugger that isn’t a very firm conclusion.

Running julia 1.8.5 on MS-Windows Server 2019.

In case you’re wondering, my motivation was to do something like Python’s yield, which seems to require a Channel in julia.

My apologies for the length; this is what I ended up with building up from simpler code that didn’t exhibit the error.

Code to Generate Error

struct TestR
    requests
end


function test_helper(chan::Channel, t::TestR)
    for (ctor, λs) in t.requests
        for λ in λs
            put!(chan, λ)
        end
    end
end

function Base.iterate(t::TestR)
    f(c::Channel) = test_helper(c, t)
    chan = Channel(f)
    return Base.iterate(t, chan)
end


function Base.iterate(t::TestR, chan)
    if !isopen(chan)
        return nothing
    end
    x = take!(chan)  # error thrown from here
    if isnothing(x)
        return nothing
    end
    return (x, chan)
end

function Base.length(t::TestR)
    sum(length(λs) for (_, λs) in t.requests)
end

function outside(t::TestR)
    r = Array{String}(undef, length(t))
    i = 1
    for x in t
        r[i] = string(x)
    end
    return r
end

myt = TestR([(exp, (1.4,)), ])
println(outside(myt))

Error Message

PS C:\Users\rdboylan\Documents\BP\MSEP\src> julia --project -t4 -- test3.jl
ERROR: LoadError: InvalidStateException: Channel is closed.
Stacktrace:
 [1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
   @ Base .\task.jl:871
 [2] wait()
   @ Base .\task.jl:931
 [3] wait(c::Base.GenericCondition{ReentrantLock})
   @ Base .\condition.jl:124
 [4] take_unbuffered(c::Channel{Any})
   @ Base .\channels.jl:433
 [5] take!
   @ .\channels.jl:410 [inlined]
 [6] iterate
   @ C:\Users\rdboylan\Documents\BP\MSEP\src\test3.jl:25 [inlined]
 [7] outside(t::TestR)
   @ Main C:\Users\rdboylan\Documents\BP\MSEP\src\test3.jl:41
 [8] top-level scope
   @ C:\Users\rdboylan\Documents\BP\MSEP\src\test3.jl:46
in expression starting at C:\Users\rdboylan\Documents\BP\MSEP\src\test3.jl:46

Error-free Code

Same as the code above, except that outside() is replaced by

function outside(t::TestR)
    r = Array{String}(undef, length(t))
    for x in t
        print(x,", ")
    end
    print("\n")
end

I seem to have found something that prevents the problem: inserting sleep(0.001) before the isopen() test in iterate(t::TestR, chan). This is somewhat consistent with the theory that the problem is a race, and the channel really is being closed between the isopen() and the (apparently) immediately following take!.

However, in that case inserting yield() in the same spot seems as if it should work too, and it does not do so reliably. In the presence of some print() statements I had added for debugging yield worked, but when I commented those out it didn’t.

Also, unlike a typical race, the behavior seems to be reproducible: given a particular set of code, it either always succeeds or always fails. My sample sizes have been small, however.

My reading of the documentation is that the channel should not be closed until after the take!. When Channel is instantiated with a function argument, as this code does, the docs say it won’t be destroyed until the generated Task completes. They also seem to say that put! on the channel blocks until the value is read. I can’t quite tell what behavior would be expected if the channel has a buffer (i.e., if put! ends up placing an object in the buffer, is that enough to proceed?), but I am using the default which is unbuffered.

The sample code has only one value to iterate. So after take!(), the code in test_helper(), which is executing in the task, can proceed from its put!() and exit, at which point julia closes the channel. For some reason that close is sometimes deferred so that it lands between the test for whether the channel is open and the read from the channel. Under this interpretation the problem isn’t really that the take! fails, but that isopen() is true even though, the generator having been exhausted, it should be false.
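To make that interpretation concrete, here is a minimal sketch, separate from the code above, that checks isopen() right after taking the only value from an unbuffered channel and again after yielding until the channel closes. The variable names are made up for illustration, and the first check is printed rather than asserted because its result depends on task scheduling.

```julia
# Unbuffered channel whose producer task puts a single value and then returns.
chan = Channel() do c
    put!(c, 1.4)
end

x = take!(chan)                  # receive the only value
open_after_take = isopen(chan)   # may still be true: the producer may not have finished yet

# Give the producer task, and the close that follows it, a chance to run.
while isopen(chan)
    yield()
end
open_after_wait = isopen(chan)   # false once the loop has exited

println("value = ", x)
println("open right after take!: ", open_after_take)
println("open after yielding:    ", open_after_wait)
```

If the first check prints true, an isopen() test placed at that point would let a second take! go ahead even though no further value will ever arrive.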

An alternate approach would be to dispense with the isopen() test entirely, instead putting the take!() inside a try ... catch which would just return nothing if an InvalidStateException were thrown. I don’t know what the penalty for such error handling is in julia, and it seems a pretty ugly solution. Then again, so is sprinkling sleep() in the code.
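A minimal sketch of that try ... catch variant follows. SafeR and producer are hypothetical stand-ins for TestR and test_helper so the block runs on its own; it is one way to write the idea, not a tested fix for the original code.

```julia
# Hypothetical stand-in for TestR, so this sketch is self-contained.
struct SafeR
    requests
end

# Hypothetical stand-in for test_helper: feeds every λ into the channel.
function producer(chan::Channel, t::SafeR)
    for (_, λs) in t.requests
        for λ in λs
            put!(chan, λ)
        end
    end
end

# First call: create the channel and delegate to the two-argument method.
Base.iterate(t::SafeR) = iterate(t, Channel(c -> producer(c, t)))

# No isopen() test: just try to take, and treat "closed" as end of iteration.
function Base.iterate(t::SafeR, chan::Channel)
    try
        return (take!(chan), chan)
    catch e
        if e isa InvalidStateException && e.state === :closed
            return nothing
        end
        rethrow()   # anything else (e.g. a failure in the producer) is a real error
    end
end

Base.length(t::SafeR) = sum(length(λs) for (_, λs) in t.requests)

result = collect(SafeR([(exp, (1.4,)), (sin, (0.5, 2.0))]))
println(result)
```

In this arrangement the exception is only raised once per full iteration, when the channel turns out to be closed, so whatever the cost of the catch is, it is not paid per item.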

Here’s what seems a more reliable solution: instead of pulling items from the channel myself, use the channel’s own iterator. The behavior noted above is very unfortunate, but does not seem to violate the semantics of the language, which do not guarantee that coroutines will execute in any particular order.

function Base.iterate(t::TestR)
    f(c::Channel) = test_helper(c, t)
    chan = Channel(f)
    r = iterate(chan)
    if isnothing(r)
        return nothing
    end
    return (r[1], (chan, r[2]))
end


function Base.iterate(t::TestR, state)
    r = iterate(state[1], state[2])
    if isnothing(r)
        return nothing
    end
    # I think (r[1], state) would also work for next
    # But safer to treat channel state as opaque.
    return (r[1], (state[1], r[2]))
end

There are some subtleties.

My earliest attempts tried to return either the channel or the results of iterate(chan) for iterate(::TestR). But this does not reduce the problem to one previously solved, because the iterator infrastructure will still call iterate(::TestR, state) for the next iteration.

Second, the state for the TestR iterator must include the channel as well as the state for the underlying channel iterator, since the channel iterator needs both.

In my very limited testing, using the same high-level code that threw an error before, the code above works without error.

So how does iterate produce reliable results for Channels? Judging from its source:

  1. It does test isopen().
  2. isready() is an allowed alternative. How it could be ready without being open I don’t know.
  3. It does use try ... catch to do the take!.
  4. Iterator state is always nothing since the Channel already has the state.
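The four points above can be sketched as follows. This is a paraphrase of the pattern under the hypothetical name channel_next, not a copy of the code in Base, and drain is likewise a made-up helper. The last few lines also show one case, which I am confident of, where a channel is ready without being open: a buffered channel that was closed while values were still in its buffer.

```julia
# Hypothetical helper mirroring the pattern described in the list.
function channel_next(c::Channel)
    if isopen(c) || isready(c)            # points 1 and 2
        try
            return (take!(c), nothing)    # point 4: the state is always nothing
        catch e                           # point 3: take! is guarded
            if e isa InvalidStateException && e.state === :closed
                return nothing            # closed between the test and the take!
            end
            rethrow()
        end
    end
    return nothing
end

# Hypothetical helper: pull values until channel_next reports the end.
function drain(c::Channel)
    items = Any[]
    while true
        r = channel_next(c)
        r === nothing && break
        push!(items, r[1])
    end
    return items
end

drained = drain(Channel(c -> foreach(v -> put!(c, v), 1:3)))
println(drained)

# A buffered channel closed with a value still in its buffer.
buffered = Channel{Int}(2)
put!(buffered, 7)
close(buffered)
ready_but_closed = isready(buffered) && !isopen(buffered)
leftover = take!(buffered)    # the buffered value can still be taken after close
println(ready_but_closed, " ", leftover)
```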