Outputting string columns using Avro.jl

When I try to convert an Arrow file to Avro using this code:

using Arrow, Avro, DataFrames, Tables

function t_arrowtoavro(path, file)
    a = Arrow.Table(joinpath(path, "$(file).arrow")) |> DataFrame
    println(Tables.schema(a))
    Avro.writetable(joinpath(path, "$(file).avro"), a; compress=:zstd)
end

I get the following error whenever the input file contains a string column, e.g.

Tables.Schema:
 :IndividualId  Int32
 :ResultDate    Union{Missing, Date}
 :HIVResult     Union{Missing, String}
ERROR: ArgumentError: internal writing error: buffer too small, len = 1563130

Whereas this version does not produce this error:

function t_arrowtoavro(path, file)
    a = Arrow.Table(joinpath(path, "$(file).arrow")) |> DataFrame
    println(Tables.schema(a))
    b = select(a, :IndividualId, :ResultDate)
    println(Tables.schema(b))
    Avro.writetable(joinpath(path, "$(file).avro"), b; compress=:zstd)
end

Output:

Tables.Schema:
 :IndividualId  Int32
 :ResultDate    Union{Missing, Date}
 :HIVResult     Union{Missing, String}
Tables.Schema:
 :IndividualId  Int32
 :ResultDate    Union{Missing, Date}
"D:\\Data\\Demography\\AHRI\\Staging\\HIVResults.avro"

How do I resolve this problem if I need a string column in my output file?

Hi,

I’m aware that this is an old thread, but I’ve run into exactly the same issue while trying to use Avro.jl. Is there a fix or a way to mitigate it?

Regards,

The OP doesn’t share the stacktrace leading to the error. If you get the same error, sharing a more complete stacktrace might speed up pinpointing the issue.

Hi,

Thanks for the response. Fair point, stack trace below:

ERROR: ArgumentError: internal writing error: buffer too small, len = 320989
Stacktrace:
  [1] buffertoosmall(len::Int64)
    @ Avro C:\Users\username\.julia\packages\Avro\BcJmh\src\utils.jl:1
  [2] macro expansion
    @ C:\Users\username\.julia\packages\Avro\BcJmh\src\utils.jl:10 [inlined]
  [3] _writevalue(B::Avro.Binary, ::Avro.StringType, x::String, buf::Vector{UInt8}, pos::Int64, len::Int64, opts::@Kwargs{})  
    @ Avro C:\Users\username\.julia\packages\Avro\BcJmh\src\types\binary.jl:215
  [4] writevalue
    @ C:\Users\username\.julia\packages\Avro\BcJmh\src\types\binary.jl:205 [inlined]
  [5] writevalue(B::Avro.Binary, UT::Vector{Union{…}}, x::String, buf::Vector{UInt8}, pos::Int64, len::Int64, opts::@Kwargs{})
    @ Avro C:\Users\username\.julia\packages\Avro\BcJmh\src\types\unions.jl:44
  [6] (::Avro.RowWriteClosure{Avro.Binary, Tables.Schema{…}, Vector{…}, @Kwargs{}})(val::String, i::Int64, nm::Symbol)
    @ Avro C:\Users\username\.julia\packages\Avro\BcJmh\src\types\rows.jl:34  
  [7] eachcolumn
    @ C:\Users\username\.julia\packages\Tables\NSGZI\src\utils.jl:75 [inlined]
  [8] writevalue(B::Avro.Binary, T::Avro.RowType{…}, row::DataFrameRow{…}, buf::Vector{…}, pos::Int64, len::Int64, opts::@Kwargs{})
    @ Avro C:\Users\username\.julia\packages\Avro\BcJmh\src\types\rows.jl:39
  [9] writewithschema(io::IOStream, parts::Tuple{…}, rows::DataFrames.DataFrameRows{…}, st::Int64, sch::Tables.Schema{…}, dictrow::Bool, compress::Symbol, kw::@Kwargs{})
    @ Avro C:\Users\username\.julia\packages\Avro\BcJmh\src\tables.jl:89
 [10] writetable(io::IOStream, source::SubDataFrame{…}; compress::Symbol, kw::@Kwargs{})
    @ Avro C:\Users\username\.julia\packages\Avro\BcJmh\src\tables.jl:55
 [11] writetable
    @ C:\Users\username\.julia\packages\Avro\BcJmh\src\tables.jl:42 [inlined]
 [12] #34
    @ C:\Users\username\.julia\packages\Avro\BcJmh\src\tables.jl:37 [inlined]
 [13] open(::Avro.var"#34#35"{@Kwargs{…}, SubDataFrame{…}}, ::String, ::Vararg{String}; kwargs::@Kwargs{})
    @ Base .\io.jl:396
 [14] open
    @ Base .\io.jl:393 [inlined]
 [15] #writetable#33
    @ Avro C:\Users\username\.julia\packages\Avro\BcJmh\src\tables.jl:36 [inlined]
 [16] top-level scope
    @ c:\Users\username\ArrowAvroConversion.jl:23
Some type information was truncated. Use `show(err)` to see complete types.

Regards,

Trying to follow the stacktrace, my impression is that the memory writetable reserves in Avro is sized from the first row. But with String and Union values the row sizes can differ, so that estimate isn't correct for the rest of the table, leading to the eventual trouble.
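If that is the cause, a minimal sketch like the following should illustrate it: the first row's string is tiny and a later row's is huge, so a buffer sized from row one comes up short. Column names are taken from the schema above; the file name and string lengths are purely illustrative.

using DataFrames, Avro

# Illustrative repro sketch: the first row carries a short string, a later row a
# much longer one, so a buffer sized from the first row may be too small.
df = DataFrame(IndividualId = Int32[1, 2],
               HIVResult    = ["Neg", repeat("x", 2_000_000)])
Avro.writetable("repro.avro", df)   # may fail with "buffer too small" on affected versions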

How can this be fixed? Nothing easy comes to mind; an issue should be filed. Meanwhile, if one row is the longest, moving it to the front can be a very rough workaround (see the sketch below).
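Something along these lines, assuming a single string column dominates the row size; the helper name and the use of sizeof as a proxy for the encoded size are just illustrative, not anything from Avro.jl:

using DataFrames

# Rough workaround sketch: move the row with the largest string payload to the
# front so the buffer estimate taken from the first row is large enough.
function longest_row_first(df::DataFrame, col::Symbol)
    lens = [ismissing(v) ? 0 : sizeof(v) for v in df[!, col]]
    i = argmax(lens)
    order = vcat(i, deleteat!(collect(1:nrow(df)), i))   # row i first, others in place
    return df[order, :]
end

# e.g. Avro.writetable(joinpath(path, "$(file).avro"), longest_row_first(a, :HIVResult); compress=:zstd)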

nbytes(...) can be used to calculate each row's encoded size from the schema, for example as sketched below. I'm still not sure how variable-length fields such as String are handled.
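For instance, something like this could scan a table for its largest encoded row. Note that schematype and nbytes are unexported Avro.jl internals (they appear in the package's tables.jl), so this is an assumption about internal behaviour rather than a documented API:

using Avro, Tables

# Sketch: find the maximum encoded row size for a table using Avro.jl internals.
# Avro.schematype and Avro.nbytes are internal helpers and may change between versions.
function max_encoded_row_bytes(tbl)
    rows = Tables.rows(tbl)
    sch = Tables.schema(rows)
    schtyp = Avro.schematype(sch)
    maxbytes = 0
    for row in rows
        maxbytes = max(maxbytes, Avro.nbytes(schtyp, row))
    end
    return maxbytes
end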

Hi,

Many thanks for the response, Dan. I came to the same conclusion and altered the method as follows:

function writewithschema(io, parts, rows, st, sch, dictrow, compress, kw)
    comp = get(COMPRESSORS, compress, nothing)

    schtyp = schematype(sch)
    meta = Dict("avro.schema" => JSON3.write(schtyp))
    if comp !== nothing
        meta["avro.codec"] = String(compress)
    end
    sync = _cast(NTuple{16, UInt8}, rand(UInt128))
    buf = write((magic=MAGIC, meta=meta, sync=sync); schema=FileHeaderRecordType)
    Base.write(io, buf)
    @debug 1 "wrote file header from bytes 1:$(pos - 1)"
    i = 1
    while true
        # if rows didn't have schema or length, we materialized w/ Tables.dictrowtable
        nrow = length(rows)
        @debug 1 "writing block count ($nrow) at pos = $pos"
        rowsstate = iterate(rows)
        pos = 1
        if rowsstate === nothing
            bytes = UInt8[]
            pos = 0
        else
            row, rowst = rowsstate
            # calc nbytes on all rows to find max, then allocate bytes
            bytesperrow = nbytes(schtyp, row)
            while true 
                rowsstate = iterate(rows, rowst)
                rowsstate === nothing && break
                row, rowst = rowsstate
                nb = nbytes(schtyp, row)
                if nb > bytesperrow
                    bytesperrow = nb
                end
            end
            rowsstate = iterate(rows)
            row, rowst = rowsstate
            blen = trunc(Int, nrow * bytesperrow * 1.05) # add 5% cushion
            bytes = Vector{UInt8}(undef, blen)
            n = 1
            nb = nbytes(schtyp, row)
            while true
                pos = writevalue(Binary(), schtyp, row, bytes, pos, blen, kw)
                rowsstate = iterate(rows, rowst)
                rowsstate === nothing && break
                row, rowst = rowsstate
                nb = nbytes(schtyp, row)
                bytesperrow += nb
                n += 1
            end
        end
        # compress
        if comp !== nothing
            finalbytes = transcode(comp[Threads.threadid()], unsafe_wrap(Base.Array, pointer(bytes), pos - 1))
        else
            finalbytes = bytes
        end
        block = Block(nrow, view(finalbytes, 1:length(finalbytes)), sync)
        buf = write(block; schema=BlockType)
        Base.write(io, buf)
        state = iterate(parts, st)
        state === nothing && break
        part, st = state
        rows = Tables.rows(part)
        sch = Tables.schema(rows)
        if dictrow
            rows = Tables.dictrowtable(rows)
        end
    end
    return
end

Instead of assessing only the first row, the entire table is scanned to establish the maximum row size, as that guarantees the bytes vector is large enough for downstream processing. The allocation is definitely bigger than necessary and there is a performance cost, but it will work (memory permitting). I could have summed the per-row bytes and sized the vector from that total instead, but it's not clear to me whether that would work in every case.
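For reference, the summed variant I mention would sit in place of the max-row scan and look roughly like this; as noted, I haven't verified that sizing from the sum is sufficient in every case:

# Unverified alternative sketch: size the block buffer from the sum of all
# encoded row sizes (keeping the 5% cushion) instead of nrow * maximum.
totalbytes = 0
rowsstate = iterate(rows)
while rowsstate !== nothing
    row, rowst = rowsstate
    totalbytes += nbytes(schtyp, row)
    rowsstate = iterate(rows, rowst)
end
blen = trunc(Int, totalbytes * 1.05)
bytes = Vector{UInt8}(undef, blen)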

To your point, there isn't an easy way to address this without altering this method or forcing a sort by row size.

Edit: here is the issue raised in Avro.jl: Problem writing string columns to avro file · Issue #17 · JuliaData/Avro.jl (github.com), and the PR I created: Updated writewithschema in tables.jl by djholiver · Pull Request #20 · JuliaData/Avro.jl (github.com)

Regards,
