Is it possible to relaunch a task bound to a channel so that the data can be recollected from the channel? Here is an MWE:
julia> ch = Channel(0) do ch
           for i in 1 : 5
               put!(ch, i)
           end
       end
Channel{Any}(0) (1 item available)
julia> collect(ch)
5-element Vector{Any}:
1
2
3
4
5
julia> collect(ch)
Any[]
To do this, I tried to re-initialize the task bound to the channel, but I got an exception saying that the channel is closed.
julia> ch = Channel(0) do ch
           for i in 1 : 5
               put!(ch, i)
           end
       end
Channel{Any}(0) (1 item available)
julia> f = ch.cond_put.waitq.head.code
#517 (generic function with 1 method)
julia> collect(ch)
5-element Vector{Any}:
1
2
3
4
5
julia> collect(ch)
Any[]
julia> @async f()
Task (failed) @0x00007f47aaa60160
InvalidStateException("Channel is closed.", :closed)
Stacktrace:
[1] check_channel_state
@ ./channels.jl:170 [inlined]
[2] put!
@ ./channels.jl:314 [inlined]
[3] (::var"#13#14")(ch::Channel{Any})
@ Main ./REPL[5]:3
[4] (::Base.var"#517#518"{var"#13#14", Channel{Any}})()
@ Base ./channels.jl:132
[5] (::var"#15#16")()
@ Main ./task.jl:406
There are essentially two problems with that approach:
- once the loop in ch = Channel(0) do ch for i in 1 : 5 ... has run to completion, the task has finished, and
- once collect(ch) returns, the channel is closed.
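Both points can be checked directly. The following is only a minimal sketch: make_channel is a hypothetical helper name, not part of Base. It shows that the channel is closed after collect, and that, rather than reopening it, calling the helper again yields a fresh channel with a fresh bound task.

```julia
# Hypothetical helper: builds a new channel with a new bound task on every call.
function make_channel()
    return Channel{Int}(0) do ch
        for i in 1:5
            put!(ch, i)
        end
    end
end

ch = make_channel()
first_run = collect(ch)             # drains the channel; the bound task finishes
closed_after_collect = !isopen(ch)  # the channel was closed when the task ended

ch = make_channel()                 # a new channel, not the old one reopened
second_run = collect(ch)
```

This does not relaunch the original task. It only makes creating an equivalent channel cheap and repeatable.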
If you want to repeatedly receive data from a channel, you need
- a persistent server task and
- some convention for the client/server interaction, for example
  - a messaging pattern or,
  - if the server delivers continuously, a collect_data function on the client side, where you can specify the number of items to collect.
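The second option could look roughly like the sketch below. It assumes a producer that never finishes on its own, so the channel stays open. collect_data is a hypothetical helper written here for illustration and is not an existing Base function.

```julia
# Producer that never finishes on its own, so the channel stays open.
# The channel is unbuffered, so put! blocks until a consumer takes the value.
ch = Channel{Int}(0) do ch
    i = 0
    while true
        i += 1
        put!(ch, i)
    end
end

# Hypothetical client-side helper: take exactly n items, then return.
collect_data(ch::Channel, n::Integer) = [take!(ch) for _ in 1:n]

first_batch = collect_data(ch, 5)    # items 1 to 5
second_batch = collect_data(ch, 5)   # the producer resumes where it left off
```

Unlike collect(ch), this never waits for the channel to close, so it can be called as often as needed.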
A messaging pattern could look like this:
# define the messages
struct Request
    n::Int
end
struct Response{T}
    y::T
end
struct Stop end
# define the server behavior
function serve(req, res)
    i = 1
    while true
        msg = take!(req)
        if msg isa Request
            put!(res, Response(collect(i:i+msg.n-1)))
            i += msg.n  # advance the counter past the items just sent
        elseif msg isa Stop
            break
        end
    end
end
# define the channels
a = Channel()
b = Channel()
# start the server
t = Threads.@spawn serve(a, b)
Then you can do
julia> put!(a, Request(5))
Request(5)
julia> take!(b).y
5-element Vector{Int64}:
1
2
3
4
5
julia> put!(a, Request(5))
Request(5)
julia> take!(b).y
5-element Vector{Int64}:
6
7
8
9
10
Then you can continue to request/receive data as long as you want and stop the server task at some point.
Thank you @pbayer for your detailed reply.
But the problem is not how to repeatedly receive data from a channel. In that case, as in your example, producer/consumer or server/client tasks can be created and bound to channels so that data flows from the server to the client until the server’s task is poison-pilled (with Stop in the example).
But the problem is exactly what I explained in my MWE. After a channel is closed and the task bound to the channel is done, can we somehow reopen the channel and relaunch the task so that the data flow from the client to server starts again?