Lazy generators

Hello Julia Community,

I would like to translate a chunked file loader that uses generators into Julia. The idea is to implement it as lazily as possible, so that the costly file loading is only started and continued when requested (i.e. on take!). I have assembled an example (without the file loading) to describe the issue:

The Julia translation of the application looks like this:

println("A  ::use it");
# if(somecondition)
    for x in ch_unbuf
        println("A  ::got ",x);
        # if(someothercondition); break; end
    end
# end

for a simple “generator” g

N = 0;
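g = function (c::Channel)
    global N;  # assign to the global counter; without this, `N += 1` makes N local and fails with an UndefVarError
    println("  G::init!");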
    put!(c,N+=1);
    println("  G::calc!");
    put!(c,N+=1);
    println("  G::calc!");
    put!(c,N+=1);
    println("  G::end!");
    # close(c); # don't need this when using `bind(ch_unbuf,task)`
end

I would like to achieve the following program flow:

A  ::instanciate
A  ::use it
  G::init!
A  ::got 6
  G::calc!
A  ::got 7
  G::calc!
A  ::got 8
  G::end!

So the g task should only run (or continue) when requested by the first “user” code snippet.
Note how the “generator” acts more like a source that might not run at all, depending on somecondition, or might be interrupted by someothercondition.

I got this running with the following setup after a little fiddling around with it. Again: I am interested in the case where a take! on the channel should block the taking task and schedule the putting task “on the other side”.

So here is what I already have to make the above program flow happen (including the commented-out experiments):

println("A  ::instanciate");
N = 5;
# ch_unbuf = Channel(g); # ↯ this does instantly run g one time
ch_unbuf = Channel(0); # use an unbuffered `0` channel to trigger execution only when needed
# task = @task g(ch_unbuf); # ↯ creates a "runnable" task that won't run instantly, but does not run at all since the task is not scheduled and somehow the for-in loop does not dare to do so either
# task = schedule(@task g(ch_unbuf)); # creates a "queued" task that won't run instantly, but will (since it's scheduled) run at least once, even if the for-in loop does not iterate at all
task = @async g(ch_unbuf); # this is somewhat the same as the previous line
bind(ch_unbuf,task); # this automatically closes the channel when the task finishes execution (and therefore terminates the for … in … iteration)

There are some issues with the current approach:

It seems that creating the task as a “runnable” one, without scheduling it, and then iterating over the channel with for … in … would be the most elegant way. Unfortunately, for … in …, when waiting on the channel of that “runnable” task, won’t schedule it (and therefore nothing happens at all). Scheduling the task in the instantiation phase results in executing it at least once (at some point later on), even if the for … in … loop does not execute at all (because it might get skipped by somecondition).

So the only solution I could come up with is to schedule the task of the ch_unbuf channel exactly at the point where it is first used. This is easy to determine for this example, but in a more complex decision tree it becomes more involved.
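A minimal runnable sketch of this workaround, assuming Julia 1.x: the producer is created with @task (runnable but not queued), bound to an unbuffered channel, and only handed to schedule inside the branch that actually consumes it. The names produced, producer and somecondition are hypothetical stand-ins, not part of the original code.

```julia
produced = Int[]      # hypothetical log, only to show when the producer actually runs

producer = function (c::Channel)
    for x in 6:8
        push!(produced, x)
        put!(c, x)    # blocks until the consumer takes the value
    end
end

ch = Channel{Int}(0)          # unbuffered channel
task = @task producer(ch)     # created ("runnable"), but not queued yet
bind(ch, task)                # close `ch` once the task finishes

somecondition = true          # hypothetical stand-in for the real decision
if somecondition
    schedule(task)            # queue the producer only on the branch that uses it
    for x in ch
        println("A  ::got ", x)
    end
end
```

If somecondition is false, schedule is never called, so none of the producer's code runs.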

• Is there a better approach to this?
• Shouldn’t a for … in … loop be able to schedule the task (I assume it is possible to find the one task to whose lifetime the channel is bound, and schedule it if it is only “runnable” and not yet “queued”)?
• Or what am I missing here conceptually?
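One possible answer to the first two questions is to wrap the channel and its unscheduled task in a small iterable type whose first iterate call does the scheduling. LazyChannel below is a hypothetical helper written for this sketch, not an existing Base or package API; it relies only on Channel, @task, bind, schedule and the built-in iterate method for channels.

```julia
# Hypothetical helper (not a Base API): an unscheduled producer task plus the
# unbuffered channel it writes to. The task is scheduled on the first `iterate`.
mutable struct LazyChannel{T}
    ch::Channel{T}
    task::Task
    started::Bool
end

function LazyChannel{T}(producer::Function) where {T}
    ch = Channel{T}(0)            # unbuffered: each put! waits for a matching take!
    task = @task producer(ch)     # runnable, but not queued
    bind(ch, task)                # close `ch` when the producer finishes
    return LazyChannel{T}(ch, task, false)
end

function Base.iterate(lc::LazyChannel, state=nothing)
    if !lc.started                # first request: queue the producer now
        lc.started = true
        schedule(lc.task)
    end
    return iterate(lc.ch, state)  # delegate to the channel's own iteration
end

Base.IteratorSize(::Type{<:LazyChannel}) = Base.SizeUnknown()
Base.eltype(::Type{LazyChannel{T}}) where {T} = T

# Usage: nothing inside the do-block runs until the loop asks for a value.
lc = LazyChannel{Int}() do c
    println("  G::init!")
    put!(c, 6)
    println("  G::calc!")
    put!(c, 7)
    println("  G::end!")
end

for x in lc
    println("A  ::got ", x)
end
```

If a loop over a LazyChannel exits early via break, the producer stays blocked in its pending put!; calling close(lc.ch) should make that put! throw, which ends the task.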

regards,
Christian

How about implementing an iterable type as below? You could manually call iterate if you do not want to process data in a loop.


struct WorkGenerator
end

function generate_data(i)
    println("I generated data $i")
    i^2
end

function Base.iterate(wg::WorkGenerator, state=1)
    if state == 10
        return nothing
    end
    data = generate_data(state)
    data, state+1
end

for data in WorkGenerator()
    println("I got data $data")
end

# Output:
I generated data 1
I got data 1
I generated data 2
I got data 4
I generated data 3
I got data 9
I generated data 4
I got data 16
I generated data 5
I got data 25
I generated data 6
I got data 36
...

A package that implements a resumable-function pattern is

I once wrote a lazy JSON parser that kept the parser state in a separate task and used yieldto to explicitly pass control to that task as needed.

It worked, but the task switching overhead was a huge performance hit. In the end I reimplemented it using the iteration interface (as @baggepinnen suggested).

@baggepinnen Thank you very much for the working iterator example!

Yes, I am torn between explicitly managing all the state manually, for the advantage of the simpler iterator interface (which you just taught me), and the power of a Julia coroutine doing all this transparently (hopefully).

Or is it possible to avoid passing this state around all the time, e.g. in

it = WorkGenerator();
data, state = iterate(it);
println("I got data $data");
data, state = iterate(it,state);
println("I got data $data");
data, state = iterate(it,state);
println("I got data $data");

for data in it # ↯ starts again from 1 … how to continue at `state`?
  println("I got data $data")
  # if(someothercondition); break; end
end

?
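For the question above, Base provides Iterators.rest(iter, state), which iterates iter starting from a saved state instead of from the beginning. A minimal sketch, using a simplified WorkGenerator that yields i^2 without printing (the IteratorSize and eltype definitions are additions so that collect works):

```julia
struct WorkGenerator end

# Simplified version of the generator above: yields 1^2, 2^2, ..., 9^2.
Base.iterate(::WorkGenerator, state=1) = state == 10 ? nothing : (state^2, state + 1)
Base.IteratorSize(::Type{WorkGenerator}) = Base.SizeUnknown()   # no `length` method
Base.eltype(::Type{WorkGenerator}) = Int

it = WorkGenerator()
data, state = iterate(it)          # data == 1
data, state = iterate(it, state)   # data == 4

# Resume at `state` rather than restarting from 1.
for x in Iterators.rest(it, state)
    println("I got data $x")
    x >= 25 && break               # stand-in for someothercondition
end

# `state` is a plain Int, so a new `rest` starts from the same point again.
remaining = collect(Iterators.rest(it, state))
```

Because the state lives in an immutable value rather than in the iterator, every call to Iterators.rest with the same state replays the same tail; a wrapper such as Iterators.Stateful is needed if each loop should pick up where the previous one stopped.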

I think this should not be an issue for the file-chunk application, since each task processes some tens of megabytes of data, so there should be few task switches relative to the amount of data. But yes, this is the most likely redesign to keep in mind. I’ll have a look at that code on the weekend.

@samoconnor do you have any insight into how costly this channel-based instant-scheduling mechanism is compared to an iterator? I.e. how does a yieldto there and a yieldto back perform compared to a plain, non-inlined function call?

regards,
Christian

how costly it is to use this channel-based instant-scheduling mechanism in comparison to an iterator

@christianl I can’t really quantify this. In my case I was parsing a few bytes of JSON at a time and the overhead was a complete show stopper. In many cases the compiler can completely inline an iterator method, so it’s hard to beat that. However, as you say, if you’re processing big chunks of data between each task switch, the overhead may be unimportant in your case.

One important insight I gained while working on this stuff concerns Julia’s ability to return multiple values from a function.

Writing stateful data-stream processors in C, you tend to have to store all the state variables away in a struct, and that gets messy, so you end up wanting threads so that you can have a simpler readable function that just uses local variables for state.

In Julia, you can return multiple values from a function and the compiler will still do good optimisation (inlining, passing values in registers, etc). So in my code I have a pattern where the parsing functions take (s, i, c) and return (s, i, c): a string, an index and the current character. The code ends up being terse and easy to read because there is no struct unpacking/packing at the beginning and end of each function, and performance is good because there are no mutable structs, no boxing of values and no passing by reference.
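A small sketch of that pattern, assuming ASCII input: each helper takes the triple (s, i, c) and returns an updated triple (preceded by a value where one is produced), so no state struct is needed. The function names and the use of '\0' as an end-of-input marker are illustrative choices, not taken from the original parser.

```julia
# Parser state is the triple (s, i, c):
#   s = input string, i = current index, c = current character.
# '\0' marks the end of input (a convention of this sketch).

# Advance one character and return the updated triple.
function next_char(s::String, i::Int, c::Char)
    i = nextind(s, i)
    c = i <= ncodeunits(s) ? s[i] : '\0'
    return s, i, c
end

function skip_spaces(s, i, c)
    while c == ' '
        s, i, c = next_char(s, i, c)
    end
    return s, i, c
end

# Returns the parsed value followed by the updated triple.
function parse_int(s, i, c)
    n = 0
    while isdigit(c)
        n = 10n + (c - '0')
        s, i, c = next_char(s, i, c)
    end
    return n, s, i, c
end

function parse_ints(s::String)
    out = Int[]
    isempty(s) && return out
    i = firstindex(s)
    c = s[i]
    s, i, c = skip_spaces(s, i, c)
    while isdigit(c)
        n, s, i, c = parse_int(s, i, c)
        push!(out, n)
        s, i, c = skip_spaces(s, i, c)
    end
    return out
end

parse_ints("12 345 6")   # returns [12, 345, 6]
```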

You could use a Stateful iterator:

julia> it = Iterators.Stateful(1:10)
Base.Iterators.Stateful{UnitRange{Int64},Union{Nothing, Tuple{Int64,Int64}}}(1:10, (1, 1), 0)

julia> iterate(it)
(1, nothing)

julia> iterate(it)
(2, nothing)

julia> iterate(it)
(3, nothing)
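Applied to the WorkGenerator from earlier in the thread, a Stateful wrapper remembers how far it got, so a second loop (or collect) continues where a loop that hit break stopped. A sketch, again with a simplified, non-printing WorkGenerator (the IteratorSize and eltype definitions are additions so that collect works):

```julia
struct WorkGenerator end
Base.iterate(::WorkGenerator, state=1) = state == 10 ? nothing : (state^2, state + 1)
Base.IteratorSize(::Type{WorkGenerator}) = Base.SizeUnknown()
Base.eltype(::Type{WorkGenerator}) = Int

it = Iterators.Stateful(WorkGenerator())

seen = Int[]
for x in it
    push!(seen, x)
    x >= 9 && break            # stand-in for someothercondition
end

# Continues after 9 instead of restarting at 1.
remaining = collect(it)
```

As the `(1, 1)` field in the REPL output above suggests, Stateful may compute the next element ahead of the consumer in some Julia versions; for a costly, side-effecting producer such as a file loader, that lookahead could trigger one load earlier than requested.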