I want to do pipe-style stream processing in several steps or layers (similar to deep learning or the “Pipes and Filters” pattern), like this:
b = f1(a)
c = f2(b)
d = f3(c)
# or simply
d = a |> f1 |> f2 |> f3
But it is this simple only for functions with one input and one output. What if I want to write more complex steps, like:
b1, b2 = f1(a1, a2, a3)
c1 = f2(a1, b1, b2)
d1, d2 = f3(a2, a3, b1, c1)
Next, functions should be stateful:
f1!(state1, a1, a2, a3)
or just specialize on the state type: run!(f1state, a1, a2, a3)
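For illustration, a minimal sketch of a stateful step that specializes on the state type could look like this. The RunningSum type, its field, and the arithmetic inside run! are hypothetical placeholders, not part of any existing package:

```julia
# Hypothetical state type for a step that keeps a running total.
mutable struct RunningSum
    total::Float64
end

# `run!` is specialized on the state type through multiple dispatch.
# It mutates the state and returns two outputs, like b1, b2 = f1(a1, a2, a3).
function run!(s::RunningSum, a1, a2, a3)
    s.total += a1 + a2 + a3   # state carried across calls
    b1 = s.total
    b2 = a1 * a2 * a3         # stateless second output
    return b1, b2
end

f1state = RunningSum(0.0)
run!(f1state, 1.0, 2.0, 3.0)            # first call updates the state
b1, b2 = run!(f1state, 1.0, 1.0, 1.0)   # second call sees the earlier total
```

Every step then shares the same run! entry point, and the state type alone selects the method.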
Also, I want all data (a1, a2, b1, b2, etc.) to be stored in FIFO buffers, for two purposes:
- functions should have access to prev_len previous input elements.
- batch processing: a function takes new_len input elements and produces out_len output elements in one call.
So, all my data should be buffered with len = prev_len + new_len, and buffers should be preallocated somewhere:
# a1 = CircularBuffer{T1}(len) ... and so on
run!(f1state, b1, b2, a1, a2, a3)
run!(f2state, c1, a1, b1, b2)
run!(f3state, d1, d2, a2, a3, b1, c1)
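The buffering scheme above can be sketched with plain vectors. This is only one possible layout, assuming the first prev_len slots hold history and the last new_len slots hold the current batch; HistoryBuffer, push_batch! and DiffState are hypothetical names, and a real circular buffer would avoid the shift:

```julia
# Hypothetical preallocated buffer of length prev_len + new_len.
struct HistoryBuffer{T}
    data::Vector{T}
    prev_len::Int
    new_len::Int
end

HistoryBuffer{T}(prev_len, new_len) where {T} =
    HistoryBuffer{T}(zeros(T, prev_len + new_len), prev_len, new_len)

# Keep the newest prev_len elements as history, then store the new batch.
function push_batch!(buf::HistoryBuffer, batch)
    length(batch) == buf.new_len ||
        throw(ArgumentError("batch must have new_len elements"))
    for i in 1:buf.prev_len                  # forward copy is overlap-safe
        buf.data[i] = buf.data[i + buf.new_len]
    end
    copyto!(buf.data, buf.prev_len + 1, batch, 1, buf.new_len)
    return buf
end

struct DiffState end   # stateless here; the history lives in the buffer

# Writes new_len outputs into the preallocated `out`, using one past sample.
function run!(::DiffState, out::Vector, a::HistoryBuffer)
    for i in 1:a.new_len
        j = a.prev_len + i
        out[i] = a.data[j] - a.data[j - 1]
    end
    return out
end

a1 = HistoryBuffer{Float64}(2, 3)   # prev_len = 2, new_len = 3
b1 = zeros(3)                       # preallocated output
push_batch!(a1, [1.0, 2.0, 4.0])
first_out = copy(run!(DiffState(), b1, a1))
push_batch!(a1, [7.0, 11.0, 16.0])
second_out = copy(run!(DiffState(), b1, a1))
```

The second batch can look back at the last samples of the first one, which is the point of keeping prev_len elements around.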
When processing stops, I should be able to reset! or restart! it in the same way.
Ideally, it should be composable: I can describe some graph and connect it to another graph as a single processing node with its inputs and outputs.
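As a rough sketch of what composability could mean here, a group of nodes can be wrapped in a type that itself implements run!, so it can be nested inside another group. This only covers the linear single-input case, not a general graph, and Scale, Offset and Chain are hypothetical names:

```julia
# Two hypothetical elementary nodes.
struct Scale
    k::Float64
end
struct Offset
    c::Float64
end
run!(s::Scale, x) = s.k * x
run!(s::Offset, x) = x + s.c

# Hypothetical composite node: runs its nodes in order and is itself a node.
struct Chain{T<:Tuple}
    nodes::T
end
Chain(nodes...) = Chain(nodes)

run!(c::Chain, x) = foldl((v, node) -> run!(node, v), c.nodes; init = x)

inner = Chain(Scale(2), Offset(1))   # x -> 2x + 1
outer = Chain(inner, Scale(3))       # the inner chain is used as one node
y = run!(outer, 1.0)
```

A multi-input, multi-output graph would need explicit wiring between named buffers instead of a simple fold.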
So, how can I describe such a graph in Julia? Should I use Tasks and Channels to do that?
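For reference, a Task and Channel version of the simple linear pipeline might look roughly like this. It is a sketch under the assumption of one input and one output per stage; the stage helper is a hypothetical name, while Channel, put!, close and @async are from Base:

```julia
# Each stage runs as a Task and talks to its neighbours through Channels.
function stage(f, input::Channel, output::Channel)
    @async begin
        for x in input            # iteration ends once `input` is closed
            put!(output, f(x))
        end
        close(output)             # propagate end-of-stream downstream
    end
end

a = Channel{Int}(4)   # bounded channels give back-pressure between stages
b = Channel{Int}(4)
c = Channel{Int}(4)

stage(x -> x + 1, a, b)
stage(x -> 2x, b, c)

# Producer task: feed the input and close it when done.
@async begin
    foreach(x -> put!(a, x), 1:3)
    close(a)
end

result = collect(c)   # blocks until `c` is closed
```

Here the channels play the role of the FIFO buffers, but they pass elements one by one and do not by themselves give access to prev_len earlier elements.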