Use of Channel (or similar) constructs in parallel processing

Hi,

I have used Julia Channels for real-time processing of data in a pipeline fashion, where each Channel passes data on to the next processing step. A simplified example is the code below: one channel generates random numbers and another generates the running averages of those numbers. The second channel is a consumer of the first channel's data but also a producer for the following stage. I hope the code conveys the essence of what I am after.

function randomsrc()
    function cr(ch)
        while true
            put!(ch, randn())
        end
    end
    return Channel(cr, ctype=Float64, csize=1024)
end

function avsrc(c)
    sum = 0.0
    count = 0
    function av(ch)
        for x in c
            sum += x
            count += 1
            put!(ch, sum/count)
        end
    end
    return Channel(av, ctype=Float64, csize=1024)
end

randch = randomsrc()
avch = avsrc(randch)
for (i, a) in enumerate(avch)
    if i % 100000 == 0
        @show(i, a)
    end
end

This works fine, but I run into performance issues when the data rate increases and/or when some of the intermediate stages start to do more with the incoming data, e.g. plotting it.

From what I understood of the documentation, Channels are single-threaded. Is there a parallel construct that helps distribute the processing to different threads on different cores to achieve higher throughput? I like the ease of use of Channels and how intuitive they are for pipeline-like problems, i.e. chaining producers and consumers. Any pointers to similar constructs that could help me?

Thanks!

I think what you are looking for is RemoteChannel. Here is the part of the documentation that compares Channels and RemoteChannels: https://docs.julialang.org/en/v1/manual/parallel-computing/#Channels-and-RemoteChannels-1. You should also take a look at the producer/consumer example in that section if you haven't done so yet.

I'll look into RemoteChannel as an alternative and report back on how it goes.
Thanks!

I modified the original code based on the example in the RemoteChannel documentation:

using Distributed   # needed for @everywhere, RemoteChannel and remote_do on Julia 1.x
# Worker processes 4 and 5 (used below) must already exist, e.g. via addprocs(4).

@everywhere function randomsrc(ch)
    while true
        put!(ch, randn())
    end
end

@everywhere function avsrc(chin, chout)
    sum = 0.0
    count = 0
    while isopen(chin)
        x = take!(chin)
        sum += x
        count += 1
        put!(chout, sum/count)
    end
end

const ch1 = RemoteChannel(()->Channel{Float64}(1024));
const ch2 = RemoteChannel(()->Channel{Float64}(1024));

remote_do(randomsrc, 4, ch1)
remote_do(avsrc, 5, ch1, ch2)

i = 0
while isopen(ch2)
    global i
    i += 1
    a = take!(ch2)
    if i % 100000 == 0
        @show(i, a)
    end
end

The performance is orders of magnitude slower than the original Channel code. I understand there must be overhead for inter-process communication, but the code is really very slow. Is there anything I am doing wrong in the code above? I don't have multiple readers or multiple writers; can I do anything to avoid the overhead that exists to support the general multiple-reader/multiple-writer case?

I am not sure whether this was available in 2018, but for anyone finding this question as of Julia 1.6.3-1.7.0: Channel has a constructor that takes as its first argument a function parameterized with a channel, and it creates a task for you. This is equivalent to creating the task yourself, either via the @async macro or with the Task() constructor. However, as you noted, tasks by default run on the single thread that created them.
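To make that equivalence concrete, here is a minimal sketch (the producer function, element type and buffer size are made up for illustration):

producer(c::Channel) = foreach(n -> put!(c, n), 1:5)   # toy producer

# explicit task plus bind ...
c1 = Channel{Int}(8)
t  = @async producer(c1)
bind(c1, t)              # close c1 when the task finishes
collect(c1)              # [1, 2, 3, 4, 5]

# ... is essentially what the function-based constructor does in one call
c2 = Channel{Int}(producer, 8)
foreach(println, c2)     # prints 1 to 5, then stops because c2 is closed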

The Channel constructor also has a keyword argument spawn=true, which is equivalent to using Threads.@spawn for the task. For example:

function producer(c::Channel)
    put!(c, 0)
    for n in 1:10
        put!(c, n)
    end
    put!(c, 11)
end

chn1 = Channel(producer, 8; spawn=true)
for n in chn1
    println(n)
end

The code above creates the task but spawns it on its own thread (because of spawn=true); remember that by default tasks run on the single thread that created them. As a bonus, because the task is bound to the channel (the equivalent of bind(c, task)), the channel is closed when the task finishes.
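Applied back to the original pipeline, a sketch might look like the code below (untested, just to show the shape; it assumes Julia is started with several threads, e.g. julia -t 4, so that each stage's task can land on its own thread):

function randomsrc()
    # each stage is a channel whose feeding task is spawned on an available thread
    return Channel{Float64}(1024; spawn=true) do ch
        while true
            put!(ch, randn())
        end
    end
end

function avsrc(c)
    sum = 0.0
    count = 0
    return Channel{Float64}(1024; spawn=true) do ch
        for x in c                  # consume the previous stage
            sum += x
            count += 1
            put!(ch, sum / count)   # produce for the next stage
        end
    end
end

randch = randomsrc()
avch = avsrc(randch)
for (i, a) in enumerate(avch)
    if i % 100000 == 0
        @show(i, a)
    end
end

The chaining of producers and consumers stays exactly as in the single-threaded version; only the constructor call changes.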
