RemoteChannel allows more inputs than I defined

Hello,

I am building a pipeline with RemoteChannel (if it does not work, I’ll open another post asking for help), and I noticed that I can add more data than the size I specified.
I don’t know whether I am confused about the syntax and misunderstood how to define the size of the channel, or whether it is actually a bug. Here is the code:

# begin with a simple channel to test that my pc is working 
c1 = Channel(5)

put!(c1, 1)
put!(c1, 2)
put!(c1, 3)
put!(c1, 4)
put!(c1, 5)
# the next line does not work as expected
put!(c1, 6)
# if I did not run the previous line, the next line works
values = [take!(c1) for i=1:5]

using Distributed
addprocs(4)
const r_c1 = RemoteChannel(()->Channel{Int}(5));
# const r_c1 = RemoteChannel(()->Channel(1), 5); ## ? maybe this way

@spawnat 2 begin put!(r_c1, 1) end;
@spawnat 3 begin put!(r_c1, 2) end;
@spawnat 2 begin put!(r_c1, 3) end;
@spawnat 3 begin put!(r_c1, 4) end;
@spawnat 2 begin put!(r_c1, 5) end;
# somehow I can put values in the Channel
@spawnat 3 begin put!(r_c1, 6) end;
# if I DO run the previous line, the following line also works
values = [take!(r_c1) for i=1:6]

I am open to any explanation


In the first part of your example, put!(c1, 6) blocks the user process: the channel's buffer of 5 is already full, and because everything runs sequentially, nothing can take! a value to free a slot.

In the second part, with

@spawnat 3 begin put!(r_c1, 6) end;

you start a closure asynchronously on process 3. Its put! blocks immediately (but in the background) because the channel is full, and it stays blocked until you begin take!ing the values out in the next line.

The only difference is sequential versus asynchronous execution.
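One way to see the background blocking is to keep the Future that @spawnat returns and ask whether it is ready. The following is a minimal sketch, not a definitive recipe: the names rc, w, futures, pending_before and pending_after are made up for illustration, the buffer is shrunk to 2 to keep it short, and the sleep is only a crude way to give the worker time to try its put! calls. Note that the buffer size is the argument to Channel; the optional second argument of RemoteChannel is the id of the process that owns the channel, not a size.

```julia
using Distributed
addprocs(1)  # one worker is enough for this sketch

# Buffer of 2, owned by the calling process (illustrative size).
rc = RemoteChannel(() -> Channel{Int}(2))
w = first(workers())

# Each @spawnat returns a Future right away, whether or not put! has finished.
futures = [@spawnat w put!(rc, i) for i in 1:3]

sleep(1)  # crude: give the worker time to attempt all three put! calls

# At most 2 puts fit in the buffer, so at least one Future is still pending.
pending_before = count(!isready, futures)

# Taking values frees slots, which lets the blocked put! complete.
taken = [take!(rc) for _ in 1:3]
foreach(wait, futures)
pending_after = count(!isready, futures)

println("pending before take!: ", pending_before)
println("pending after take!:  ", pending_after)
println("values taken:         ", sort(taken))
```

As far as I understand it, only the task running put! on the worker is parked; the worker process itself stays responsive, which is why isready can still query it.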

Just to be sure that I understood:

  • When I run @spawnat 3 begin put!(r_c1, 6) end;, Julia process 3 is indeed blocked, waiting for a free slot in r_c1.
  • Because I used @spawnat, I do not see this happening.
  • The line values = [take!(r_c1) for i=1:6] works because the first take! frees a slot in r_c1, and Julia process 3 then puts its value.

Am I right?

yes!

Consider the following MWE:

The following blocks (freezes) the user process at the 6th iteration of the for loop:

ch = Channel(5)
for i in 1:6
    put!(ch, i)
end
values = [take!(ch) for _ in 1:6]

The following does not block the user process, only the @async task:

ch = Channel(5)
@async for i in 1:6
    put!(ch, i)
end
values = [take!(ch) for _ in 1:6]

When you start to take! out the values, the task continues and delivers the 6th value asynchronously.
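To check this for yourself, you can keep a handle on the task and ask whether it has finished. This is a small sketch under the same assumptions as the MWE above; the names producer, blocked_before, received and done_after are only illustrative.

```julia
ch = Channel{Int}(5)

# Keep the Task returned by @async so we can inspect it later.
producer = @async for i in 1:6
    put!(ch, i)
end

yield()  # let the producer run until it parks on the 6th put!

# Only 5 values fit, so the task cannot have finished before any take!.
blocked_before = !istaskdone(producer)

# Draining the channel frees a slot and lets the 6th put! go through.
received = [take!(ch) for _ in 1:6]
wait(producer)
done_after = istaskdone(producer)

println("producer still blocked before take!: ", blocked_before)
println("producer finished after take!:       ", done_after)
println("received: ", received)
```

The user process never freezes here: it only waits inside take! and wait, and both return once the producer has delivered its last value.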
