Stream processing in Julia?

I want to do pipe-style stream processing in several steps or layers (similar to deep learning or the “Pipes and Filters” pattern), like this:

b = f1(a)
c = f2(b)
d = f3(c)
# or simply
d = a |> f1 |> f2 |> f3

But it is this simple only for functions that have one input and one output. What if I want to write more complex steps, like:

b1, b2 = f1(a1, a2, a3)
c1 = f2(a1, b1, b2)
d1, d2 = f3(a2, a3, b1, c1)

Next, functions should be stateful:

f1!(state1, a1, a2, a3) or just specialize on the state type: run!(f1state, a1, a2, a3)

Also, I want all data (a1, a2, b1, b2, etc.) to be stored in FIFO buffers, for two purposes:

  1. History access: functions should have access to the prev_len previous input elements.
  2. Batch processing: a function takes new_len input elements and produces out_len output elements in one call.

So, all my data should be buffered with len = prev_len + new_len, and buffers should be preallocated somewhere:

# a1 = CircularBuffer{T1}(len) ... and so on
run!(f1state, b1, b2, a1, a2, a3)
run!(f2state, c1, a1, b1, b2)
run!(f3state, d1, d2, a2, a3, b1, c1)
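A minimal sketch of how the buffered, stateful run! calls above could fit together, using only Base Julia. The names Fifo, make_fifo, push_block! and MovingSum are hypothetical, and the moving-sum arithmetic is a placeholder for a real step. A package such as DataStructures.jl provides a CircularBuffer that could replace the hand-rolled buffer here.

```julia
# Hypothetical fixed-length FIFO buffer: holds the last `len` samples, oldest first.
struct Fifo{T}
    data::Vector{T}
end
make_fifo(::Type{T}, len::Integer) where {T} = Fifo{T}(zeros(T, len))

# Drop the oldest length(xs) samples and append xs at the end.
function push_block!(buf::Fifo, xs)
    n, len = length(xs), length(buf.data)
    n <= len || throw(ArgumentError("block longer than buffer"))
    for i in 1:(len - n)
        buf.data[i] = buf.data[i + n]      # shift history toward the front
    end
    for i in 1:n
        buf.data[len - n + i] = xs[i]      # newest samples go last
    end
    return buf
end

# Hypothetical step state: counts processed batches.
mutable struct MovingSum
    calls::Int
end

# For each of the new_len newest inputs, sum it with its prev_len predecessors.
# Assumes length(inp.data) >= prev_len + new_len.
function run!(st::MovingSum, out::Fifo, inp::Fifo; new_len::Int, prev_len::Int)
    len = length(inp.data)
    ys = similar(out.data, new_len)        # temporary; a tuned version would avoid it
    for k in 1:new_len
        idx = len - new_len + k            # position of the k-th new sample
        ys[k] = sum(@view inp.data[(idx - prev_len):idx])
    end
    push_block!(out, ys)
    st.calls += 1
    return out
end

reset!(st::MovingSum) = (st.calls = 0; st)
reset!(buf::Fifo{T}) where {T} = (fill!(buf.data, zero(T)); buf)

prev_len, new_len = 2, 2
a1 = make_fifo(Float64, prev_len + new_len)
b1 = make_fifo(Float64, prev_len + new_len)
f1state = MovingSum(0)

push_block!(a1, [1.0, 2.0])
run!(f1state, b1, a1; new_len = new_len, prev_len = prev_len)
push_block!(a1, [3.0, 4.0])
run!(f1state, b1, a1; new_len = new_len, prev_len = prev_len)
```

After the two batches, b1 holds the four most recent outputs and f1state has counted two calls; calling reset! on the state and on each buffer returns everything to its initial condition.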

When processing stops, I should be able to reset! or restart! it in the same way.

Ideally, it should be composable: I can describe some graph and connect it to another graph as a single processing node with its inputs and outputs.

So, how can I describe such a graph in Julia? Should I use Tasks and Channels to do that?
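For reference, a Tasks-and-Channels version of a linear chain might look roughly like the sketch below. The stage helper and the two placeholder functions are hypothetical; this only shows the mechanics of connecting Channels and does not address the multi-input steps or the history buffers described above.

```julia
# Hypothetical helper: run f on every item of `input` in its own task
# and expose the results as a new buffered Channel.
function stage(f, input::Channel, T::Type = Any)
    Channel{T}(32) do output
        for x in input
            put!(output, f(x))
        end
    end
end

# Source channel producing the values 1 to 5.
src = Channel{Int}(32) do ch
    for x in 1:5
        put!(ch, x)
    end
end

doubled = stage(x -> 2x, src, Int)        # placeholder for f1
shifted = stage(x -> x + 1, doubled, Int) # placeholder for f2

# Each channel closes when its task finishes, so collect terminates.
results = collect(shifted)
```

Each stage runs as a separate task and blocks when its output channel is full, which gives back-pressure for free but adds scheduling overhead per element.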

I’m fairly new to Julia, so I in no way claim to be an authority, but chaining with |> only works with functions of one argument. At first I thought you could use a Tuple for the results of one function and the input to the next. However, since each function appears to need most of the data, you might be better off creating a struct that is passed along, like:

# mutable, so that each step can fill in its own output fields
mutable struct Data
    a1
    a2
    a3
    b1
    b2
    c1
    d1
    d2
end

Function f1 would take in the Data object, populate b1 and b2, and return the structure. f2 would then populate c1 and return the structure, and on and on.
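A small sketch of that struct-passing idea, to make it concrete. The Signals type, the Float64 fields, and the arithmetic inside each step are all placeholders (the thread does not say what f1, f2 and f3 compute); only the pattern of which fields each step reads and writes follows the question.

```julia
# Hypothetical container with typed fields and zero defaults.
Base.@kwdef mutable struct Signals
    a1::Float64 = 0.0
    a2::Float64 = 0.0
    a3::Float64 = 0.0
    b1::Float64 = 0.0
    b2::Float64 = 0.0
    c1::Float64 = 0.0
    d1::Float64 = 0.0
    d2::Float64 = 0.0
end

# Reads a1, a2, a3; writes b1, b2.
function f1!(s::Signals)
    s.b1 = s.a1 + s.a2
    s.b2 = s.a2 * s.a3
    return s
end

# Reads a1, b1, b2; writes c1.
function f2!(s::Signals)
    s.c1 = s.a1 + s.b1 + s.b2
    return s
end

# Reads a2, a3, b1, c1; writes d1, d2.
function f3!(s::Signals)
    s.d1 = s.a2 + s.b1
    s.d2 = s.a3 + s.c1
    return s
end

# Every step takes and returns the same object, so plain |> chaining works.
s = Signals(a1 = 1.0, a2 = 2.0, a3 = 3.0) |> f1! |> f2! |> f3!
```

The trade-off is that the data dependencies between steps are no longer visible in the function signatures; they live inside each function body.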

As for the state, the way I would do it is to wrap each call in an anonymous function, something like:

a |> (v -> f1(f1state, v)) |> (v -> f2(f2state, v)) |> (v -> f3(f3state, v))

This lets you pass extra arguments, such as the state, to each function. I’m not sure about your FIFO requirement; I didn’t quite understand what you meant.
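Here is a runnable version of chaining stateful steps through anonymous functions with |>. The Counter type and the step! function are made up for illustration; they stand in for the real state types and the real f1, f2, f3.

```julia
# Hypothetical state: counts how many values a step has seen.
mutable struct Counter
    n::Int
end

# Hypothetical stateful step: updates its state and scales the value.
function step!(st::Counter, v, gain)
    st.n += 1
    return gain * v
end

s1, s2, s3 = Counter(0), Counter(0), Counter(0)

# Each anonymous function closes over its own state object.
# The parentheses make the grouping of the chain explicit.
result = 2 |> (v -> step!(s1, v, 10)) |> (v -> step!(s2, v, 3)) |> (v -> step!(s3, v, 1))
```

Because the closures capture s1, s2 and s3 by reference, the states keep their updates between calls, so the same chain can be applied to the next input value.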