Transducers, parallelism and feedback loops

After watching the JuliaCon talk on Transducers.jl, I started to wonder about the many cases where iterators are not optimal. One case that seems well suited to transducers is a system with a feedback loop.

Let’s consider an intelligent algorithm (or a person ;D) that learns an unknown function f(x) from its evaluations and uses what it has learned to schedule the best points at which to evaluate the function next. The algorithm could also take unresolved (scheduled but not yet evaluated) points into account when scheduling new ones, which enables parallelism [1].

So far, while learning Transducers.jl, I came up with this code:

using Transducers

l = Learner() # the data for the intelligent algorithm which has ask!=iterate and tell! methods.

jobsch = Channel(TakeWhile(x->npoints(l)<10 && while unresolved(l)>4 sleep(1) end),l)

resch = Channel(Map(x->(x,x^2)),jobsch)

foreach(Map(identity),resch) do input
    @show input
    tell!(l,input)
end

It is unclear to me how the evaluation from jobsch to resch happens. Does Transducers.jl create a separate loop for this transformation? If so, is it threaded? How can I execute it in parallel instead?

I also wonder if there is a better way to express waiting, which is currently that dirty while loop.

[1]: Some time ago, I implemented the model with iterators in TaskMaster.jl

Unfortunately, telling foldl or reduce “what to do next” is impossible with transducers. The only possible interaction is reduced, which is something like break in a for loop. It may be possible to extend transducers to implement feedback loops, but nailing down a class of possible feedbacks with good compositionality sounds like a very challenging task.
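
To make the "break in a for loop" analogy concrete, here is a minimal sketch in plain Julia. The `Stop` wrapper and `fold_with_break` are hypothetical names made up for this illustration; they only mimic the idea behind reduced and are not how Transducers.jl implements it.

```julia
# Hypothetical stand-in for the idea behind `reduced`;
# this is NOT Transducers.jl's implementation.
struct Stop{T}
    value::T
end

# A left fold that stops as soon as the step function returns a `Stop`.
function fold_with_break(step, init, xs)
    acc = init
    for x in xs
        acc = step(acc, x)
        acc isa Stop && return acc.value   # the only "feedback": stop early
    end
    return acc
end

# Sum the inputs until the running total exceeds 10.
total = fold_with_break(0, 1:100) do acc, x
    s = acc + x
    s > 10 ? Stop(s) : s
end
```

The step function can only say "stop now"; it has no way of asking the fold to visit a different element next, which is the limitation described above.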

If you can come up with a way to formalize your computation as an associative binary operator, you can use reduce to parallelize it (especially now that Transducers.jl 0.4 supports efficient early termination based on reduced). Turning a complex computation into an associative operator requires creativity. Guy Steele has many talks about this topic and I highly recommend searching for them on YouTube etc.; for example, this Google Tech Talk.
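
As a rough sketch of why associativity matters, the snippet below splits the input into chunks, reduces each chunk on its own task, and then combines the partial results. It uses only Base Julia; `combine` and `chunked_reduce` are hypothetical helper names, the input is assumed to be a non-empty vector, and this is not the strategy Transducers.jl itself uses.

```julia
# Combine two partial summaries of the form (count, sum).
# The operation is associative, so the grouping of chunks does not matter.
combine(a, b) = (a[1] + b[1], a[2] + b[2])

# Reduce each chunk on its own task, then combine the per-chunk results.
# Assumes `xs` is a non-empty vector.
function chunked_reduce(op, xs; nchunks = 4)
    chunks = Iterators.partition(xs, cld(length(xs), nchunks))
    tasks = map(chunks) do chunk
        Threads.@spawn reduce(op, chunk)
    end
    return reduce(op, fetch.(tasks))
end

summaries = [(1, x) for x in 1:100]
n, s = chunked_reduce(combine, summaries)
```

Because `combine` is associative, the result is the same regardless of how many chunks are used or in which order the tasks finish.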

Having said that, my impression is that what you are doing is hard to formulate as an associative operation because it looks like you are “adding elements” to the set to be processed. It may be possible to do what you want by violating all the purity assumptions of transducers and reducing functions (i.e., a transducer or reducing function mutates the Learner object). The Learner object can then implement these (totally undocumented internal) interfaces to control “what to do next.” But this not only violates the purity assumption, it also relies heavily on the particular depth-first scheduling nature of Julia’s parallel runtime. So I’m not sure if I should even be mentioning this :slight_smile:


I think I have come up with a vague idea of how one could implement feedback loops in transducers. I am a bit stuck/overwhelmed at writing a minimal working example. Perhaps we could get somewhere through the discussion here.

For a moment, let’s consider a single feedback loop as a transducer Loop. In the case I am interested in, the feedback Loop needs to store a learner state; it evaluates a function f in the forward direction; and it combines, one by one, the inputs coming into the transducer with the points coming from the learner’s ask! method using a function op(input,feedback). Thus the interface for initializing the Loop transducer would be:

Loop(f,op,x->Learner())

When evaluated on an input, it would ask for as many points as the ask! method can provide.

For the feedback to still be useful, one might want to limit the number of parallel jobs being executed at any moment. One way to do that would be to limit the number of points in flight by checking the learner’s state in an ordinary while loop. Also, the ask! method could return some “reduced” state to finish the execution once some convergence criterion has been satisfied.
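
A minimal sketch of that bounded-jobs idea, using only Base tasks and a Channel. Everything here is a hypothetical illustration: `CountingLearner` is a toy learner, its `ask!` returns `nothing` as the terminal state, and `run_bounded!` is a made-up driver, not part of TaskMaster.jl or Transducers.jl.

```julia
# Hypothetical toy learner: proposes 1, 2, 3, ... and stops after `limit` points.
mutable struct CountingLearner
    next::Int
    limit::Int
    results::Vector{Tuple{Int,Int}}
end
CountingLearner(limit) = CountingLearner(1, limit, Tuple{Int,Int}[])

# `ask!` returns `nothing` once there is nothing left to propose.
function ask!(l::CountingLearner)
    l.next > l.limit && return nothing
    x = l.next
    l.next += 1
    return x
end
tell!(l::CountingLearner, xy) = push!(l.results, xy)

# Keep at most `maxjobs` evaluations in flight; feed each result back to the learner.
function run_bounded!(f, l; maxjobs = 3)
    results = Channel{Tuple{Int,Int}}(maxjobs)
    unresolved = 0
    done = false
    while true
        # Top up the in-flight jobs until the limit is reached.
        while !done && unresolved < maxjobs
            x = ask!(l)
            if x === nothing
                done = true
            else
                @async put!(results, (x, f(x)))
                unresolved += 1
            end
        end
        unresolved == 0 && break
        tell!(l, take!(results))   # blocks until some job has finished
        unresolved -= 1
    end
    return l
end

learner = run_bounded!(x -> x^2, CountingLearner(10))
```

The blocking take! replaces any explicit sleep loop: the driver only wakes up when a result is available.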

Is it possible to write such a transducer?

If it were possible to make such a Loop transducer, then one could combine them in series. By extending the Loop interface to accept a transducer xf instead of a function f, one could put loops inside loops. That could perhaps be useful for writing a “job scheduler”, as one could use the input as a way of telling which Loop the resources are given to.

Sorry, it’s hard to tell how it works without an MWE. For example, I don’t know how

Loop(f,op,x->Learner())

would work; what does x->Learner() do?

Just to clarify what I was talking about, I created two demos:

Strategy 1: Passing Learner to Loop

using Transducers
using Transducers: R_, xform, inner, next, complete, @next

struct Learner1
    points::Vector{Int}
end

struct Loop1 <: Transducer
    learner::Learner1
end

function Transducers.next(rf::R_{Loop1}, result, input)
    if isodd(input)
        y = input
    else
        y = input ÷ 2
        push!(xform(rf).learner.points, y)
    end
    return next(inner(rf), result, y)
end

function Transducers.__foldl__(rf, val, learner::Learner1)
    i = 1
    while i <= length(learner.points)
        val = @next(rf, val, learner.points[i])
        i += 1
    end
    return complete(rf, val)
end

learner = Learner1([2, 4, 8])
loop = Loop1(learner)

foreach(loop, learner) do x
    @show x learner.points
end

Output:

x = 1
learner.points = [2, 4, 8, 1]
x = 2
learner.points = [2, 4, 8, 1, 2]
x = 4
learner.points = [2, 4, 8, 1, 2, 4]
x = 1
learner.points = [2, 4, 8, 1, 2, 4]
x = 1
learner.points = [2, 4, 8, 1, 2, 4, 1]
x = 2
learner.points = [2, 4, 8, 1, 2, 4, 1, 2]
x = 1
learner.points = [2, 4, 8, 1, 2, 4, 1, 2]
x = 1
learner.points = [2, 4, 8, 1, 2, 4, 1, 2, 1]
x = 1
learner.points = [2, 4, 8, 1, 2, 4, 1, 2, 1]

Strategy 2: Bounce message back using bottom reducing function

using Transducers
using Transducers: R_, xform, inner, next, complete, @next

struct Learner2
    points::Vector{Int}
end

struct Message
    payload
    next_points::Vector{Int}
end

repack(msg, payload) = Message(payload, msg.next_points)

struct Loop2 <: Transducer
    learner::Learner2
end

function Transducers.next(rf::R_{Loop2}, result, msg)
    x = msg.payload
    next_points = msg.next_points
    if isodd(x)
        y = x
    else
        y = x ÷ 2
        next_points = vcat(next_points, y)
    end
    return next(inner(rf), result, Message(y, next_points))
end

function Transducers.__foldl__(rf, val, learner::Learner2)
    i = 1
    while i <= length(learner.points)
        val = @next(rf, val, Message(learner.points[i], Int[]))
        val :: Message
        append!(learner.points, val.next_points)
        i += 1
    end
    return complete(rf, val)
end

learner = Learner2([2, 4, 8])
loop = Loop2(learner)
xf = loop |> Map() do msg
    @show msg.payload msg.next_points learner.points
    return msg
end

foldl(right, xf, learner)

Output:

msg.payload = 1
msg.next_points = [1]
learner.points = [2, 4, 8]
msg.payload = 2
msg.next_points = [2]
learner.points = [2, 4, 8, 1]
msg.payload = 4
msg.next_points = [4]
learner.points = [2, 4, 8, 1, 2]
msg.payload = 1
msg.next_points = Int64[]
learner.points = [2, 4, 8, 1, 2, 4]
msg.payload = 1
msg.next_points = [1]
learner.points = [2, 4, 8, 1, 2, 4]
msg.payload = 2
msg.next_points = [2]
learner.points = [2, 4, 8, 1, 2, 4, 1]
msg.payload = 1
msg.next_points = Int64[]
learner.points = [2, 4, 8, 1, 2, 4, 1, 2]
msg.payload = 1
msg.next_points = [1]
learner.points = [2, 4, 8, 1, 2, 4, 1, 2]
msg.payload = 1
msg.next_points = Int64[]
learner.points = [2, 4, 8, 1, 2, 4, 1, 2, 1]

Notes

It is important to note that both strategies “break” some of the rules of transducers to implement a feedback loop. Strategy 1 breaks the rule that transducers are pure; it mutates the Learner object. Strategy 2 breaks the rule that the reducible (= Learner2) should be agnostic about the accumulator; it unpacks val. But… is it bad to “break” the rules? I actually don’t know. Maybe it’s more like you can do extra stuff by adding structure to the framework (e.g., the accumulator is always of type Message).


It allows the executor of the transducer to decide where to initialise the Learner. It also decouples the Learner from the environment, so the transducer could be considered pure (the output depends only on the input plus some stochasticity from races in the computation).

The transducer implementations look a little bit too cryptic to me. Perhaps I can manage to implement Loop(f,op,x->Learner()) as a closure that one passes to map. I will try to write the example by this evening.

Unparallelized Loop

In the case where there is no parallel execution, I can write:

function Loop(f,op,learner)
    l = learner()
    function finner(input)
        feedinput = ask!(l)
        x = op(input,feedinput)
        y = f(x)
        tell!(l,(x,y))
        return (x,y)
    end
end

Then I could execute it with the map function:

map(Loop(f,op,()->AdaptiveLearner1D((0,1))),1:10)

where AdaptiveLearner1D could come from the package Adaptive.jl.
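
For a version that runs without any extra package, here is a sketch of the same closure with a toy learner. `ToyLearner`, its `ask!`/`tell!` methods and `loop_closure` are hypothetical names for this illustration; the learner's "feedback" is simply the number of points it has seen so far.

```julia
# Hypothetical toy learner; it is not AdaptiveLearner1D.
mutable struct ToyLearner
    data::Vector{Tuple{Int,Int}}
end
ToyLearner() = ToyLearner(Tuple{Int,Int}[])

ask!(l::ToyLearner) = length(l.data)        # feedback: number of points seen so far
tell!(l::ToyLearner, xy) = push!(l.data, xy)

# Same shape as the closure above: `learner` is a zero-argument factory.
function loop_closure(f, op, learner)
    l = learner()
    return function (input)
        x = op(input, ask!(l))   # combine the input with the learner's feedback
        y = f(x)
        tell!(l, (x, y))         # feed the result back into the learner
        return (x, y)
    end
end

out = map(loop_closure(x -> x^2, +, ToyLearner), 1:3)
# out == [(1, 1), (3, 9), (5, 25)]
```

Note that the closure is stateful, so the result depends on map visiting the inputs in order.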

Parallelized Loop

To generalise the Loop given above, I introduce the concept of a work distributor, Master, which captures workers to do the work as it comes in:

using Distributed

mutable struct Master
    tasks
    results
    slaves 
    unresolved
end

"""
Gives the slave a duty to follow orders of his new Master
"""
function captureslave!(pid,f::Function,master::Master)
    tasks, results = master.tasks, master.results
    wp = @spawnat pid begin
        while true
            x = take!(tasks)
            if x==nothing
                break
            else
                y = f(x)
                put!(results,(x,y))
            end
        end
    end
    push!(master.slaves,wp)
end

Master(tasks,results,slaves) = Master(tasks,results,slaves,0)
Master(tasks,results) = Master(tasks,results,[],0)

function Master(f::Function,wpool::AbstractWorkerPool)
    tasks = RemoteChannel(()->Channel{Any}(10))
    results = RemoteChannel(()->Channel{Tuple{Any,Any}}(10))

    master = Master(tasks,results)
    for p in wpool.workers
        captureslave!(p,f,master)
    end

    return master
end

Master(f::Function) = Master(f,WorkerPool(nprocs()==1 ? [1] : workers()))

Then I could write a parallel version of the Loop rather simply:

function LoopP(f,op,learner)
    l = learner()
    master = Master(f)
    function finner(input)
        while master.unresolved < length(master.slaves)
            feedinput = ask!(l)
            if feedinput==nothing
                break
            else
                x = op(input,feedinput)
                put!(master.tasks,x)
                master.unresolved += 1
            end
        end

        if master.unresolved==0
            return nothing
        else
            x, y = take!(master.results) # workers put (x,y) tuples into results
            tell!(l,(x,y))
            master.unresolved -= 1
            return (x,y)
        end
    end
end

Which could be executed with map in the same manner.

Since it is written as a function that one maps over the input, it seems that it could also be generalised as a transducer. But is it possible?

EDIT: I made a few mistakes in LoopP, which I have now fixed. The code is, however, not yet tested.

OK. Thanks for the MWEs. I’m now reasonably certain that the strategies I demonstrated above are the right approach if you want to use transducers. Probably strategy 1 (mutation-based) is easier. It should be fine as long as you don’t need to use it for very small models where “small-size optimization” (e.g., using StaticArrays) becomes important.

But, as you observed, the implementation based on reducibles and transducers is rather “cryptic.” That may be an indication that the combination of reducibles and transducers is not the right tool for your problem. Having said that, it may be useful if you want to fuse pre- and post-processing with learning. It would be great if you could explore this area because I’ve never tried. But I don’t want to blindly encourage this direction as there is a chance that it may not be as fruitful as you would have hoped.


I just finalised the design of the TaskMaster.jl package, which takes inputs from and puts outputs into channels. I thought that it would be trivial to couple these with transducers as well, but it just halts. The relevant code:

using Distributed
addprocs(2)
using TaskMaster
using Transducers

@everywhere f(x) = x^2

master = ProcMaster(f)
learner = IgnorantLearner(1:10)
loop = Loop(master,learner)

@sync begin
    inch = Channel(Map(identity),1:10)
    outch = Channel(1)
    @async evaluate(loop,inch,outch)

    for o in outch
        @show o
    end
    ### Gets stuck at the first element
    collect(Map(identity),outch)
end

To see how I use evaluate(loop,inch,outch), see the evaluate.jl file on GitHub.

It would also be aesthetically pleasing if I could couple Loop with transducers with |>. So, for example, I could do:

collect(Map(identity) |> Loop(master,learner) |> Map(identity),1:10)

Also, it seems easily possible to initiate Master with transducer instead of a function with channels. Not sure how that would be useful. Maybe it would allow modelling electrical circuits.

Haven’t read this whole thread as it’s a bit long. However, it seems this issue is relevant regarding waiting on channel results.

https://github.com/JuliaLang/julia/issues/13763

The Clojure equivalent to select is alts!, which is transducer friendly.

    for o in outch
        @show o
    end
    ### Gets stuck at the first element
    collect(Map(identity),outch)

As you used outch in the for loop, outch is closed by the time it is passed to collect. It should not block, though. I’d try:

  • Use @show collect(Map(identity),outch) and see if it is the end of @sync that is blocking (which would imply there are some quirks in evaluate).
  • Use collect(itr) instead of collect(Map(identity), itr) and see if it is due to a bug in Transducers.jl

Note that Map(identity) |> xf |> Map(identity) is equivalent to xf because Map(identity) does nothing (Map(identity) is the identity transducer).

Map(identity) is just one of the transducers. I meant xf1 |> Loop(master,learner) |> xf2.


It makes sense. I added that for loop subconsciously for debugging purposes. It just halts.

OK. So I found that the bug is on my side. The following code also halts:

inch = Channel(4)
put!(inch,3)
put!(inch,5)
put!(inch,9)
put!(inch,0)
#put!(inch,nothing)

outch = Channel(10)

@sync begin
    @async evaluate(loop,inch,outch)

    for o in outch
        @show o
    end
end

Putting nothing as the last element closes the channel in evaluate. Perhaps I only need to add a conditional that tests whether inch is closed instead of testing the value of the last element.
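
The blocking behaviour can be reproduced with plain Base channels. In the sketch below, `evaluate_sketch` is a hypothetical stand-in for evaluate (it is not the TaskMaster.jl code): it stops when the input channel is closed and drained, and then closes the output channel so that consumers can finish.

```julia
# Hypothetical stand-in for `evaluate`; not the TaskMaster.jl implementation.
function evaluate_sketch(f, inch::Channel, outch::Channel)
    for x in inch              # ends once `inch` is closed and drained
        put!(outch, (x, f(x)))
    end
    close(outch)               # without this, iterating `outch` would wait forever
end

inch = Channel{Int}(4)
foreach(x -> put!(inch, x), (3, 5, 9, 0))
close(inch)                    # signals "no more input" instead of a sentinel value

outch = Channel{Tuple{Int,Int}}(10)
producer = @async evaluate_sketch(x -> x^2, inch, outch)
collected = collect(outch)     # returns because `outch` gets closed
wait(producer)
```

Iterating or collecting a Channel only terminates once the channel is closed, which is why the consumer loop halts whenever the producer never closes outch.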

And that indeed fixed the issue. Now I can couple Transducers with TaskMaster simply:

@everywhere f(x) = x^2

master = ProcMaster(f)
learner = IgnorantLearner(1:10)
loop = Loop(master,learner)

inch = Channel(Map(identity),1:10)
outch = Channel(10)

@sync begin
    @async evaluate(loop,inch,outch)
    collect(Map(identity),outch)
end

It would be nice to have a |> syntax for that, though. How have you implemented the |> syntax in Transducers?

|> for Transducers means composition (I probably should have used ∘). So, if you want to sandwich a Loop with transducers, you need to turn Loop into a transducer. I think this would require one of the strategies I outlined above.
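
To illustrate what "composition" means here, below is a hand-rolled sketch in which a transducer is just a function from one reducing function to another. The names `mapping`, `filtering` and `compose` are hypothetical and independent of Transducers.jl; its real types and internals differ.

```julia
# A hand-rolled transducer: a function that wraps a reducing function `rf`.
# These helpers are illustrative only and not part of Transducers.jl.
mapping(f)   = rf -> (acc, x) -> rf(acc, f(x))
filtering(p) = rf -> (acc, x) -> p(x) ? rf(acc, x) : acc

# Compose so that data flows through `xf1` first and then through `xf2`.
compose(xf1, xf2) = rf -> xf1(xf2(rf))

xf = compose(filtering(isodd), mapping(x -> x^2))
result = foldl(xf(push!), 1:6; init = Int[])

# Sandwiching with `mapping(identity)` changes nothing.
same = foldl(compose(mapping(identity), xf)(push!), 1:6; init = Int[])
```

Composition happens before any data is seen, which is why something that is driven by channels at run time cannot simply be dropped into the middle of such a chain.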

Note that it is quite easy to turn evaluate into an iterator transform:

evaluating(loop) = inch -> Channel() do outch
    evaluate(loop, inch, outch)
end

outch = evaluating(loop)(inch)
collect(outch)
# or equivalently
inch |> evaluating(loop) |> collect

where |> here is the standard x |> f == f(x).
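
The same pattern works for any function that reads from one iterator and writes to a channel. A self-contained sketch, where `mapping_transform` is a hypothetical name used only for this example:

```julia
# Hypothetical iterator transform: takes any iterator and returns a Channel
# that yields f(x) for each element x, computed on a producer task.
mapping_transform(f) = itr -> Channel() do outch
    for x in itr
        put!(outch, f(x))
    end
end

# Iterator transforms chain with the standard `x |> f == f(x)` pipe.
result = 1:5 |> mapping_transform(x -> x + 1) |> mapping_transform(x -> x^2) |> collect
# result == [4, 9, 16, 25, 36]
```

The channel is closed automatically when the producer function returns, so collect terminates without any sentinel value.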


Cool! Iterator transform allowed me to reduce the code by 50 lines and improved readability a lot :slight_smile: Seems then that iterator transforms are the way to go if one wants to couple TaskMaster.jl with Transducers.jl. The original problem thus is solved.


Note that iterator transforms are not transducers (they are related, though). But iterator transforms are also a powerful tool for this kind of task. For example, Query.jl provides a nice interface for creating and composing iterator transforms. So, if you expose the API as an iterator transform, your package will interop nicely with the Queryverse ecosystem.
