Use of Channels (or similar constructs) in parallel processing


#1

Hi,

I have used Julia Channels for real-time data processing in a pipeline fashion: each Channel passes data to the next processing step. The code below is a simplified example with one Channel that generates random numbers and another that computes the running average of those numbers. The second Channel is a consumer of the first Channel's data but also a producer for the following stage. I hope the code captures the essence of what I am after.

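# Producer stage: an endless stream of random numbers.
function randomsrc()
    function cr(ch)
        while true
            put!(ch, randn())
        end
    end
    return Channel{Float64}(cr, 1024)   # Julia >= 1.3 form of ctype=Float64, csize=1024
end

# Consumer of `c` and producer of running means for the next stage.
function avsrc(c)
    function av(ch)
        sum = 0.0    # accumulators live inside the task function, so the closure
        count = 0    # does not capture (and box) variables that it mutates
        for x in c
            sum += x
            count += 1
            put!(ch, sum/count)
        end
    end
    return Channel{Float64}(av, 1024)
end

randch = randomsrc()
avch = avsrc(randch)
for (i, a) in enumerate(avch)
    if i % 100000 == 0
        @show(i, a)
    end
end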

This works fine, but I run into performance issues when the data rate increases and/or when some of the intermediate stages start doing more with the incoming data, e.g. plotting it.

From what I understood of the documentation, Channels are single-threaded. Is there a parallel construct that helps distribute the processing across threads on different cores to achieve higher throughput? I like how easy Channels are to use and how intuitive they are for pipeline-like problems, i.e. chaining producers and consumers. Any pointers to similar constructs that could help?
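For reference on the threading question: since Julia 1.3, Channels are thread-safe, and a Channel created from a function can run that function's task on any available thread by passing `spawn=true`. Below is a minimal sketch of the same two-stage pipeline under that assumption (Julia 1.3 or newer, started with several threads, e.g. `julia -t 4`). The names `threaded_randomsrc`, `threaded_avsrc` and `run_pipeline` are hypothetical, and the producer stops after `n` values so the example terminates.

```julia
# Hypothetical threaded variant of the pipeline (requires Julia >= 1.3).
# With a single thread it still runs correctly, just without parallelism.

# Producer stage: emits `n` random numbers. When the task returns, the
# channel it is bound to is closed automatically.
function threaded_randomsrc(n::Integer; bufsize::Integer = 1024)
    return Channel{Float64}(bufsize; spawn = true) do ch
        for _ in 1:n
            put!(ch, randn())
        end
    end
end

# Intermediate stage: running mean of everything read from `src`,
# itself running as a task that may be scheduled on another thread.
function threaded_avsrc(src::Channel{Float64}; bufsize::Integer = 1024)
    return Channel{Float64}(bufsize; spawn = true) do ch
        total = 0.0
        count = 0
        for x in src            # ends once `src` is closed and drained
            total += x
            count += 1
            put!(ch, total / count)
        end
    end
end

# Final consumer: drains the pipeline, returning how many running means
# were received and the last one.
function run_pipeline(n::Integer)
    avch = threaded_avsrc(threaded_randomsrc(n))
    nseen = 0
    lastmean = NaN
    for a in avch
        nseen += 1
        lastmean = a
    end
    return nseen, lastmean
end

nseen, lastmean = run_pipeline(100_000)
println("threads: ", Threads.nthreads(), ", items: ", nseen, ", final mean: ", lastmean)
```

This parallelizes across stages, not within a stage: each stage still handles its items one at a time, and passing one `Float64` per `put!` keeps a per-item synchronization cost, so grouping values into vectors may be worth considering at high data rates.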

Thanks!


#2

I think what you are looking for is RemoteChannel. Here is the part of the documentation that compares Channels and RemoteChannels: https://docs.julialang.org/en/v1/manual/parallel-computing/#Channels-and-RemoteChannels-1. You should also take a look at the consumer-producer example in that section if you haven't already.
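To illustrate the difference that section describes: a `Channel` exists only in the process that created it, while a `RemoteChannel` is a handle that can be passed to and used from any process. A minimal sketch, assuming the standard `Distributed` library and at least one worker process (added with `addprocs(1)` if none exists); the variable names are illustrative.

```julia
using Distributed

# Make sure there is at least one worker besides the master (process 1).
nprocs() == 1 && addprocs(1)

# The backing Channel{Int} is created on, and owned by, process 1.
rc = RemoteChannel(() -> Channel{Int}(10))

# A worker puts five values into it. Because the worker is not the owner,
# each put! it performs is a call back to process 1.
w = first(workers())
remotecall_wait(c -> foreach(i -> put!(c, i), 1:5), w, rc)

# The master, as owner, takes the values back out locally.
received = [take!(rc) for _ in 1:5]
println(received)
```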


#4

I'll look into RemoteChannel as an alternative and post an update on how it goes.
Thanks!


#5

Based on the RemoteChannel example in the documentation, I modified the original code as follows:

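using Distributed

# This example uses worker processes 4 and 5, e.g. from starting Julia with
# `julia -p 4` (workers 2-5); add workers here if the session has fewer.
nprocs() < 5 && addprocs(5 - nprocs())

# Producer: runs forever on its worker, sending one random number per put!.
@everywhere function randomsrc(ch)
    while true
        put!(ch, randn())
    end
end

# Intermediate stage: running mean of the values taken from `chin`.
# On worker 5, isopen, take! and put! each go to process 1, which owns both channels.
@everywhere function avsrc(chin, chout)
    sum = 0.0
    count = 0
    while isopen(chin)
        x = take!(chin)
        sum += x
        count += 1
        put!(chout, sum/count)
    end
end

# Both backing Channels are created on (owned by) process 1.
const ch1 = RemoteChannel(()->Channel{Float64}(1024));
const ch2 = RemoteChannel(()->Channel{Float64}(1024));

remote_do(randomsrc, 4, ch1)
remote_do(avsrc, 5, ch1, ch2)

# Wrapping the consumer loop in a function avoids a global counter.
function consume(ch)
    i = 0
    while isopen(ch)
        i += 1
        a = take!(ch)
        if i % 100000 == 0
            @show(i, a)
        end
    end
end

consume(ch2)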

The performance is orders of magnitude worse than with the original Channel code. I understand there must be some overhead for inter-process communication, but this is very slow. Am I doing anything wrong in the code above? I don't have multiple readers or multiple writers; can I avoid the overhead that exists for the general multiple-reader/multiple-writer case?
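One commonly used way to reduce per-message overhead with a `RemoteChannel` is to send batches (e.g. a `Vector{Float64}`) instead of single numbers, so that each remote `put!`/`take!` carries many values. The sketch below is a hedged illustration of that idea, not a fix confirmed in this thread. It assumes the standard `Distributed` library, at least one worker, 100 batches and a batch size of 10,000; `batched_randomsrc` and `consume_batches` are hypothetical names.

```julia
using Distributed

# Ensure at least one worker process exists (process 1 is the master).
nprocs() == 1 && addprocs(1)

# Hypothetical producer: sends `nbatches` vectors of `batchsize` random numbers.
# Each put! moves a whole vector, so the remote-call cost is paid once per batch.
@everywhere function batched_randomsrc(ch, nbatches, batchsize)
    for _ in 1:nbatches
        put!(ch, randn(batchsize))
    end
end

# Hypothetical consumer: takes exactly `nbatches` batches and updates the
# running mean element by element, as the original avsrc did.
function consume_batches(ch, nbatches)
    total = 0.0
    count = 0
    for _ in 1:nbatches
        batch = take!(ch)::Vector{Float64}
        for x in batch
            total += x
            count += 1
        end
    end
    return count, total / count
end

# A small buffer suffices, since each element is already a large vector.
ch = RemoteChannel(() -> Channel{Vector{Float64}}(16))
remote_do(batched_randomsrc, first(workers()), ch, 100, 10_000)
nvals, runmean = consume_batches(ch, 100)
println("values: ", nvals, ", mean: ", runmean)
```

The trade-off is latency: a downstream stage only sees values once a whole batch has been produced, so the batch size should match how quickly results are needed.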