After watching the JuliaCon talk on Transducers.jl, I started to wonder about cases where iterators are not optimal. One instance that seems suited for transducers is a system with a feedback loop.
Let’s consider an intelligent algorithm (or a person ;D) which, while learning the unknown function f(x) through evaluations, schedules the optimal values at which to evaluate the function next. The algorithm could also take unresolved (not yet evaluated) points into account when scheduling new ones, thus enabling parallelism [1].
So far, while learning Transducers.jl, I have come up with this code:
using Transducers
l = Learner() # the state of the intelligent algorithm; it has ask! (playing the role of iterate) and tell! methods
jobsch = Channel(TakeWhile(x->npoints(l)<10 && while unresolved(l)>4 sleep(1) end),l)
resch = Channel(Map(x->(x,x^2)),jobsch)
foreach(Map(identity),resch) do input
    @show input
    tell!(l,input)
end
It is unclear to me how the evaluation from jobsch to resch happens. Does Transducers.jl create a separate loop for this transformation? If so, is it threaded? How would I execute it in parallel instead?
I also wonder if there is a better way to express waiting, which is currently done with that dirty while loop.
[1]: Some time ago, I implemented the model with iterators in TaskMaster.jl
Unfortunately, telling foldl or reduce functions “what to do next” is impossible with transducers. The only possible interaction is reduced, which is something like break in a for-loop. It may be possible to extend transducers to implement feedback loops, but nailing down a class of possible feedbacks with good compositionality sounds like a very challenging task.
If you can come up with a way to formalize your computation as an associative binary operator, you can use reduce to parallelize it (especially now that Transducers.jl 0.4 supports efficient early termination based on reduced). Turning a complex computation into an associative operator requires creativity. Guy Steele has given many talks on this topic and I highly recommend searching for them on YouTube etc.; for example, this Google Tech Talk.
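As a minimal sketch of that idea (assuming the reduce(op, xf, xs; init) method from Transducers.jl 0.4; the squaring is only a stand-in for a real computation):
using Transducers

# Because + is associative, reduce is free to split the range into
# chunks, fold each chunk independently (possibly on separate threads),
# and combine the partial results; foldl would do the same work
# strictly sequentially.
xs = 1:1_000_000
total = reduce(+, Map(x -> x^2), xs; init = 0)
@assert total == sum(x -> x^2, xs)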
Having said that, my impression is that what you are doing is hard to formulate as an associative operation, because it looks like you are “adding elements” to the set to be processed. It may be possible to do what you want by violating all the purity assumptions of transducers and reducing functions (i.e., a transducer or reducing function mutates the Learner object). The Learner object can then implement these (totally undocumented) internal interfaces to control “what to do next.” But this not only violates the purity assumption, it also relies heavily on the particular depth-first scheduling nature of Julia’s parallel runtime. So I’m not sure if I should even be mentioning this.
I think I have come up with a vague idea of how one could implement feedback loops with transducers. I am a bit stuck/overwhelmed writing a minimal working example. Perhaps we can get somewhere through the discussion here.
For a moment, let’s consider a single feedback loop as a transducer Loop. In the case I am interested in, my feedback Loop needs to be able to store a learner state; it evaluates a function f in the forward direction, and it combines the inputs coming into the transducer one by one with the feedback from the learner’s ask! method using an op(input,feedback) function. Thus the interface for initializing the Loop transducer would be:
Loop(f,op,x->Learner())
When evaluated on the input, it would ask for as many points as possible via the ask! method.
For the feedback to still be useful, one might want to limit the number of parallel jobs executing at any moment. One way to do that would be to limit the number of points being executed by checking the learner’s state in an ordinary while loop. Also, the ask! method could return some “reduced” state to finish the execution once a convergence criterion has been satisfied.
Is it possible to write such a transducer?
If it were possible to make such a Loop transducer, one could then combine them in series, as sketched below. By extending the Loop interface to accept a transducer xf instead of a function f, one could put loops inside loops. That could perhaps be useful for writing a “job scheduler”, since one could use the input as a way of telling which Loop the resources are given to.
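Purely as a hypothetical usage sketch (none of this exists yet, and f, g, op1, op2 are placeholder functions), the series composition might look like:
# Hypothetical sketch of the proposed interface; Loop is not implemented
# yet and f, g, op1, op2 are placeholders.
xf = Loop(f, op1, x -> Learner()) |> Loop(g, op2, x -> Learner())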
Sorry, it’s hard to tell how it works without an MWE. For example, I don’t know how Loop(f,op,x->Learner()) would work; what does x->Learner() do?
Just to clarify what I was talking about, I created two demos:
Strategy 1: Passing Learner to Loop
using Transducers
using Transducers: R_, xform, inner, next, complete, @next

struct Learner1
    points::Vector{Int}
end

struct Loop1 <: Transducer
    learner::Learner1
end

function Transducers.next(rf::R_{Loop1}, result, input)
    if isodd(input)
        y = input
    else
        y = input ÷ 2
        push!(xform(rf).learner.points, y)
    end
    return next(inner(rf), result, y)
end

function Transducers.__foldl__(rf, val, learner::Learner1)
    i = 1
    while i <= length(learner.points)
        val = @next(rf, val, learner.points[i])
        i += 1
    end
    return complete(rf, val)
end

learner = Learner1([2, 4, 8])
loop = Loop1(learner)
foreach(loop, learner) do x
    @show x learner.points
end
Strategy 2: Bounce message back using bottom reducing function
using Transducers
using Transducers: R_, xform, inner, next, complete, @next

struct Learner2
    points::Vector{Int}
end

struct Message
    payload
    next_points::Vector{Int}
end

repack(msg, payload) = Message(payload, msg.next_points)

struct Loop2 <: Transducer
    learner::Learner2
end

function Transducers.next(rf::R_{Loop2}, result, msg)
    x = msg.payload
    next_points = msg.next_points
    if isodd(x)
        y = x
    else
        y = x ÷ 2
        next_points = vcat(next_points, y)
    end
    return next(inner(rf), result, Message(y, next_points))
end

function Transducers.__foldl__(rf, val, learner::Learner2)
    i = 1
    while i <= length(learner.points)
        val = @next(rf, val, Message(learner.points[i], Int[]))
        val :: Message
        append!(learner.points, val.next_points)
        i += 1
    end
    return complete(rf, val)
end

learner = Learner2([2, 4, 8])
loop = Loop2(learner)
xf = loop |> Map() do msg
    @show msg.payload msg.next_points learner.points
    return msg
end
foldl(right, xf, learner)
It is important to note that both strategies “break” some of the rules of transducers to implement the feedback loop. Strategy 1 breaks the rule that transducers are pure; it mutates the Learner object. Strategy 2 breaks the rule that the reducible (= Learner2) should be agnostic about the accumulator; it unpacks val. But… is it bad to “break” the rules? I actually don’t know. Maybe it’s more like you can do extra stuff by adding structure to the framework (e.g., the accumulator is always of type Message).
It allows the executor of the transducer to decide where to initialise the Learner. It also decouples the Learner from the environment, so it could be considered pure (the output depends on the input plus some stochasticity from races between computations).
The transducer implementations look a little too cryptic to me. Perhaps I can manage to implement Loop(f,op,x->Learner()) as a closure over the function which one passes to map. I will try to write up the example by evening.
Unparallelized Loop
In the case where there is no parallel execution, I can write:
function Loop(f, op, learner)
    l = learner()
    function finner(input)
        feedinput = ask!(l)        # ask the learner for the next point
        x = op(input, feedinput)   # combine the external input with the feedback
        y = f(x)                   # evaluate the expensive function
        tell!(l, (x, y))           # feed the result back to the learner
        return (x, y)
    end
end
Then I could execute it with the map function:
map(Loop(f,op,x->AdaptiveLearner1D((0,1))),1:10)
where AdaptiveLearner1D could come from the package Adaptive.jl.
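For completeness, here is a rough, purely illustrative learner that satisfies the ask!/tell! protocol Loop assumes (none of these names come from Adaptive.jl or any other package):
# Hypothetical minimal learner implementing the ask!/tell! protocol that
# Loop assumes; purely illustrative.
mutable struct GridLearner
    queue::Vector{Float64}                  # points still to evaluate
    data::Vector{Tuple{Float64,Float64}}    # (x, y) pairs told back
end
GridLearner(points) = GridLearner(collect(Float64, points), Tuple{Float64,Float64}[])

# ask! hands out the next point, or nothing when there is no work left
ask!(l::GridLearner) = isempty(l.queue) ? nothing : popfirst!(l.queue)

# tell! records an evaluated (x, y) pair; a smarter learner would use it
# to decide which points to schedule next
tell!(l::GridLearner, xy) = push!(l.data, xy)
With such a learner, map(Loop(f, op, x -> GridLearner(0:0.1:1)), 1:10) should behave like the AdaptiveLearner1D version, just without any adaptivity.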
Parallelized Loop
To generalise the Loop given above, I introduce the concept of a work distributor, Master, which captures workers to do the work as it comes in:
using Distributed

mutable struct Master
    tasks
    results
    slaves
    unresolved
end

"""
Gives the slave a duty to follow orders of his new Master.
"""
function captureslave!(pid, f::Function, master::Master)
    tasks, results = master.tasks, master.results
    wp = @spawnat pid begin
        while true
            x = take!(tasks)
            if x === nothing            # nothing is the shutdown sentinel
                break
            else
                y = f(x)
                put!(results, (x, y))
            end
        end
    end
    push!(master.slaves, wp)
end
Master(tasks, results, slaves) = Master(tasks, results, slaves, 0)
Master(tasks, results) = Master(tasks, results, [], 0)

function Master(f::Function, wpool::AbstractWorkerPool)
    tasks = RemoteChannel(() -> Channel{Any}(10))
    results = RemoteChannel(() -> Channel{Tuple{Any,Any}}(10))
    master = Master(tasks, results)
    for p in wpool.workers
        captureslave!(p, f, master)
    end
    return master
end

Master(f::Function) = Master(f, WorkerPool(nprocs() == 1 ? [1] : workers()))
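Assuming the definitions above work as intended, a hypothetical smoke test could look like this (the last line just feeds each slave the nothing sentinel that breaks its loop):
# Hypothetical smoke test for Master; untested, like the rest.
using Distributed
addprocs(2)
@everywhere f(x) = x^2

m = Master(f)
put!(m.tasks, 3)
@show take!(m.results)                            # expected: (3, 9)
foreach(_ -> put!(m.tasks, nothing), m.slaves)    # release the workers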
Then I could write the parallel version of the Loop rather simply:
function LoopP(f, op, learner)
    l = learner()
    master = Master(f)
    function finner(input)
        # schedule as many points as there are free workers
        while master.unresolved < length(master.slaves)
            feedinput = ask!(l)
            if feedinput === nothing
                break
            else
                x = op(input, feedinput)
                put!(master.tasks, x)
                master.unresolved += 1
            end
        end
        if master.unresolved == 0
            return nothing
        else
            (x, y) = take!(master.results)   # results arrive as (x, y) pairs
            tell!(l, (x, y))
            master.unresolved -= 1
            return (x, y)
        end
    end
end
This could be executed with map in the same manner.
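For instance (with the same placeholder f and op as above, and untested like the rest):
map(LoopP(f, op, x -> AdaptiveLearner1D((0, 1))), 1:10)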
Thus, since it is written out as a function over which one maps, it seems it could also be generalised as a transducer. But is that possible?
EDIT: I made a few mistakes in LoopP which I have now fixed. The code is, however, not yet tested.
OK. Thanks for the MWEs. I’m now reasonably certain that the strategies I demonstrated above are the right approach if you want to use transducers. Probably strategy 1 (mutation-based) is easier. It should be fine as long as you don’t need to use it for very small models where “small-size optimization” (e.g., using StaticArrays) becomes important.
But, as you observed, the implementation based on reducibles and transducers is rather “cryptic.” That may be an indication that the combination of reducibles and transducers is not the right tool for your problem. Having said that, it may be useful if you want to fuse pre- and post-processing with the learning. It would be great if you could explore this area, because I’ve never tried. But I don’t want to blindly encourage this direction, as there is a chance it may not be as fruitful as you hope.
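For what it’s worth, a rough, untested sketch of what such fusion might look like, reusing Loop1 and learner from Strategy 1 above (preprocess and postprocess are placeholder functions):
# Untested sketch: sandwich the feedback transducer between ordinary
# pre- and post-processing steps; preprocess/postprocess are placeholders.
xf = Map(preprocess) |> Loop1(learner) |> Map(postprocess)
foreach(xf, learner) do x
    @show x
end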
I just finalised the design of the TaskMaster.jl package, which takes inputs from and puts outputs into channels. I thought it would be trivial to couple it with transducers as well, but it just halts. The relevant code:
using Distributed
addprocs(2)
using TaskMaster
using Transducers

@everywhere f(x) = x^2

master = ProcMaster(f)
learner = IgnorantLearner(1:10)
loop = Loop(master, learner)

@sync begin
    inch = Channel(Map(identity), 1:10)
    outch = Channel(1)
    @async evaluate(loop, inch, outch)
    for o in outch
        @show o
    end
    ### Gets stuck at the first element
    collect(Map(identity), outch)
end
Also, it seems easily possible to initialise a Master with a transducer instead of a function with channels. I am not sure how that would be useful. Maybe it would allow modelling electrical circuits.
That makes sense. I had added that for loop without thinking, for debugging purposes. It still just halts.
OK. So I found that the bug is on my side. The following code also halts:
inch = Channel(4)
put!(inch, 3)
put!(inch, 5)
put!(inch, 9)
put!(inch, 0)
#put!(inch, nothing)
outch = Channel(10)

@sync begin
    @async evaluate(loop, inch, outch)
    for o in outch
        @show o
    end
end
Putting nothing as the last element closes the channel in evaluate. Perhaps I only need to add a conditional that tests whether inch is closed, instead of checking the value of the last element.
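In other words, something along these lines (an illustrative sketch only, not the actual TaskMaster.jl code; process is a placeholder for the real per-item work):
# Illustrative sketch: iterate the input channel directly, so the loop
# ends when inch is closed and drained instead of relying on a nothing
# sentinel value.
function evaluate_sketch(loop, inch, outch)
    for x in inch                        # stops once inch is closed and empty
        put!(outch, process(loop, x))    # process is a placeholder
    end
    close(outch)
end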
And that indeed fixed the issue. Now I can couple Transducers with TaskMaster simply:
@everywhere f(x) = x^2
master = ProcMaster(f)
learner = IgnorantLearner(1:10)
loop = Loop(master, learner)

inch = Channel(Map(identity), 1:10)
outch = Channel(10)
@sync begin
    @async evaluate(loop, inch, outch)
    collect(Map(identity), outch)
end
It would be nice to have a |> syntax for that, though. How have you implemented the |> syntax in Transducers?
|> for Transducers means composition (I probably should have used ∘). So, if you want to sandwich a Loop with transducers, you need to turn Loop into a transducer. I think this would require one of the strategies I outlined above.
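For example, just to spell out what composition means here:
using Transducers

# |> composes transducers: double each element, then keep the even ones
xf = Map(x -> 2x) |> Filter(iseven)
collect(xf, 1:5)    # == [2, 4, 6, 8, 10]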
Note that it is quite easy to turn evaluate into an iterator transform:
evaluating(loop) = inch -> Channel() do outch
    evaluate(loop, inch, outch)
end

outch = evaluating(loop)(inch)
collect(outch)
# or equivalently
inch |> evaluating(loop) |> collect
Cool! The iterator transform allowed me to reduce the code by 50 lines and improved readability a lot. It seems, then, that iterator transforms are the way to go if one wants to couple TaskMaster.jl with Transducers.jl. The original problem is thus solved.
Note that iterator transforms are not transducers (they are related, though). But iterator transforms are also a powerful tool for this kind of task. For example, Query.jl provides a nice interface for creating and composing iterator transforms. So, if you expose the API as an iterator transform, your package will interoperate nicely with the Queryverse ecosystem.
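For example, with Query.jl's standalone operators, an iterator transform like evaluating(loop) should slot straight into a pipeline (a sketch, assuming the evaluating and loop defined above):
using Query

# Sketch: mix a Query.jl operator with the iterator transform above
1:10 |> @filter(iseven(_)) |> evaluating(loop) |> collect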