I am trying to learn my way around DataStreams. I thought I would implement a simple source that just yields random values (code below). Yet when I write a CSV from it, the file contains only the header. What am I doing wrong? This is on Julia 0.6, with the released versions of both packages.
using DataStreams
using CSV
struct RandSource <: Data.Source
schema::Data.Schema{true} # the number of rows is known from the schema
end
randfield(T) = rand(T)
randfield(String) = fill(rand('A':'Z'), rand(1:3))
Data.schema(rs::RandSource) = rs.schema
Data.schema(rs::RandSource, ::Type{Data.Field}) = rs.schema
Data.isdone(rs::RandSource, row, col) = row ≤ size(rs.schema, 1)
Data.streamtype(::Type{RandSource}, ::Type{Data.Field}) = true
function Data.streamfrom{T}(rs::RandSource, ::Type{Data.Field}, ::Type{T}, row, col)
randfield(rs.schema.types(row))
end
s = Data.Schema(["A","B","C"], [Int,Int,Int], 100)
rs = RandSource(s)
size(rs) # (100,3), from the schema
CSV.write("/tmp/test.csv", rs) # file only has a header
Note also that in your code only randfield(String) ends up being called (i.e. rand(T) won’t get called). This is because the CSV sink will try to pull data from your source as strings. The type parameter to Data.streamfrom is interpreted as “stream as this type”. This is definitely confusing, and I think it needs to be made clearer.
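For instance, under that “stream as” reading, a source can honor the requested type by converting explicitly. A minimal sketch, assuming a randfield(T) that returns a value of the column’s declared type (the T === String branch is just for illustration):

function Data.streamfrom{T}(rs::RandSource, ::Type{Data.Field}, ::Type{T}, row, col)
    v = randfield(rs.schema.types[col])  # value of the column's declared schema type
    # convert to whatever type the sink asked to stream as
    T === String ? string(v) : convert(T, v)
end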
I think certain aspects of the interface should be rethought. I think it would make sense for streamfrom to always try to convert to the type specified by the schema, while allowing users to define a Data.vectortype specifying what types of vectors should get pulled. I also think it’s very important that ranges of rows can be pulled; a rough sketch of what I mean follows. In fact, I think I’ll write a post.
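Roughly, this is the shape I have in mind, written here as standalone functions against the RandSource example above, since neither exists in DataStreams today (the names vectortype and streamfrom_range are hypothetical):

# hypothetical: which vector type batches from this source should materialize as
vectortype{T}(::Type{RandSource}, ::Type{T}, col) = Vector{T}

# hypothetical: pull a contiguous range of rows from one column in a single call
streamfrom_range{T}(rs::RandSource, ::Type{Vector{T}}, rows::UnitRange, col) =
    T[randfield(rs.schema.types[col]) for _ in rows]

# e.g. streamfrom_range(rs, Vector{Int}, 1:10, 1) would yield ten values from column 1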
Also, if you’re interested, I wrote Estuaries.jl, which is a DataTables-like interface for anything implementing the basic DataStreams interface. It should make it much easier to pull data from anything implementing Data.Field-based streaming, but I haven’t tested it much with Data.streamto! yet.
I certainly think we’re due for another iteration of the DataStreams interface, so any input/feedback is really useful; we should start a new thread where everyone can chime in, and we can make sure to design for the evolving set of use cases.
I’m currently doing a lot of thinking and experimenting on null-value representation, and I’m working towards having the next iteration of DataStreams be based on a new foundation of missingness that will enjoy compiler optimizations and pave the way to 1.0 for all these packages. @ExpandingMan, mind starting the new thread? I’ll chime in there with thoughts as well.
Absolutely, I’ve been thinking about this a lot too, and I have a lot of ideas. I think the DataStreams interface is a really good idea, and I’d love to see every tabular data source/sink in Julia have a really robust interface for it, and for said interfaces to only need to be < 100 lines of code.
Give me a bit to put my thoughts together coherently.
Thanks for the help. I believe that randfield is passed a column type from the schema, though, so with the correct signature it will be called. MWE here:
using DataStreams
using CSV
struct RandSource <: Data.Source
schema::Data.Schema{true} # the number of rows is known from the schema
end
randfield(::Type{Int}) = rand(1:100)
randfield(::Type{String}) = String(fill(rand('A':'Z'), rand(1:3)))
Data.schema(rs::RandSource) = rs.schema
Data.schema(rs::RandSource, ::Type{Data.Field}) = rs.schema
Data.isdone(rs::RandSource, row, col) = row > size(rs.schema, 1)
Data.streamtype(::Type{RandSource}, ::Type{Data.Field}) = true
function Data.streamfrom{T}(rs::RandSource, ::Type{Data.Field}, ::Type{T}, row, col)
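    # T is the type the sink asks to stream as; we ignore it here and
    # dispatch randfield on the column's declared schema type instead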
randfield(rs.schema.types[col])
end
rs = RandSource(Data.Schema(["A","B","C"], [Int,String,Int], 100))
CSV.write("/tmp/test.csv", rs)
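Reading the file back confirms it now has all 100 rows (if I remember right, CSV.read materializes a DataFrame by default):

CSV.read("/tmp/test.csv")  # 100×3, columns A, B, C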
Another thing: for the Data.Field methods, is traversal guaranteed to be row-major (i.e. inner index over columns, outer index over rows, all contiguous)?
Yeah, let’s face it: writing these interfaces is currently really confusing, largely because of the typing. I’ve posted my ideas for an overhaul here. We certainly also need much better documentation.
Currently traversal is always column-major (as in columns first, as far as I know), but one of my proposals is to change that.
Now I am confused, as modifying the above routine with
function Data.streamfrom{T}(rs::RandSource, ::Type{Data.Field}, ::Type{T}, row, col)
@printf("row %02d col %02d\n", row, col)
randfield(rs.schema.types[col])
end
gives an output like
row 01 col 01
row 01 col 02
row 01 col 03
row 02 col 01
row 02 col 02
row 02 col 03
row 03 col 01
row 03 col 02
row 03 col 03
...
so it looks row-major. The same happens even when using a format that is essentially column-based, like Feather.write.
I think there are two different things going on here:
What kind of batches will the data be pulled in? It is possible to pull the data one element at a time, or in batches of some sort. I originally took Data.Column to mean that data will be pulled in batches equal to entire columns (see the sketch below). This is the behavior of DataTables and DataFrames, but it is not the behavior of ODBC, which is why I was originally confused when using that package.
Will pulling be done in row-major or column-major order? If batches are only one element long (i.e. Data.Field), it is possible to do either.
It wasn’t clear (to me at least) which of these two things Data.Field and Data.Column refer to. Even if I specify Data.Field, it should still be possible for me to pull in column-major order.
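For contrast, here is roughly what column-batch streaming would look like for the RandSource example above. I’m writing the Data.Column signature from memory of the released interface, so treat this as a sketch rather than gospel:

Data.streamtype(::Type{RandSource}, ::Type{Data.Column}) = true

function Data.streamfrom{T}(rs::RandSource, ::Type{Data.Column}, ::Type{T}, col)
    # one call returns the entire column as a Vector{T}
    T[randfield(rs.schema.types[col]) for _ in 1:size(rs.schema, 1)]
end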