Proposed overhaul of DataStreams

The DataStreams package is a great idea, and I’ve very much embraced it, but I think most agree it’s time for a major overhaul as it’s confusing to users and doesn’t cover enough use cases. I’ve written my ideas out in my Estuaries.jl package (which is for providing a DataTables-like interface for anything implementing DataStreams) here. I’ve copied and pasted below, for convenience. I volunteer to provide lots of help for changing DataStreams.jl itself, as well as any packages implementing it.

Hopefully this will get a conversation going about how we can make DataStreams as useful as possible.

Proposed Overhaul of the DataStreams Interface

At the risk of getting too broad, let me try to describe the problems that I think DataStreams is trying to solve (of course, this is just my opinion and may not be universally agreed upon). There are at least three major problems which need to be addressed when performing data manipulation:

  1. Transformation and Integration: This includes joining, grouping, and mapping, and is the only one of these three problems that is usually addressed. It is (more or less) solved by the various implementations of dataframes and databases. It is intended that this sort of work is largely done by the time we get to DataStreams (though DataStreams already allows simple one-to-one mappings).
  2. The API Problem: Different data sources have a bewildering variety of APIs. Having a uniform interface for all of them is really hard.
  3. The ABI Problem: You would think the problem of binary formats would be solved by having APIs, but this isn't quite true, because the optimal way of calling those APIs depends on the underlying binary format. (This is largely just the distinction between row-wise and column-wise data.)

I think DataStreams should attempt to solve 2 and 3 by providing a way to create a uniform interface for any type of data source/sink by writing a bare-minimum interface (well under 100 lines of code in most cases). It should take as little time and effort as possible to implement DataStreams for a new tabular data source/sink; I believe that was the original goal. I think the special properties of Julia (speed, multiple dispatch/overloading, metaprogramming) present us with a unique opportunity to do this well.

I’ll list only things that I think should be changed rather than describing the whole existing API.

Overhaul of Data.streamfrom

At least one of the following methods should be defined for every source:

  • Data.streamfrom(src, row, col). Typing is handled by the schemas; see below.
  • Data.streamfrom(src, row::AbstractVector, col). This allows column-based, or partial-column-based, streaming.
  • Data.streamfrom(src, col). This allows column-based streaming, but only of whole columns.

I propose that we eliminate Data.Column and Data.Field entirely, and instead use Julia's built-in introspection functions to determine which of the above methods is defined for the source (Data.StreamType can be used for something else; see below). This cuts down on how much code the user needs to write. For the time being, row and col should be Integers (or, in the case of row, an AbstractVector{<:Integer}). Eventually we should try to address non-integer indexing schemes, so we won't assert this, but let's put that on the back burner. A sketch of what a minimal source could look like under this proposal is given below.

Note that the user can't define Data.streamfrom(src, row, col::AbstractVector). There would be little advantage to this given Julia's in-memory data layout. Eventually we could allow it to be defined, possibly using NamedTuples, but it doesn't seem like a priority.
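
To make this concrete, here is a minimal sketch of what a source might look like under this proposal. Everything below is hypothetical: MyTableSource is an invented type and these Data.streamfrom signatures are the proposed ones, not the current API.

    # Hypothetical sketch of the proposed source interface (not the current API).
    import DataStreams: Data

    # An invented in-memory columnar source, for illustration only.
    struct MyTableSource
        columns::Vector{Vector}   # one Vector per column
    end

    # Field-at-a-time access: proposed Data.streamfrom(src, row, col)
    Data.streamfrom(src::MyTableSource, row::Integer, col::Integer) = src.columns[col][row]

    # Partial-column access: proposed Data.streamfrom(src, row::AbstractVector, col)
    Data.streamfrom(src::MyTableSource, rows::AbstractVector{<:Integer}, col::Integer) = src.columns[col][rows]

    # Whole-column access: proposed Data.streamfrom(src, col)
    Data.streamfrom(src::MyTableSource, col::Integer) = src.columns[col]

    # The framework could then discover which of these methods a source defines, e.g.
    # method_exists(Data.streamfrom, Tuple{MyTableSource, Integer, Integer}),
    # rather than requiring the source to declare Data.Field or Data.Column.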

Typing

Instead of explicitly passing type parameters to the above methods, the appropriate types will be inferred from the schema. For columns, by default it is assumed that a Vector{T} is returned, unless the element type is Nullable{T}, in which case a NullableVector{T} is assumed. After 1.0, the types in the schema should be Union types where nulls are present, and we can then always assume Vector{T}.

To override these assumptions, we allow the user to optionally define:

  • Data.vectortype(src, ::Type{T}). This would return the container type used for a column with element type T.
  • Data.vectortype(src, col). This allows users to specify special vector types for specific columns. It needn't be defined, but if it is, it overrides the above (see the sketch below).
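
For example (hypothetical: Data.vectortype does not exist yet, so the methods are written unqualified here; NullableVector comes from NullableArrays.jl and CategoricalArray, from CategoricalArrays.jl, is just an example of a special column type):

    using NullableArrays, CategoricalArrays

    # Defaults that DataStreams itself could provide (shown unqualified; they would
    # really be Data.vectortype methods):
    vectortype(src, ::Type{T}) where {T} = Vector{T}
    vectortype(src, ::Type{Nullable{T}}) where {T} = NullableVector{T}

    # A per-column override a source could define, which would take precedence over
    # the element-type rule above; here the invented MyTableSource pools its string columns:
    function vectortype(src::MyTableSource, col::Integer)
        T = eltype(src.columns[col])
        return T == String ? CategoricalArray{String,1} : Vector{T}
    end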

Repurposed Data.StreamType

The source should define

  • Data.streamtype(src). This would return either Data.Column or Data.Row, defaulting to Data.Column. See below.

If Data.Column, streaming occurs one column at a time using Data.streamfrom(src, col), or Data.streamfrom(src, row::AbstractVector, col) if available. If Data.Row, streaming occurs one row at a time using Data.streamfrom(src, row, col). The reason for this is that some data sources (e.g. some SQL interfaces) are serialized by row, or at least are easier to access row by row (whether that is an artifact of the API, I'm not always sure).
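
To spell out the intent, Data.stream! could pick its loop order from Data.streamtype(src) roughly like this (a sketch of the proposal, not the current implementation; Data.Row and the one-argument streamtype are proposed, and the signatures are the proposed ones from above):

    # Sketch of the proposed dispatch on Data.streamtype.
    function stream_sketch!(src, snk, nrows::Int, ncols::Int)
        if Data.streamtype(src) == Data.Column
            for col in 1:ncols                         # columns on the outside
                Data.streamto!(snk, col, Data.streamfrom(src, col))
            end
        else                                           # Data.Row
            for row in 1:nrows, col in 1:ncols         # rows on the outside
                Data.streamto!(snk, row, col, Data.streamfrom(src, row, col))
            end
        end
        return snk
    end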

Batching

It should be possible to stream in batches, because depending on the nature of the source and sink, data may be held in memory while streaming, which becomes a problem if it is too big for memory. This is only an issue if Data.streamfrom internally involves some sort of buffering. I haven't thought this through yet, but I think it should involve the optional declaration of something like Data.batchsize(src). We should probably implement the above before working on batching, but we should keep in mind that we ultimately want to support it.
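
As a strawman (nothing about batching is settled; batchsize is written unqualified since Data.batchsize does not exist):

    # A source could advertise how many rows it is willing to buffer at once;
    # a fallback of typemax(Int) would mean "no batching necessary".
    batchsize(src) = typemax(Int)              # default DataStreams could provide
    batchsize(src::MyTableSource) = 100_000    # invented override for the hypothetical source above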

Overhaul of Data.streamto!

This one is harder. Here's my stab at it (again, typing is handled by schemas; see below). Users should define at least one of the following:

  • Data.streamto!(snk, row, col, val)
  • Data.streamto!(snk, row::AbstractVector, col, val::AbstractVector), where one should have length(val) == length(row).
  • Data.streamto!(snk, col, val::AbstractVector), where one should have length(val) == size(snk, 1).

These are analogous to Data.streamfrom.
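
A sink sketch mirroring the source one above (again, hypothetical signatures and an invented MyTableSink type):

    # Hypothetical sketch of the proposed sink interface.
    struct MyTableSink
        columns::Vector{Vector}   # pre-allocated, one Vector per column
    end

    # Field-at-a-time:
    Data.streamto!(snk::MyTableSink, row::Integer, col::Integer, val) =
        (snk.columns[col][row] = val)

    # Partial column (length(val) == length(rows)):
    Data.streamto!(snk::MyTableSink, rows::AbstractVector{<:Integer}, col::Integer, val::AbstractVector) =
        (snk.columns[col][rows] = val)

    # Whole column (length(val) == number of rows in the sink):
    Data.streamto!(snk::MyTableSink, col::Integer, val::AbstractVector) =
        (snk.columns[col] = collect(val))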

Typing

When streaming one field at a time, convert(Tsnk, val) is called for each element inside Data.stream!, where Tsnk is the element type of the sink column and val is the output of Data.streamfrom. If this conversion would fail, users are required to compensate using the transforms dictionary (DataStreams won't make any further guesses about how to achieve the conversion).

When streaming one column (or partial column) at a time, a single convert(Data.vectortype(snk, Tsnk), val) is called inside Data.stream!, where Tsnk is the element type of the sink column and val is the vector output by Data.streamfrom. As with sources, Data.vectortype(snk, Tsnk) will default to Vector{Tsnk} or NullableVector{Tsnk}.
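
In other words, the conversion the framework performs would look like this (illustrative values only, using the proposed signatures):

    # Field-at-a-time: source yields an Int, sink column has element type Float64.
    val = Data.streamfrom(src, row, col)                  # e.g. 3
    Data.streamto!(snk, row, col, convert(Float64, val))  # stores 3.0

    # Column-at-a-time: source yields a Vector{Int}, sink container is Vector{Float64}.
    vals = Data.streamfrom(src, col)                      # e.g. [1, 2, 3]
    Data.streamto!(snk, col, convert(Vector{Float64}, vals))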

Data.StreamType

In most cases it probably makes sense to use the streaming type of the source. In cases where we want the sink to have a different StreamType from the source, I suppose we could impose some sort of buffering scheme, but I think we should put that on the back burner.

Batching

Again, for now let's inherit any batching behavior from the source; eventually we can implement some sort of buffering scheme.

Conclusion

Alright, that's my attempt. Everything I didn't mention here I'd keep pretty much the same. The three really crucial pieces of this (in my mind) are that we

  1. allow partial-column streaming,
  2. allow Data.stream! to cycle through either rows first or columns first, and
  3. simplify the (currently very confusing) typing situation for end users.

I’m sure those who wrote DataStreams in the first place will have lots of ideas about where this goes wrong (or perhaps it would go so wrong they’ll reject it
outright).

Update: I've spent some time thinking about this and working out a prototype. There seem to be two new challenges with doing things this way that were not originally a problem with DataStreams:

  1. One has to get data from a source into a sink that may accept data in a completely different way than the source provides it. For example, we might have a source that streams entire columns at a time feeding a sink that accepts N rows at a time. I don't see any way around this other than batching/buffering. This seems a bit beyond the original scope of DataStreams, but it seems important if DataStreams is to work at large scale.

  2. All the inference I mentioned might introduce some type instability. This can probably be resolved, but it may be difficult. It won't be clear whether this is a problem until item 1 is sorted out more.


Thanks for the write-up @ExpandingMan. Here are a couple thoughts in response:

DataStream Aims

From the beginning, the aims of DataStreams have included:

  • Source/Sink decoupling: this solves the combinatorial problem of introducing a new source/sink into the ecosystem and having to worry about compatibility with other formats; a new format should be able to implement the source and/or sink interfaces separately and automatically be able to stream to/from any other format. The 1st iteration has accomplished this goal.
  • Performance: we also want this framework to be performant; users expect things in Julia to be fast and there's no reason they shouldn't be. There's inevitably going to be slight overhead from utilizing a completely decoupled framework, but my goal has always been to keep that overhead to a minimum (< 5% of total runtime). The 1st iteration has also accomplished this goal.
  • Functionality with simplicity: we also want the framework to provide flexibility and power in how we stream data (including doing transformations, partial row filtering like you've mentioned, etc.). We also want the framework to be dead simple, as you said: a new format should be able to quickly implement the interfaces. This point is where the 1st iteration has fallen short; some partial functionality has been provided, but as you've mentioned, the simplicity of the framework leaves something to be desired.

Next Steps

I’m currently working on updates to the framework that will hopefully address this 3rd point without sacrificing the first two points. In particular, I’d like to simplify what a Data.Schema is and how to make/generate one, the required Data.Sink constructors, as well as the Data.streamfrom/Data.streamto! functions you’ve already mentioned. A few notes on the updates I’m considering:

  • We still have to include type information as we stream to/from; as you mentioned in your "Update", I think you've already discovered that just having a Data.streamfrom(src, row, col) isn't going to work; we need to pass some type information around so that subsequent functions can remain type stable.
  • It's also unfortunate that you seem to not understand the various Data.StreamTypes; admittedly, they're a bit hard to name, and they fulfill slightly different roles for sources vs. sinks. For a Source, the Data.StreamType indicates the data access pattern that the Source supports, recognizing that the data WILL ALWAYS be accessed column-by-column. Data.Field means that only a single row can be accessed at a time (again, column-by-column), whereas Data.Column means that the Source can support streaming an entire batch of rows at a time (again, for a single column at a time, column-by-column). This is most evident when we compare a CSV.Source vs. a DataFrame. For a CSV.Source, it's an inherent limitation of the format that you cannot stream more than one row at a time; when reading a csv file, you must read sequential lines out of the file, one row at a time, reading each column for each row. For a DataFrame, however, you could use the same access pattern as a csv file, accessing each cell individually, starting with row 1 and accessing columns 1, 2, 3, …N. You could also access an entire batch of rows, 1:M, and provide a vector of values for each of columns 1, 2, 3, …N. A DataFrame, then, is more flexible in the types of streaming it supports, while csv is inherently limited due to the row-wise focus of its format.
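
To illustrate the two access patterns concretely on a DataFrame (this is ordinary DataFrames indexing; Data.Field and Data.Column are the existing stream types described above):

    using DataFrames

    df = DataFrame(a = 1:4, b = ["w", "x", "y", "z"])

    # Data.Field-style access: one cell at a time, row by row, column by column.
    for row in 1:size(df, 1), col in 1:size(df, 2)
        val = df[row, col]          # e.g. df[1, 1] == 1, then df[1, 2] == "w", ...
    end

    # Data.Column-style access: a whole batch of rows for one column at a time.
    rows = 1:2
    for col in 1:size(df, 2)
        vals = df[rows, col]        # e.g. df[1:2, 1] == [1, 2]
    end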

As a final note, I'm also currently experimenting with using an alternative NULL value representation in the form of Union{T, Null} and Vector{Union{T, Null}}. Core Julia developers have indicated that this representation has many possible optimizations that can be implemented in Base Julia soon, and it could provide long-term benefits for data analysis compared with the Nullable{T} approach, which incurs the cost of wrapping every value. You can see current progress for DataStreams.jl, CSV.jl, and DataFrames.jl by cloning each repo and doing Pkg.checkout(pkg, "jq/gangy") for each.
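
For anyone following along, the difference between the two representations is roughly this (a self-contained illustration; the real Null type lives in a separate package and is defined inline here only so the snippet stands alone):

    # Nullable approach (Base Nullable on Julia 0.6): every value is wrapped.
    nullable_col = [Nullable(1.0), Nullable{Float64}(), Nullable(3.0)]   # Vector{Nullable{Float64}}

    # Union approach: values are stored unwrapped; missing entries are a `null` singleton.
    struct Null end            # stand-in for the package-provided Null type
    const null = Null()
    union_col = Union{Float64, Null}[1.0, null, 3.0]                     # Vector{Union{Float64, Null}}

    # Consuming code can branch on the element itself instead of unwrapping:
    total = sum(x for x in union_col if !(x isa Null))                   # == 4.0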

Hi @quinnj, thanks for taking the time to read.

DataStreams Aims: I hear you loud and clear and these were always things I considered to be goals of the package. (Except to say, if it’s the case that we need to sacrifice a bit of performance on small datasets in order to gain some on large datasets, I think we should do that.)

Next Steps: (addressed by bullet point)

  • Yes, I'm definitely seeing that it is more difficult than it would at first seem to have Data.streamfrom(src, row, col) without passing around any sort of type information. I'm still not totally convinced, however, that this is something the user has to deal with by default. In the vast majority of cases I'd think that Vector types would be adequate, especially if the miraculous-sounding Union optimizations that have been advertised for 1.0 actually happen. Note that even in the current implementation, Data.stream! gets type arguments from the schema, so I'd think we could do streamfrom(src, row, col) just as well. I think it should also be made clear to users exactly when and how conversions take place, if at all.

  • Thanks a lot for this explanation. Indeed, I was very confused about this before. Judging from comments I suspect I wasn’t the only one. At the very least, an explanation like the one you just gave should go into the documentation. I think part of what was so confusing about this to me is that it is still possible to do column based streaming one field at a time, so the label Data.Field misled me.

I still think the following needs to be addressed however:

Batching: If I'm understanding you correctly, one is currently required to stream entire columns whenever one does Data.Column-based streaming. It needs to be possible for users to access a range of rows (or even just a single field) in the Data.Column case. Users should also be able to do the streaming in batches in case the dataset is too large to fit in memory. Any suggestion on what those methods should look like? Perhaps Data.streamfrom(src, ::Type{Data.Column}, ::Type{T}, row, col) where T <: AbstractVector and row is optionally an AbstractVector? It seems that Data.stream! would also need some sort of batch_size argument; see the sketch below.
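
Something along these lines, perhaps (entirely hypothetical; the extra row argument and the batch_size keyword do not exist in the current API, and MyTableSource is the invented type from my earlier sketch):

    # Hypothetical partial-column method: return rows `rows` of column `col`, converted to `T`.
    function Data.streamfrom(src::MyTableSource, ::Type{Data.Column}, ::Type{T},
                             rows::AbstractVector{<:Integer}, col::Integer) where {T <: AbstractVector}
        return convert(T, src.columns[col][rows])
    end

    # Data.stream! could then walk a large source in fixed-size batches, e.g.
    # Data.stream!(src, SomeSink, batch_size = 100_000)   # hypothetical keyword argument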

Finally, I'm a bit confused about what happens if, for instance, the source is Data.Column but the sink is Data.Field. Could you provide an overview?

@quinnj, I’m going to write an update of Feather.jl to work with your new DataStreams implementation. My plan is as follows:

  • Data.Batch streaming: this will stream entire columns, as it already does.
  • Data.Row streaming: by default this will stream one field at a time. If row isa AbstractVector, it will attempt to stream a contiguous set of fields from a column. (A sketch follows this list.)
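
Roughly like this (a sketch only: Data.Batch and Data.Row are the stream types proposed above, and read_column stands in for however Feather actually materializes a column; a real implementation would cache the column rather than re-reading it per field):

    # Sketch of the planned Feather source methods (hypothetical signatures).

    # Data.Batch: Feather is columnar, so handing back the entire column is cheap.
    Data.streamfrom(src::Feather.Source, ::Type{Data.Batch}, ::Type{T}, col::Integer) where {T} =
        read_column(src, col)::T

    # Data.Row: one field at a time by default...
    Data.streamfrom(src::Feather.Source, ::Type{Data.Row}, ::Type{T}, row::Integer, col::Integer) where {T} =
        read_column(src, col)[row]

    # ...or a contiguous run of fields from one column when `row isa AbstractVector`.
    Data.streamfrom(src::Feather.Source, ::Type{Data.Row}, ::Type{T}, rows::AbstractVector{<:Integer}, col::Integer) where {T} =
        read_column(src, col)[rows]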

Any thoughts?


So I've been thinking about partial column filtering, and I wonder if we should take a non-DataStreams approach. The problem with trying to include it in DataStreams is that you then have to somewhat assume that every Data.Source that supports Data.Column streaming can also support partial-column streaming. For Feather or DataTables/Frames, it's not really that big of a deal because you only ever need to stream each column once, but for ODBC, columns of very large resultsets require continuing to iterate through them until the resultset is exhausted. So even though ODBC supports streaming a column at a time, it might not be the final, entire column; in that case, what does rows::AbstractVector really mean for an ODBC result? It doesn't make sense to assume you want to filter the same row indices for each set of result columns.

Two different thoughts I've had on this:

  • Allow the user to include a "filter" function when streaming: this would apply equally to Data.Field and Data.Column streaming, but the user would pass in slightly different kinds of functions. For Data.Field, the user could pass a function that takes an entire row (tuple) as input and returns true or false to indicate whether the row should be included in the final output. For Data.Column, the function could take a single column as input and return a boolean vector indicating the rows that should be included in the final output. (See the sketch after this list.)
  • Alternatively, I think this is probably most easily implemented by individual Data.Sink types; i.e. a Feather.Sink could take a limit, a filter function, or a rows::AbstractVector argument that would be applied when streaming. That way the sink itself can implement the partial-column streaming (and probably more efficiently).
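
The two kinds of filter functions might look like this (illustrative only; how they would be passed to Data.stream! is left open):

    # Data.Field-style filter: receives one row as a tuple, returns a Bool.
    row_filter(row::Tuple) = row[2] > 0.5                 # keep rows whose 2nd field exceeds 0.5

    # Data.Column-style filter: receives one column, returns a Vector{Bool} mask.
    col_filter(col::AbstractVector) = map(x -> x > 0.5, col)

    # row_filter((1, 0.7, "a"))   == true
    # col_filter([0.1, 0.7, 0.9]) == [false, true, true]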

I'm confused about something: why not allow only Data.Field streaming for ODBC (and the like)? There's no benefit to having Data.streamfrom(src, Data.Batch, ...) for something like that. You could then implement column-wise streaming (by default) on columnar datasets that implement Data.Batch (and optionally Data.Field), and row-wise streaming on datasets that only implement Data.Field, and still allow for partial-column streaming (in the Data.Batch case).

Indeed, I have found the way ODBC and other SQL APIs work to be extremely frustrating, and they really screw up schemes like DataStreams. They just absolutely were not designed to accommodate anything other than pulling all (and only) the results of specific queries passed as strings of valid SQL code. They really are spectacularly terrible. I wonder if there are better alternatives for Postgres, but that's a moot point if not all of your databases are Postgres.

I think your filtering idea is a good one, but it would be nice if we could keep DataStreams simple and let the packages it plugs into handle that type of filtering. Incidentally, something similar was one component of a package I started a while back to try to easily stream data (in batches) into machine-ingestible formats (something I've still yet to achieve).

There absolutely is a benefit to allowing ODBC to stream columns (the current Data.Column); if I remember correctly, you were perhaps dealing with a DB vendor that didn't implement their ODBC driver correctly and so were never able to stream more than one row at a time anyway, but that is the exception: most database ODBC drivers support batch streaming of columns and thus benefit greatly over doing requests one field (really one row) at a time. The problem is that in almost all cases, this "fetching" of results from the DB incurs high network costs, so any way to reduce those costs can greatly help performance, i.e. making a single network request for an entire batch of data instead of one request per row.

But I think we're in agreement: it makes the most sense to have individual sinks implement partial row filtering for now, if they support it, and we can revisit this later if there's a way to make DataStreams support it generically.

Hm… that doesn't quite jibe with my experience trying to figure out ODBC for MSSQL and Postgres. The documentation isn't exactly spectacular, though, so it's quite possible I was just confused.

Let me describe the type of use case I’m really trying to get at:

Suppose I have a ~200 GB source (for concreteness, let's say it's a Feather file) and I want to pull ~500 MB batches of it into memory at a time. Certainly one can do this any time Data.Field streaming is supported; however, this will be inefficient because it requires computing each individual pointer. In practice, I have found that calculating individual pointers in Feather is far faster than any method of pulling data from something like SQL, as should be expected. So perhaps it's not really an issue right now; we can just use Data.Field streaming for this type of use case.
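
The usage I'm picturing is roughly this (entirely hypothetical: the rows keyword on Data.stream! does not exist, the file name and row counts are made up, and the loop only sketches the intent):

    # Hypothetical batched read of a huge Feather file, `batch_rows` rows per batch.
    src        = Feather.Source("huge_file.feather")
    total_rows = 400_000_000          # made-up total row count (known from the source's schema)
    batch_rows = 2_000_000            # tuned so one batch is roughly 500 MB

    for start in 1:batch_rows:total_rows
        rows = start:min(start + batch_rows - 1, total_rows)
        df   = Data.stream!(src, DataFrame; rows = rows)   # `rows` keyword is the proposed addition
        # ... process `df`, then drop it before requesting the next batch ...
    end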

The only alternative I can think of right now would be to have a 3rd Data.StreamType, which certainly does not seem ideal. I’ll post here if I think of anything better.

The other thing we haven't really discussed yet: if a source or sink only defines Data.Field-based streaming, how does one know whether this should be done row-wise or column-wise?