Relaunch the task bound to channel

Is it possible to relaunch a task bound to a channel so that the data can be collected from the channel again? Here is an MWE:

julia> ch = Channel(0) do ch 
       for i in 1 : 5 
       put!(ch, i)
       end 
       end
Channel{Any}(0) (1 item available)

julia> collect(ch)
5-element Vector{Any}:
 1
 2
 3
 4
 5

julia> collect(ch) 
Any[]

To do this, I tried to re-initialize the task bound to the channel, but I got an exception saying that the channel is closed.

julia> ch = Channel(0) do ch
       for i in 1 : 5 
       put!(ch, i)
       end 
       end
Channel{Any}(0) (1 item available)

julia> f = ch.cond_put.waitq.head.code
#517 (generic function with 1 method)

julia> collect(ch) 
5-element Vector{Any}:
 1
 2
 3
 4
 5

julia> collect(ch) 
Any[]

julia> @async f() 
Task (failed) @0x00007f47aaa60160
InvalidStateException("Channel is closed.", :closed)
Stacktrace:
 [1] check_channel_state
   @ ./channels.jl:170 [inlined]
 [2] put!
   @ ./channels.jl:314 [inlined]
 [3] (::var"#13#14")(ch::Channel{Any})
   @ Main ./REPL[5]:3
 [4] (::Base.var"#517#518"{var"#13#14", Channel{Any}})()
   @ Base ./channels.jl:132
 [5] (::var"#15#16")()
   @ Main ./task.jl:406

There are two problems with that approach:

  • once the loop in ch = Channel(0) do ch for i in 1 : 5 ... has run through, the task has finished, and
  • by the time collect(ch) returns, the channel is closed (a channel is closed when the task bound to it finishes).
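
Both points can be checked directly. The following is a minimal sketch, assuming a Julia version in which the Channel constructor accepts the taskref keyword; make_channel is a hypothetical helper name, not something from Base. As far as I know there is no public function to reopen a closed channel, so the sketch simply builds a new channel, with a new task, on every call.

```julia
# `make_channel` is a hypothetical helper: every call creates a new
# channel together with a new producer task bound to it.
function make_channel(taskref = Ref{Task}())
    Channel{Int}(0; taskref = taskref) do ch
        for i in 1:5
            put!(ch, i)
        end
    end
end

taskref = Ref{Task}()                # receives the task created by the constructor
ch = make_channel(taskref)

first_pass = collect(ch)             # iterates until the channel is closed
channel_open = isopen(ch)            # false: the channel closed when the task ended
task_done = istaskdone(taskref[])    # true: the producer loop has returned
second_pass = collect(ch)            # empty: nothing is left to take

# A fresh channel (with a fresh task) delivers the data again.
third_pass = collect(make_channel())
```

The old channel and task stay finished; only the newly created pair produces data.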

If you want to receive data from a channel repeatedly, you need

  1. a persistent server task and
  2. some convention for the client/server interaction, for example
    • a messaging pattern or
    • if the server delivers continuously, some collect_data function on the client side that lets you specify the number of items to collect.
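
The second option could be sketched as follows. collect_data is a hypothetical helper, not part of Base, and the endless counter merely stands in for whatever the server actually produces.

```julia
# `collect_data` is a hypothetical client-side helper: take exactly `n` items.
function collect_data(ch::Channel{T}, n::Integer) where {T}
    return T[take!(ch) for _ in 1:n]
end

# A producer that never finishes on its own. The channel is unbuffered,
# so the task blocks in `put!` until a client takes the next item.
stream = Channel{Int}(0) do ch
    i = 0
    while true
        i += 1
        put!(ch, i)
    end
end

batch1 = collect_data(stream, 5)   # [1, 2, 3, 4, 5]
batch2 = collect_data(stream, 5)   # [6, 7, 8, 9, 10]

# Closing the channel makes the producer's pending `put!` throw,
# which terminates the producer task.
close(stream)
```

Because the producer never returns by itself, the channel stays open between calls, and the client decides how much to read each time.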

A messaging pattern could look like this:

# define the messages
struct Request
    n::Int
end

struct Response{T}
    y::T
end

struct Stop end

# define the server behavior
function serve(req, res)
    i = 1
    while true
        msg = take!(req)
        if msg isa Request  
            put!(res, Response(collect(i:i+msg.n-1)))
            i += msg.n  # continue after the items just sent
        elseif msg isa Stop 
            break
        end
    end
end

# define the channels
a = Channel()
b = Channel()
# start the server
t = Threads.@spawn serve(a, b)

Then you can do

julia> put!(a, Request(5))
Request(5)

julia> take!(b).y
5-element Vector{Int64}:
 1
 2
 3
 4
 5

julia> put!(a, Request(5))
Request(5)

julia> take!(b).y
5-element Vector{Int64}:
  6
  7
  8
  9
 10

Then you can continue to request/receive data as long as you want and stop the server task at some point.
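
Stopping could look like the sketch below. It repeats the definitions from above so that it runs on its own; the only assumption is that the client sends Stop once it is done.

```julia
struct Request
    n::Int
end

struct Response{T}
    y::T
end

struct Stop end

# Same server loop as above: answer requests until a Stop arrives.
function serve(req, res)
    i = 1
    while true
        msg = take!(req)
        if msg isa Request
            put!(res, Response(collect(i:i+msg.n-1)))
            i += msg.n
        elseif msg isa Stop
            break
        end
    end
end

a = Channel()
b = Channel()
t = Threads.@spawn serve(a, b)

put!(a, Request(3))
batch = take!(b).y          # [1, 2, 3]

put!(a, Stop())             # ask the server to leave its loop
wait(t)                     # block until the server task has returned
stopped = istaskdone(t)     # true

# The channels are not bound to the task here, so close them explicitly.
close(a)
close(b)
```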

Thank you @pbayer for your detailed reply.

But the problem is not how to receive data from a channel repeatedly. In that case, as in your example, producer/consumer or server/client tasks can be created and bound to channels so that data flows from the server to the client until the server's task is poison-pilled (with Stop in the example).

The problem is exactly what I described in my MWE: after a channel is closed and the task bound to it is done, can we somehow reopen the channel and relaunch the task so that the data flow from the server to the client starts again?