Help with DataStreams toy problem

I am trying to learn my way around DataStreams. I thought I would implement a simple source which just gives random values (code below). Yet when I write a CSV from it, the file only contains the header. What am I doing wrong? This is on Julia 0.6, with the released versions of both packages.

using DataStreams
using CSV

struct RandSource <: Data.Source
    schema::Data.Schema{true} # number of rows are known from the schema
end

randfield(T) = rand(T)
randfield(String) = fill(rand('A':'Z'), rand(1:3))

Data.schema(rs::RandSource) = rs.schema
Data.schema(rs::RandSource, ::Type{Data.Field}) = rs.schema
Data.isdone(rs::RandSource, row, col) = row ≤ size(rs.schema, 1)
Data.streamtype(::Type{RandSource}, ::Type{Data.Field}) = true

function Data.streamfrom{T}(rs::RandSource, ::Type{Data.Field}, ::Type{T}, row, col)
    randfield(rs.schema.types[col])
end

s = Data.Schema(["A","B","C"], [Int,Int,Int], 100)

rs = RandSource(s)

size(rs)                        # (100,3), from the schema

CSV.write("/tmp/test.csv", rs)  # file only has a header

You should do

Data.isdone(rs::RandSource, row, col) = row > size(rs.schema, 1)

(With ≤, isdone reports the source exhausted at the very first row, so the sink writes the header and stops; it should only return true once streaming is past the last row.)

Note also that in your code only randfield(String) will get called (i.e. rand(T) won’t get called). This is because the CSV sink will try to pull data from your source as strings: the type parameter to Data.streamfrom is interpreted as “stream as this type”. This is definitely confusing and I think it needs to be made clearer.
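
To illustrate that interpretation, here is a minimal sketch of a streamfrom that honors the requested type T; the T <: AbstractString branch is my assumption about what a CSV sink asks for, not something taken from the package:

function Data.streamfrom{T}(rs::RandSource, ::Type{Data.Field}, ::Type{T}, row, col)
    v = randfield(rs.schema.types[col]) # value of the schema's column type
    # convert to the type the sink asked for ("stream as this type")
    T <: AbstractString ? string(v) : convert(T, v)
end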

I think certain aspects of the interface should be rethought. It would make sense for streamfrom to always try to convert to the type specified by the schema, with users able to define a Data.vectortype that specifies what types of vectors get pulled. I also think it’s very important that ranges of rows can be pulled; see the sketch below. In fact, I think I’ll write a post.
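
For concreteness, a hypothetical sketch of what that might look like (vectortype and the range-based method below do not exist in DataStreams; the names and signatures are mine):

# declare what vector type columns of element type T should be pulled as
vectortype{T}(::Type{RandSource}, ::Type{T}) = Vector{T}

# pull a contiguous range of rows from one column in a single call
function streamfrom{T}(rs::RandSource, ::Type{Data.Field}, ::Type{T}, rows::UnitRange{Int}, col)
    T[randfield(rs.schema.types[col]) for i in rows]
end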

Also, if you’re interested, I wrote Estuaries.jl, which is a DataTables-like interface for anything implementing the basic DataStreams interface. It should make it much easier to pull data from anything implementing Data.Field-based streaming, but I haven’t tested it much with Data.streamto! yet.


I certainly think we’re coming due for another iteration of the DataStreams interface, so any input/feedback is really useful; we should start a new thread where everyone can chime in and we can make sure to design for the evolving set of use-cases.

I’m currently doing a lot of thinking and experimenting on null-value representation, and am working towards basing the next iteration of DataStreams on a new foundation of missingness that will enjoy compiler optimizations and pave the way to 1.0 for all these packages. @ExpandingMan, mind starting the new thread? I’ll chime in there with thoughts as well.


Absolutely, I’ve been thinking about this a lot too, and I have a lot of ideas. I think the DataStreams interface is a really good idea, and I’d love to see every tabular data source/sink in Julia have a really robust implementation of it, with each such interface needing fewer than 100 lines of code.

Give me a bit to put my thoughts together coherently.


Thanks for the help. I believe randfield is passed a column type from the schema, though, so with the correct signature it will be called. MWE here:

using DataStreams
using CSV

struct RandSource <: Data.Source
    schema::Data.Schema{true} # number of rows are known from the schema
end

randfield(::Type{Int}) = rand(1:100)
randfield(::Type{String}) = String(fill(rand('A':'Z'), rand(1:3)))

Data.schema(rs::RandSource) = rs.schema
Data.schema(rs::RandSource, ::Type{Data.Field}) = rs.schema
Data.isdone(rs::RandSource, row, col) = row > size(rs.schema, 1)
Data.streamtype(::Type{RandSource}, ::Type{Data.Field}) = true

function Data.streamfrom{T}(rs::RandSource, ::Type{Data.Field}, ::Type{T}, row, col)
    randfield(rs.schema.types[col])
end

rs = RandSource(Data.Schema(["A","B","C"], [Int,String,Int], 100))
CSV.write("/tmp/test.csv", rs)

Another thing: for the Data.Field methods, is traversal guaranteed to be row-major (i.e. inner index over columns, outer index over rows, all contiguous)?

Documentation and examples would help, even though the source is nicely written and understandable.

Yeah, let’s face it: currently writing these interfaces is really confusing largely because of the typing. I’ve posted my ideas for an overhaul here. Certainly we also need much better documentation.

Currently traversal is always column-major (as in columns first, as far as I know), but one of my proposals is to change that.

Now I am confused, as modifying the above routine with

function Data.streamfrom{T}(rs::RandSource, ::Type{Data.Field}, ::Type{T}, row, col)
    @printf("row %02d col %02d\n", row, col)
    randfield(rs.schema.types[col])
end

gives an output like

row 01 col 01
row 01 col 02
row 01 col 03
row 02 col 01
row 02 col 02
row 02 col 03
row 03 col 01
row 03 col 02
row 03 col 03
...

so it looks row-major, even when using a format that is essentially column-based, like Feather.write.

Ugh, yeah, now I’m confused too. Like I said, this really needs to be selectable; it’s one of the major problems with the current implementation.

I don’t quite follow why this is confusing:

  • Data.streamfrom(src, Data.Field, ...) is streaming via rows, so data will be accessed, row-by-row, and for each row, column-by-column
  • Data.streamfrom(src, Data.Column, ...) is streaming via columns, so data will be accessed column-by-column (a sketch of this variant for the RandSource above follows this list)
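
For example, a column-streaming version of the RandSource above might look something like this. This is only a sketch: I’m assuming here that T is the element type, that the method takes just the column index, and that it returns the whole column at once; check the documentation for the exact signature.

Data.streamtype(::Type{RandSource}, ::Type{Data.Column}) = true

function Data.streamfrom{T}(rs::RandSource, ::Type{Data.Column}, ::Type{T}, col)
    # build the entire column in one call instead of one field at a time
    T[randfield(rs.schema.types[col]) for i in 1:size(rs.schema, 1)]
end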

Hopefully everyone has been able to find the documentation here.

I think there are two different things going on here:

  1. What sort of batches will the data be pulled in? It is possible to pull the data one element at a time, or in batches of some kind. I originally took Data.Column to mean that data will be pulled in batches equal to entire columns. This is the behavior of DataTables and DataFrames, but not of ODBC, which is why I was originally confused when using that package.
  2. Will pulling be done in row-major or column-major ordering? If batches are only one element long (i.e. Data.Field), either is possible.

It wasn’t clear (to me at least) which of these two things Data.Field and Data.Column refer to. Even if I specify Data.Field, it should still be possible to pull in column-major ordering; the schematic loops below show the two orderings.
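
Schematically (these loops are an illustration only, with placeholder src, types, nrows, and ncols; neither is actual sink code):

# row-major: the order the print output above shows
for row in 1:nrows, col in 1:ncols
    Data.streamfrom(src, Data.Field, types[col], row, col)
end

# column-major: equally possible with one-element batches
for col in 1:ncols, row in 1:nrows
    Data.streamfrom(src, Data.Field, types[col], row, col)
end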