Channel not unblocking during some iteration situations

I’ve been fiddling around with iteration of a tree traversal to learn about interfaces and channels, and I’ve run into something I can’t seem to get around.

When using an unbuffered Channel(), iteration in a for loop blocks after the first value while using general functions (e.g. push!, collect, comprehensions, etc), however, it continues to completion when using I/O (e.g. print, @show, etc). This is the first big distinction I’ve found, but I accept there very well may be more subtlety to the matter (I did notice that Base.take! is also defined for I/O).

MWE (most relevant code at top):

abstract type AbstractNode end

function traverse(node::AbstractNode, c::Channel)
    put!(c, node.data)
    foreach(branch -> traverse(branch, c), branches(node))
end

function Base.iterate(node::AbstractNode)
    state = Channel(c -> traverse(node, c))
    isready(state) ? (take!(state), state) : nothing
end
function Base.iterate(node::AbstractNode, state)
    #println(isready(state))
    isready(state) ? (take!(state), state) : nothing
end

Base.iterate(::AbstractNode, ::Nothing) = nothing
Base.IteratorSize(::AbstractNode) = Base.SizeUnknown()

struct Leaf <: AbstractNode
    data::Int
end

struct Node <: AbstractNode
    data::Int
    leftbranch::Union{Node, Leaf}
    rightbranch::Union{Node, Leaf}
end

struct Root <: AbstractNode
    data::Int
    leftbranch::Union{Node, Leaf}
    rightbranch::Union{Node, Leaf}
end

data(node::Union{Root, Node, Leaf}) = node.data
branches(node::Union{Node, Root}) = (node.leftbranch, node.rightbranch)
branches(node::Leaf) = ()

With the following examples:

julia> tree = Root(5, Node(4, Leaf(3), Leaf(2)), Node(6, Leaf(7), Leaf(8)));

julia> [n for n in tree]
1-element Vector{Any}:
 5

julia> foreach(n -> println(n), tree)
 5
 4
 3
 2
 6
 7
 8

julia> [n for n in Channel(c -> traverse(tree, c))]
7-element Vector{Int64}:
 5
 4
 3
 2
 6
 7
 8

Also, uncommenting the print(isready(state)) line in my Base.iterate(node::AbstractNode, state) method allows for full iteration (note: printing is necessary and pushing isready(state) to an array does not work). It appears the call to take! doesn’t unblock the channel as I would have expected.

The following code (modified from Interfaces in the Manual) returns the full array, with the Channel being unblocked by the call to push!:

arr = []
next = iterate(tree)
while next !== nothing
    (item, state) = next
    push!(arr, item)
    next = iterate(tree, state)
end
arr

The example in the manual says that this is what for item in itr push!(arr, item) end translates into, so I’m not sure why this while loop works, but the for loop fails.

Note: I’m interested in figuring out why this is happening for pedagogical reasons and would like to see if it is possible to find a fix that still uses an unbuffered channel.

Other work-arounds I’ve already found include:

  • forgoing the Channel and simply pushing the data from the traverse function to an array
  • similarly, collect the Channel and use popfirst! instead of take!
  • using a buffered Channel(c -> traverse(node, c), n) where n ≥ 7 for the provided example tree

That said, if there is an established “best practice” for this type of example, I’d be happy to know about it as well. Cheers!

In case anyone is curious, it appears that defining the two argument Base.iterate method as:

function Base.iterate(node::AbstractNode, state)
    yield()
    isready(state) ? (take!(state), state) : nothing
end

works, for (presumably) the same reason as having the call to println() in there does. This led me to think that the take!(c::Channel) method (from julia/base/channels.jl) could be modified, and indeed adding a yield() to the finally clause (after unlock(c)) in take_unbuffered(c) also seems to allow for iteration in this case:

function take_unbuffered(c::Channel{T}) where T
    lock(c)
    try
        check_channel_state(c)
        notify(c.cond_put, nothing, false, false)
        return wait(c.cond_take)::T
    finally
        unlock(c)
        yield() # added for for loop iteration
    end
end

The complementary put_unbuffered method already has a yield in its definition, apparently to:

immediately give taker a chance to run, but don’t block the current task

So, I assume this yield added to take_unbuffered does likewise to immediately give getter a chance to run, but don’t block the current task. However, I’m unsure about this.

Further discussion where @jakobnissen points to a “best practices” implementation of the interface as:

function Base.iterate(node::AbstractNode)
    state = Channel(c -> traverse(node, c))
    iterate(node, state)
end

function Base.iterate(node::AbstractNode, state::Channel)
    # This iterate call on the channel will block if there are no more elements,
    # and will return `nothing` if there are no more elements AND the channel
    # is closed, which happens when `traverse` has yielded all its elements.
    y = iterate(state)
    isnothing(y) ? y : (first(y), state)
end

And that the behavior found with my implementation is expected (or at least not unexpected):

Putting in yield in iterate seems to solve the problem, because it allows the scheduler to switch to traverse. But it’s not a real fix, because there is still no guarantee the scheduler will actually switch task.