Transducers, parallelism, and feedback loops

After watching the JuliaCon talk on Transducers.jl, I started to wonder about cases where iterators are not optimal. One instance that seems suited for transducers is systems with a feedback loop.

Let’s consider an intelligent algorithm (or a person ;D) which, by learning the unknown function `f(x)` during evaluation, would schedule optimal values at which to evaluate the function. The algorithm could also take into account unresolved (not yet evaluated) points when scheduling new ones, thus enabling parallelism [1].
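To make the setting concrete, here is a minimal sketch of such a learner, assuming a hypothetical `ask!`/`tell!` interface (`GridLearner` and both method names are made up for illustration, not from any package): the learner proposes points to evaluate and records results as they arrive.

```julia
# Hypothetical learner sketch; `ask!`/`tell!` and `GridLearner` are
# assumed names, not from any particular package.
struct GridLearner
    candidates::Vector{Float64}            # points not yet handed out
    data::Vector{Tuple{Float64,Float64}}   # (x, f(x)) pairs seen so far
end

GridLearner(xs) = GridLearner(collect(Float64, xs), Tuple{Float64,Float64}[])

# Propose the next point to evaluate, or `nothing` when none are left.
ask!(l::GridLearner) = isempty(l.candidates) ? nothing : popfirst!(l.candidates)

# Record an evaluated (x, f(x)) pair.
tell!(l::GridLearner, xy) = push!(l.data, xy)

l = GridLearner(0.0:0.5:1.0)
while (x = ask!(l)) !== nothing
    tell!(l, (x, x^2))   # evaluate the unknown f at the proposed point
end
```

A real learner would choose its `ask!` points adaptively based on `data`; here they are just a fixed grid.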

So far, while learning Transducers.jl, I have come up with this code:

```julia
using Transducers

l = Learner() # the data for the intelligent algorithm, with ask! (= iterate) and tell! methods

# Wait while too many points are unresolved, then keep taking while under 10 points.
jobsch = Channel(TakeWhile(x -> (while unresolved(l) > 4 sleep(1) end; npoints(l) < 10)), l)

resch = Channel(Map(x -> (x, x^2)), jobsch)

foreach(Map(identity), resch) do input
    @show input
    tell!(l, input)
end
```

It is unclear to me how the evaluation from `jobsch` to `resch` happens. Does Transducers.jl create a separate loop for this transformation? If so, is it threaded? How can I execute it in parallel instead?

I also wonder if there is a better way to express waiting, which is currently done with that dirty `while` loop.

[1]: Some time ago, I implemented this model with iterators in TaskMaster.jl.

Unfortunately, telling the `foldl` or `reduce` functions “what to do next” is impossible with transducers. The only possible interaction is `reduced`, which is something like `break` in a `for` loop. It may be possible to extend transducers to implement feedback loops, but nailing down a class of possible feedbacks with good composability sounds like a very challenging task.

If you can come up with a way to formalize your computation as an associative binary operator, you can use `reduce` to parallelize it (especially now that Transducers.jl 0.4 supports efficient early termination based on `reduced`). Turning a complex computation into an associative operator requires creativity. Guy Steele has many talks on this topic and I highly recommend searching for them on YouTube etc.; for example, this Google Tech Talk.
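To illustrate the point in plain Julia (a toy example, not from the talk): tracking the best `(x, f(x))` pair seen so far is an associative operation, so `reduce` is free to split the input into chunks and combine partial results in any order, and hence in parallel.

```julia
# `best` is an associative binary operator: it keeps the pair with the
# smaller function value, so partial results combine in any order.
best(a, b) = a[2] <= b[2] ? a : b

pairs = [(x, (x - 0.3)^2) for x in 0.0:0.1:1.0]
xmin, ymin = reduce(best, pairs)   # a parallelizable reduction
```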

Having said that, my impression is that what you are doing is hard to formulate as an associative operation, because it looks like you are “adding elements” to the set to be processed. It may be possible to do what you want by violating all the purity assumptions of transducers and reducing functions (i.e., a transducer or reducing function mutates the `Learner` object). The `Learner` object can then implement these (totally undocumented internal) interfaces to control “what to do next.” But this not only violates the purity assumption; it also relies heavily on the particular depth-first scheduling nature of Julia’s parallel runtime. So I’m not sure if I should even be mentioning this.


I think I have come up with a vague idea of how one could implement feedback loops in transducers. I am a bit stuck/overwhelmed writing a minimal working example. Perhaps we can get somewhere through the discussion here.

For a moment, let’s consider a single feedback loop as a transducer `Loop`. In the case I am interested in, my feedback `Loop` needs to be able to store a learner state; it evaluates a function `f` in the forward direction and combines each incoming input with the feedback from the learner’s `ask!` method via an `op(input, feedback)` function. Thus the interface for initializing the `Loop` transducer would be:

```julia
Loop(f, op, x -> Learner())
```

When evaluated on the input, it would ask for as many points as possible via the `ask!` method.

For the feedback to still be useful, one might want to limit the number of parallel jobs executing at any moment. One way to do that would be to bound the number of scheduled points by checking the learner’s state in an ordinary `while` loop. Also, the `ask!` method could return some “reduced” state to finish the execution once a convergence criterion is satisfied.

Is it possible to write such a transducer?

If it were possible to make such a `Loop` transducer, then one could combine them in series. By extending the `Loop` interface to accept a transducer `xf` instead of a function `f`, one could put loops inside loops. That could perhaps be useful for writing a “job scheduler”, as one could use the input as a way of telling which `Loop` the resources are given to.

Sorry, it’s hard to tell how it works without an MWE. For example, I don’t know how `Loop(f,op,x->Learner())` would work; what does `x->Learner()` do?

Just to clarify what I was talking about, I created two demos:

Strategy 1: Passing `Learner` to `Loop`

```julia
using Transducers
using Transducers: R_, xform, inner, next, complete, @next

struct Learner1
    points::Vector{Int}
end

struct Loop1 <: Transducer
    learner::Learner1
end

function Transducers.next(rf::R_{Loop1}, result, input)
    if isodd(input)
        y = input
    else
        y = input ÷ 2
        push!(xform(rf).learner.points, y)
    end
    return next(inner(rf), result, y)
end

function Transducers.__foldl__(rf, val, learner::Learner1)
    i = 1
    while i <= length(learner.points)
        val = @next(rf, val, learner.points[i])
        i += 1
    end
    return complete(rf, val)
end

learner = Learner1([2, 4, 8])
loop = Loop1(learner)

foreach(loop, learner) do x
    @show x learner.points
end
```

Output:

```
x = 1
learner.points = [2, 4, 8, 1]
x = 2
learner.points = [2, 4, 8, 1, 2]
x = 4
learner.points = [2, 4, 8, 1, 2, 4]
x = 1
learner.points = [2, 4, 8, 1, 2, 4]
x = 1
learner.points = [2, 4, 8, 1, 2, 4, 1]
x = 2
learner.points = [2, 4, 8, 1, 2, 4, 1, 2]
x = 1
learner.points = [2, 4, 8, 1, 2, 4, 1, 2]
x = 1
learner.points = [2, 4, 8, 1, 2, 4, 1, 2, 1]
x = 1
learner.points = [2, 4, 8, 1, 2, 4, 1, 2, 1]
``````

Strategy 2: Bounce message back using bottom reducing function

```julia
using Transducers
using Transducers: R_, xform, inner, next, complete, @next

struct Learner2
    points::Vector{Int}
end

struct Message
    payload::Int
    next_points::Vector{Int}
end

struct Loop2 <: Transducer
    learner::Learner2
end

function Transducers.next(rf::R_{Loop2}, result, msg)
    x = msg.payload
    next_points = msg.next_points
    if isodd(x)
        y = x
    else
        y = x ÷ 2
        next_points = vcat(next_points, y)
    end
    return next(inner(rf), result, Message(y, next_points))
end

function Transducers.__foldl__(rf, val, learner::Learner2)
    i = 1
    while i <= length(learner.points)
        val = @next(rf, val, Message(learner.points[i], Int[]))
        val :: Message
        append!(learner.points, val.next_points)
        i += 1
    end
    return complete(rf, val)
end

learner = Learner2([2, 4, 8])
loop = Loop2(learner)
xf = loop |> Map() do msg
    @show msg.payload msg.next_points learner.points
    return msg
end

foldl(right, xf, learner)
```

Output:

```
msg.payload = 1
msg.next_points = [1]
learner.points = [2, 4, 8]
msg.next_points = [2]
learner.points = [2, 4, 8, 1]
msg.next_points = [4]
learner.points = [2, 4, 8, 1, 2]
msg.next_points = Int64[]
learner.points = [2, 4, 8, 1, 2, 4]
msg.next_points = [1]
learner.points = [2, 4, 8, 1, 2, 4]
msg.next_points = [2]
learner.points = [2, 4, 8, 1, 2, 4, 1]
msg.next_points = Int64[]
learner.points = [2, 4, 8, 1, 2, 4, 1, 2]
msg.next_points = [1]
learner.points = [2, 4, 8, 1, 2, 4, 1, 2]
msg.next_points = Int64[]
learner.points = [2, 4, 8, 1, 2, 4, 1, 2, 1]
``````

Notes

It is important to note that both strategies “break” some of the rules of transducers to implement a feedback loop. Strategy 1 breaks the rule that transducers are pure: it mutates the `Learner` object. Strategy 2 breaks the rule that the reducible (= `Learner2`) should be agnostic about the accumulator: it unpacks `val`. But… is it bad to “break” the rules? I actually don’t know. Maybe it’s more like you can do extra stuff by adding structure to the framework (e.g., the accumulator is always of type `Message`).


It allows the executor of the transducer to decide where to initialise the `Learner`. It also decouples the `Learner` from the environment, so it could be considered pure (the output depends only on the input, plus some stochasticity from computational races).

The transducer implementations look a little too cryptic to me. Perhaps I can manage to implement `Loop(f,op,x->Learner())` as a closure over the function one passes to `map`. I will try to write the example by evening.

Unparallelized Loop

In the case where there is no parallel execution, I can write:

```julia
function Loop(f, op, learner)
    l = learner()
    function finner(input)
        feedinput = ask!(l)   # feedback proposed by the learner
        x = op(input, feedinput)
        y = f(x)
        tell!(l, (x, y))
        return (x, y)
    end
end
```

Then I could execute it with the `map` function:

```julia
map(Loop(f, op, () -> AdaptiveLearner1D((0, 1))), 1:10)
```

where `AdaptiveLearner1D` could come from the package Adaptive.jl.
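To check that the closure idea behaves as intended without Adaptive.jl, here is a self-contained toy run. `EchoLearner` and its `ask!`/`tell!` methods are made up for illustration: the feedback simply replays the last observed `y`.

```julia
# Toy learner whose feedback is the last function value it was told.
mutable struct EchoLearner
    last::Float64
end
ask!(l::EchoLearner) = l.last
tell!(l::EchoLearner, xy) = (l.last = xy[2])

# Same closure structure as the Loop above.
function Loop(f, op, learner)
    l = learner()
    function finner(input)
        feedinput = ask!(l)   # feedback proposed by the learner
        x = op(input, feedinput)
        y = f(x)
        tell!(l, (x, y))
        return (x, y)
    end
end

out = map(Loop(x -> x / 2, +, () -> EchoLearner(0.0)), 1:4)
# out == [(1.0, 0.5), (2.5, 1.25), (4.25, 2.125), (6.125, 3.0625)]
```

Each step feeds the previous `y` back into the next input, which is exactly the feedback-loop behaviour the closure is meant to capture.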

Parallelized Loop

To generalise the `Loop` given above, I introduce the concept of a work distributor, `Master`, which captures workers to do the work as it comes in:

```julia
using Distributed

mutable struct Master
    tasks::RemoteChannel    # inputs waiting to be evaluated
    results::RemoteChannel  # (x, y) pairs coming back from the slaves
    slaves::Vector{Any}
    unresolved::Int
end

"""
Gives the slave a duty to follow orders of his new Master.
"""
function captureslave!(pid, f::Function, master::Master)
    tasks, results = master.tasks, master.results
    wp = @spawnat pid begin
        while true
            x = take!(tasks)
            x === nothing && break   # `nothing` is the stop signal
            y = f(x)
            put!(results, (x, y))
        end
    end
    push!(master.slaves, wp)
end

function Master(f::Function, wpool::AbstractWorkerPool)
    tasks = RemoteChannel(() -> Channel{Any}(10))
    results = RemoteChannel(() -> Channel{Tuple{Any,Any}}(10))
    master = Master(tasks, results, [], 0)

    for p in wpool.workers
        captureslave!(p, f, master)
    end

    return master
end

Master(f::Function) = Master(f, WorkerPool(nprocs() == 1 ? [1] : workers()))
```

Then I could write a parallel version of the `Loop` rather simply:

```julia
function LoopP(f, op, learner)
    l = learner()
    master = Master(f)
    function finner(input)
        # Schedule new points while idle slaves remain and the
        # learner still has something to ask.
        while master.unresolved < length(master.slaves)
            feedinput = ask!(l)
            feedinput === nothing && break
            put!(master.tasks, op(input, feedinput))
            master.unresolved += 1
        end

        if master.unresolved == 0
            return nothing
        else
            (x, y) = take!(master.results)
            tell!(l, (x, y))
            master.unresolved -= 1
            return (x, y)
        end
    end
end
```

Which could be executed with `map` in the same manner.

Thus, since it is written as a function that one maps over, it seems it could also be generalised as a transducer. But is it possible?

EDIT I made a few mistakes in `LoopP` which I have now fixed. The code is, however, not yet tested.

OK. Thanks for the MWEs. I’m now reasonably certain that the strategies I demonstrated above are the right approach if you want to use transducers. Probably strategy 1 (mutation-based) is easier. It should be fine as long as you don’t need to use it for very small models where “small-size optimization” (e.g., using StaticArrays) becomes important.

But, as you observed, the implementation based on reducibles and transducers is rather “cryptic.” It may be an indication that the combination of reducibles and transducers is not the right tool for your problem. Having said that, it may be useful if you want to fuse pre- and post-processing with learning. It would be great if you could explore this area, because I’ve never tried. But I don’t want to blindly encourage this direction, as there is a chance it may not be as fruitful as you had hoped.


I just finalised the design of the TaskMaster.jl package, which takes inputs from and puts outputs into channels. I thought it would be trivial to couple them with transducers as well, but it just halts. The relevant code:

```julia
using Distributed
using Transducers

@everywhere f(x) = x^2

master = ProcMaster(f)
learner = IgnorantLearner(1:10)
loop = Loop(master, learner)

@sync begin
    inch = Channel(Map(identity), 1:10)
    outch = Channel(1)
    @async evaluate(loop, inch, outch)

    for o in outch
        @show o
    end
    ### Gets stuck at the first element
    collect(Map(identity), outch)
end
```

To see how I use `evaluate(loop,inch,outch)` see evaluate.jl file on the GitHub.

It would also be aesthetically pleasing if I could couple `Loop` with transducers with `|>`. So, for example, I could do:

```julia
collect(Map(identity) |> Loop(master, learner) |> Map(identity), 1:10)
```

Also, it seems easily possible to initiate `Master` with a transducer instead of a function with channels. I am not sure how that would be useful. Maybe it would allow modelling electrical circuits.

I haven’t read this whole thread as it’s a bit long. However, it seems this issue is relevant regarding waiting on channel results.

The clojure equivalent to select is `alts!` which is transducer friendly.

```julia
for o in outch
    @show o
end
### Gets stuck at the first element
collect(Map(identity), outch)
```

As you used `outch` in the `for` loop, `outch` is closed by the time it is passed to `collect`. It should not block, though. I’d try:

• Use `@show collect(Map(identity),outch)` and see if it is the `end` of `@sync` that is blocking (which would imply there are some quirks in evaluate).
• Use `collect(itr)` instead of `collect(Map(identity), itr)` and see if it is due to a bug in Transducers.jl
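A minimal stdlib-only illustration of the first point: a `for` loop drains a Channel, so a subsequent `collect` on the same (closed) channel returns empty rather than blocking.

```julia
# Iterating a Channel consumes its values; once the channel is closed
# and drained, `collect` yields an empty vector without blocking.
ch = Channel{Int}(2)
put!(ch, 1); put!(ch, 2)
close(ch)              # mark end-of-stream so iteration terminates

seen = Int[]
for x in ch            # consumes all buffered values
    push!(seen, x)
end
```

After the loop, `seen == [1, 2]` and `collect(ch)` is empty.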

Note that `Map(identity) |> xf |> Map(identity)` is equivalent to `xf` because `Map(identity)` does nothing (`Map(identity)` is the identity transducer).

`Map(identity)` is just one of the transducers. I meant `xf1 |> Loop(master,learner) |> xf2`.


It makes sense. I added that for loop subconsciously for debugging purposes. It just halts.

OK. So I found that the bug is on my side. The following code also halts:

```julia
inch = Channel(4)
put!(inch, 3)
put!(inch, 5)
put!(inch, 9)
put!(inch, 0)
#put!(inch, nothing)

outch = Channel(10)

@sync begin
    @async evaluate(loop, inch, outch)

    for o in outch
        @show o
    end
end
```

Putting `nothing` as the last element closes the channel in `evaluate`. Perhaps I only need to add a conditional testing whether `inch` is closed, instead of checking the value of the last element.
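A sketch of that fix in isolation (stdlib only): if the producer calls `close` instead of sending a `nothing` sentinel, the consumer can simply iterate until end-of-stream with no sentinel check.

```julia
# Closing the channel replaces the `nothing` sentinel: iteration over a
# closed channel terminates cleanly once the buffer is drained.
inch = Channel{Int}(4)
foreach(x -> put!(inch, x), (3, 5, 9, 0))
close(inch)                     # instead of put!(inch, nothing)

squares = [x^2 for x in inch]   # terminates at end-of-stream
# squares == [9, 25, 81, 0]
```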

And that indeed fixed the issue. Now I can couple Transducers with TaskMaster simply:

```julia
@everywhere f(x) = x^2

master = ProcMaster(f)
learner = IgnorantLearner(1:10)
loop = Loop(master, learner)

inch = Channel(Map(identity), 1:10)
outch = Channel(10)

@sync begin
    @async evaluate(loop, inch, outch)
    collect(Map(identity), outch)
end
```

Would be nice to have a `|>` syntax for that though. How have you implemented `|>` syntax in transducers?

`|>` for `Transducers` means composition (I probably should have used `∘`). So, if you want to sandwich a `Loop` between transducers, you need to turn `Loop` into a transducer. I think this would require one of the strategies I outlined above.

Note that it is quite easy to turn `evaluate` into an iterator transform:

```julia
evaluating(loop) = inch -> Channel() do outch
    evaluate(loop, inch, outch)
end

outch = evaluating(loop)(inch)
collect(outch)
# or equivalently
inch |> evaluating(loop) |> collect
```

where `|>` here is the standard `x |> f == f(x)`.
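For comparison, here is a fully self-contained iterator transform in the same style (the `doubling` name is made up): a function from one iterable to another, built on a `Channel`, composable with the standard `|>`.

```julia
# An iterator transform: takes an iterable, returns a Channel that
# yields each element doubled.
doubling = itr -> Channel() do ch
    for x in itr
        put!(ch, 2x)
    end
end

result = 1:3 |> doubling |> collect
# result == [2, 4, 6]
```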


Cool! The iterator transform allowed me to reduce the code by 50 lines and improved readability a lot. It seems then that iterator transforms are the way to go if one wants to couple TaskMaster.jl with Transducers.jl. The original problem is thus solved.


Note that iterator transforms are not transducers (they are related, though). But iterator transforms are also a powerful tool for this kind of task. For example, Query.jl provides a nice interface for creating and composing iterator transforms. So, if you expose the API as an iterator transform, your package will interop nicely with the Queryverse ecosystem.
