Optimize Apache Arrow data streaming over HTTP

Following a request on the Arrow.jl project, I started implementing a simple Julia example here. With the exception of an upstream issue on macOS, it seems to work. However, the implementation seems to be less performant than others, e.g. the Julia server is 3x slower than the one written in Python.

The task is to serve a table of four (nullable) columns of Int64 data with 100 million rows in batches of 4096 rows as an Arrow IPC stream of record batches over an HTTP connection.

How can we make this as fast as possible in Julia?
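For reference, here is a minimal sketch of the kind of baseline server being discussed (an assumption, not the exact code from the linked repository; the randint_nullable helper mirrors the one defined later in this thread). It serializes the entire Arrow IPC stream into memory before responding:

using Arrow, HTTP, Random, Tables

# Column of n random Int64 values with a nullable element type
function randint_nullable(n::Integer)
    v = Vector{Union{Missing, Int}}(undef, n)
    rand!(v, Int)
    return v
end

# Naive baseline: build the whole Arrow IPC stream in memory, then send it
function handle(::HTTP.Request)
    total_records = 100_000_000
    batch_len = 4096
    batches = Tables.partitioner(Iterators.partition(1:total_records, batch_len)) do indices
        nrows = length(indices)
        (a = randint_nullable(nrows), b = randint_nullable(nrows),
         c = randint_nullable(nrows), d = randint_nullable(nrows))
    end
    buffer = IOBuffer()
    Arrow.write(buffer, batches, file=false) # file=false selects the IPC stream format
    return HTTP.Response(200,
        ["Content-Type" => "application/vnd.apache.arrow.stream"],
        take!(buffer))
end

HTTP.serve(handle, "127.0.0.1", 8008)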

Are you using OpenSSL instead of mbedTLS for these connections?

I’m just using default settings in the HTTP.serve call, which, if I’m not mistaken, is defaulting to no encryption.

After taking a look at the implementation, I think the performance issue may be because the Julia code is allocating a buffer for the entire Arrow stream before it starts sending data over the wire. If you look at the Python code, you can see that it writes out one batch at a time to a stream and then writes those bytes to the response stream (arrow-experiments/http/get_simple/python/server/server.py at main · apache/arrow-experiments · GitHub).
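Conceptually (a rough sketch rather than a literal translation of the Python code), the per-batch pattern would look something like this on the Julia side, where http is the server's HTTP.Stream and batches is any iterator of Tables.jl-compatible record batches:

for batch in batches
    buffer = IOBuffer()
    Arrow.write(buffer, batch, file=false) # one small IPC stream per batch
    write(http, take!(buffer))             # send the bytes as soon as they are produced
end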

I am new to Julia (I've been using it for 6 months), so I am not sure exactly how to accomplish that, but I will take a look over the next few days.

Yeah, it would be great if we could somehow just pass the Arrow stream directly to HTTP.jl.

You can pass an IO object instead of bytes to the HTTP.Response: API Reference · HTTP.jl. So I wonder if something like this could work:

function get_stream(::HTTP.Request)
    total_records = 100_000_000
    batch_len = 4096
    # Lazily produce 4096-row record batches (randint_nullable is defined further down in the thread)
    stream = Tables.partitioner(Iterators.partition(1:total_records, batch_len)) do indices
        nrows = length(indices)
        return (
            a = randint_nullable(nrows),
            b = randint_nullable(nrows),
            c = randint_nullable(nrows),
            d = randint_nullable(nrows)
        )
    end
    buffer = IOBuffer()
    # Serialize on a separate task so the response can (hopefully) start before the write completes
    t = Threads.@spawn (Arrow.write(buffer, stream); close(buffer))
    Base.errormonitor(t)
    return HTTP.Response(200, buffer)
end

(Untested…) It is also unsatisfactory since, if Arrow.write errors, you will get a printed error but not an actual exception thrown… not sure how to handle that.
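One possible pattern (an untested sketch) would be to keep the Task handle around and wait on it once the response is finished, so that a failure inside Arrow.write is rethrown instead of only being printed:

t = Threads.@spawn try
    Arrow.write(buffer, stream)
finally
    close(buffer) # make sure the reader sees EOF even if the write fails
end
# ... later, e.g. after the response has been sent:
istaskfailed(t) && wait(t) # waiting on a failed task rethrows its exception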

I believe that Arrow.write will internally use @wkspawn or @async, so I'm not sure if the @spawn is needed as well in this code.

Here I am using it so we can start returning the response before the write completes, which seems to be the difference from the other implementations.

That does not work; HTTP.Response() expects a byte array. I've also tried writing directly to the HTTP.Stream, and that failed as well. My last attempt was to use a BufferedOutputStream around the HTTP.Stream of the response, and that also failed.

using Arrow, HTTP, Random, Tables, BufferedStreams

function randint_nullable(n::Integer)
    v = Vector{Union{Missing, Int}}(undef, n)
    rand!(v, Int)
    return v
end

function get_stream()
    total_records = 10_000
    batch_len = 4096
    stream = Tables.partitioner(Iterators.partition(1:total_records, batch_len)) do indices
        nrows = length(indices)
        return (
            a = randint_nullable(nrows),
            b = randint_nullable(nrows),
            c = randint_nullable(nrows),
            d = randint_nullable(nrows)
        )
    end
    return stream
end

HTTP.listen("127.0.0.1", 8008) do http::HTTP.Stream
    @show http.message
    @show HTTP.header(http, "Content-Type")
    # Drain any request body before writing the response
    while !eof(http)
        println("body data: ", String(readavailable(http)))
    end
    HTTP.setstatus(http, 200)
    HTTP.setheader(http, "Content-Type" => "application/octet-stream")
    HTTP.startwrite(http)

    # Wrap the response stream in a 64 MiB output buffer and write the Arrow stream into it
    buffered_io = BufferedOutputStream(http, 64 * 1024 * 1024)
    Arrow.write(buffered_io, get_stream())
    flush(buffered_io)
    close(buffered_io)
end

Here’s the error from the server side:

HTTP.header(http, "Content-Type") = ""
┌ Error: handle_connection handler error.
│
│ ===========================
│ HTTP Error message:
│
│ ERROR: BufferedOutputStream sink failed to write all data
│ Stacktrace:
│  [1] error(s::String)
│    @ Base ./error.jl:35
│  [2] flushbuffer!
│    @ ~/.julia/packages/BufferedStreams/TNN7X/src/bufferedoutputstream.jl:57 [inlined]
│  [3] flushbuffer!
│    @ ~/.julia/packages/BufferedStreams/TNN7X/src/bufferedoutputstream.jl:54 [inlined]
│  [4] flush
│    @ ~/.julia/packages/BufferedStreams/TNN7X/src/bufferedoutputstream.jl:140 [inlined]
│  [5] (::var"#133#134")(http::HTTP.Streams.Stream{HTTP.Messages.Request, HTTP.Connections.Connection{Sockets.TCPSocket}})
│    @ Main ./REPL[169]:13
│  [6] #invokelatest#2
│    @ ./essentials.jl:887 [inlined]
│  [7] invokelatest
│    @ ./essentials.jl:884 [inlined]
│  [8] handle_connection(f::Function, c::HTTP.Connections.Connection{Sockets.TCPSocket}, listener::HTTP.Servers.Listener{Nothing, Sockets.TCPServer}, readtimeout::Int64, access_log::Nothing)
│    @ HTTP.Servers ~/.julia/packages/HTTP/PnoHb/src/Servers.jl:469
│  [9] (::HTTP.Servers.var"#16#17"{var"#133#134", HTTP.Servers.Listener{Nothing, Sockets.TCPServer}, Set{HTTP.Connections.Connection}, Int64, Nothing, ReentrantLock, Base.Semaphore, HTTP.Connections.Connection{Sockets.TCPSocket}})()
│    @ HTTP.Servers ~/.julia/packages/HTTP/PnoHb/src/Servers.jl:401
│   request =
│    HTTP.Messages.Request:
│    """
│    GET / HTTP/1.1
│    Host: localhost:8008
│    Accept: */*
│    User-Agent: HTTP.jl/1.10.0
│    Content-Length: 0
│    Accept-Encoding: gzip
│
│    """
└ @ HTTP.Servers ~/.julia/packages/HTTP/PnoHb/src/Servers.jl:483

And here’s the error from the client:

julia> execution_time = @elapsed get_batches()
ERROR: HTTP.RequestError:
HTTP.Request:
HTTP.Messages.Request:
"""
GET / HTTP/1.1
Host: localhost:8008
Accept: */*
User-Agent: HTTP.jl/1.10.0
Content-Length: 0
Accept-Encoding: gzip

"""Underlying error:
EOFError: read end of file
Stacktrace:
  [1] (::HTTP.ConnectionRequest.var"#connections#4"{…})(req::HTTP.Messages.Request; proxy::Nothing, socket_type::Type, socket_type_tls::Nothing, readtimeout::Int64, connect_timeout::Int64, logerrors::Bool, logtag::Nothing, kw::@Kwargs{…})
    @ HTTP.ConnectionRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/ConnectionRequest.jl:140
  [2] connections
    @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/ConnectionRequest.jl:58 [inlined]
  [3] (::Base.var"#96#98"{…})(args::HTTP.Messages.Request; kwargs::@Kwargs{…})
    @ Base ./error.jl:308
  [4] (::HTTP.RetryRequest.var"#manageretries#3"{…})(req::HTTP.Messages.Request; retry::Bool, retries::Int64, retry_delays::ExponentialBackOff, retry_check::Function, retry_non_idempotent::Bool, kw::@Kwargs{…})
    @ HTTP.RetryRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/RetryRequest.jl:75
  [5] manageretries
    @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/RetryRequest.jl:30 [inlined]
  [6] (::HTTP.CookieRequest.var"#managecookies#4"{…})(req::HTTP.Messages.Request; cookies::Bool, cookiejar::HTTP.Cookies.CookieJar, kw::@Kwargs{…})
    @ HTTP.CookieRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/CookieRequest.jl:42
  [7] managecookies
    @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/CookieRequest.jl:19 [inlined]
  [8] (::HTTP.HeadersRequest.var"#defaultheaders#2"{…})(req::HTTP.Messages.Request; iofunction::Nothing, decompress::Nothing, basicauth::Bool, detect_content_type::Bool, canonicalize_headers::Bool, kw::@Kwargs{…})
    @ HTTP.HeadersRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/HeadersRequest.jl:71
  [9] defaultheaders
    @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/HeadersRequest.jl:14 [inlined]
 [10] (::HTTP.RedirectRequest.var"#redirects#3"{…})(req::HTTP.Messages.Request; redirect::Bool, redirect_limit::Int64, redirect_method::Nothing, forwardheaders::Bool, response_stream::Nothing, kw::@Kwargs{…})
    @ HTTP.RedirectRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/RedirectRequest.jl:25
 [11] redirects
    @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/RedirectRequest.jl:14 [inlined]
 [12] (::HTTP.MessageRequest.var"#makerequest#3"{…})(method::String, url::URIs.URI, headers::Nothing, body::Vector{…}; copyheaders::Bool, response_stream::Nothing, http_version::HTTP.Strings.HTTPVersion, verbose::Int64, kw::@Kwargs{})
    @ HTTP.MessageRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/MessageRequest.jl:35
 [13] makerequest
    @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/MessageRequest.jl:24 [inlined]
 [14] request(stack::HTTP.MessageRequest.var"#makerequest#3"{…}, method::String, url::String, h::Nothing, b::Vector{…}, q::Nothing; headers::Nothing, body::Vector{…}, query::Nothing, kw::@Kwargs{})
    @ HTTP ~/.julia/packages/HTTP/PnoHb/src/HTTP.jl:457
 [15] request(stack::Function, method::String, url::String, h::Nothing, b::Vector{UInt8}, q::Nothing)
    @ HTTP ~/.julia/packages/HTTP/PnoHb/src/HTTP.jl:455
 [16] #request#20
    @ ~/.julia/packages/HTTP/PnoHb/src/HTTP.jl:315 [inlined]
 [17] request(stack::Function, method::String, url::String, h::Nothing, b::Vector{UInt8}, q::Nothing) (repeats 2 times)
    @ HTTP ~/.julia/packages/HTTP/PnoHb/src/HTTP.jl:313 [inlined]
 [18] get
    @ ~/.julia/packages/HTTP/PnoHb/src/HTTP.jl:518 [inlined]
 [19] get_batches()
    @ Main ./REPL[111]:2
 [20] top-level scope
    @ ./timing.jl:395

caused by: TaskFailedException

    nested task error: EOFError: read end of file
    Stacktrace:
     [1] closeread(http::HTTP.Streams.Stream{HTTP.Messages.Response, HTTP.Connections.Connection{Sockets.TCPSocket}})
       @ HTTP.Streams ~/.julia/packages/HTTP/PnoHb/src/Streams.jl:378
     [2] macro expansion
       @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/StreamRequest.jl:61 [inlined]
     [3] (::HTTP.StreamRequest.var"#3#5"{…})()
       @ HTTP.StreamRequest ~/.julia/packages/ConcurrentUtilities/J6iMP/src/ConcurrentUtilities.jl:9
    
    caused by: EOFError: read end of file
    Stacktrace:
      [1] read_to_buffer(c::HTTP.Connections.Connection{Sockets.TCPSocket}, sizehint::Int64)
        @ HTTP.Connections ~/.julia/packages/HTTP/PnoHb/src/Connections.jl:221
      [2] readuntil(c::HTTP.Connections.Connection{…}, f::typeof(HTTP.Parsers.find_end_of_chunk_size), sizehint::Int64)
        @ HTTP.Connections ~/.julia/packages/HTTP/PnoHb/src/Connections.jl:243
      [3] readuntil(c::HTTP.Connections.Connection, f::F, sizehint::Any) where F<:Function
        @ HTTP.Connections ~/.julia/packages/HTTP/PnoHb/src/Connections.jl:238 [inlined]
      [4] readchunksize(io::HTTP.Connections.Connection{Sockets.TCPSocket}, message::HTTP.Messages.Response)
        @ HTTP.Messages ~/.julia/packages/HTTP/PnoHb/src/Messages.jl:558
      [5] ntoread
        @ ~/.julia/packages/HTTP/PnoHb/src/Streams.jl:202 [inlined]
      [6] readall!(http::HTTP.Streams.Stream{HTTP.Messages.Response, HTTP.Connections.Connection{…}}, buf::IOBuffer)
        @ HTTP.Streams ~/.julia/packages/HTTP/PnoHb/src/Streams.jl:311
      [7] read (repeats 2 times)
        @ ~/.julia/packages/HTTP/PnoHb/src/Streams.jl:297 [inlined]
      [8] readbody!(stream::HTTP.Streams.Stream{…}, res::HTTP.Messages.Response, buf_or_stream::HTTP.Streams.Stream{…}, lock::ReentrantLock)
        @ HTTP.StreamRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/StreamRequest.jl:169
      [9] readbody(stream::HTTP.Streams.Stream{…}, res::HTTP.Messages.Response, decompress::Nothing, lock::ReentrantLock)
        @ HTTP.StreamRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/StreamRequest.jl:142
     [10] macro expansion
        @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/StreamRequest.jl:54 [inlined]
     [11] (::HTTP.StreamRequest.var"#3#5"{…})()
        @ HTTP.StreamRequest ~/.julia/packages/ConcurrentUtilities/J6iMP/src/ConcurrentUtilities.jl:9
Stacktrace:
  [1] sync_end(c::Channel{Any})
    @ Base ./task.jl:448
  [2] macro expansion
    @ ./task.jl:480 [inlined]
  [3] streamlayer(stream::HTTP.Streams.Stream{…}; iofunction::Nothing, decompress::Nothing, logerrors::Bool, logtag::Nothing, timedout::Nothing, kw::@Kwargs{…})
    @ HTTP.StreamRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/StreamRequest.jl:35
  [4] streamlayer
    @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/StreamRequest.jl:21 [inlined]
  [5] (::HTTP.ExceptionRequest.var"#exceptions#2"{…})(stream::HTTP.Streams.Stream{…}; status_exception::Bool, timedout::Nothing, logerrors::Bool, logtag::Nothing, kw::@Kwargs{…})
    @ HTTP.ExceptionRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/ExceptionRequest.jl:14
  [6] exceptions
    @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/ExceptionRequest.jl:13 [inlined]
  [7] (::HTTP.TimeoutRequest.var"#timeouts#3"{…})(stream::HTTP.Streams.Stream{…}; readtimeout::Int64, logerrors::Bool, logtag::Nothing, kw::@Kwargs{…})
    @ HTTP.TimeoutRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/TimeoutRequest.jl:18
  [8] (::HTTP.ConnectionRequest.var"#connections#4"{…})(req::HTTP.Messages.Request; proxy::Nothing, socket_type::Type, socket_type_tls::Nothing, readtimeout::Int64, connect_timeout::Int64, logerrors::Bool, logtag::Nothing, kw::@Kwargs{…})
    @ HTTP.ConnectionRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/ConnectionRequest.jl:119
  [9] connections
    @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/ConnectionRequest.jl:58 [inlined]
 [10] (::Base.var"#96#98"{…})(args::HTTP.Messages.Request; kwargs::@Kwargs{…})
    @ Base ./error.jl:308
 [11] (::HTTP.RetryRequest.var"#manageretries#3"{…})(req::HTTP.Messages.Request; retry::Bool, retries::Int64, retry_delays::ExponentialBackOff, retry_check::Function, retry_non_idempotent::Bool, kw::@Kwargs{…})
    @ HTTP.RetryRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/RetryRequest.jl:75
 [12] manageretries
    @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/RetryRequest.jl:30 [inlined]
 [13] (::HTTP.CookieRequest.var"#managecookies#4"{…})(req::HTTP.Messages.Request; cookies::Bool, cookiejar::HTTP.Cookies.CookieJar, kw::@Kwargs{…})
    @ HTTP.CookieRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/CookieRequest.jl:42
 [14] managecookies
    @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/CookieRequest.jl:19 [inlined]
 [15] (::HTTP.HeadersRequest.var"#defaultheaders#2"{…})(req::HTTP.Messages.Request; iofunction::Nothing, decompress::Nothing, basicauth::Bool, detect_content_type::Bool, canonicalize_headers::Bool, kw::@Kwargs{…})
    @ HTTP.HeadersRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/HeadersRequest.jl:71
 [16] defaultheaders
    @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/HeadersRequest.jl:14 [inlined]
 [17] (::HTTP.RedirectRequest.var"#redirects#3"{…})(req::HTTP.Messages.Request; redirect::Bool, redirect_limit::Int64, redirect_method::Nothing, forwardheaders::Bool, response_stream::Nothing, kw::@Kwargs{…})
    @ HTTP.RedirectRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/RedirectRequest.jl:25
 [18] redirects
    @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/RedirectRequest.jl:14 [inlined]
 [19] (::HTTP.MessageRequest.var"#makerequest#3"{…})(method::String, url::URIs.URI, headers::Nothing, body::Vector{…}; copyheaders::Bool, response_stream::Nothing, http_version::HTTP.Strings.HTTPVersion, verbose::Int64, kw::@Kwargs{})
    @ HTTP.MessageRequest ~/.julia/packages/HTTP/PnoHb/src/clientlayers/MessageRequest.jl:35
 [20] makerequest
    @ ~/.julia/packages/HTTP/PnoHb/src/clientlayers/MessageRequest.jl:24 [inlined]
 [21] request(stack::HTTP.MessageRequest.var"#makerequest#3"{…}, method::String, url::String, h::Nothing, b::Vector{…}, q::Nothing; headers::Nothing, body::Vector{…}, query::Nothing, kw::@Kwargs{})
    @ HTTP ~/.julia/packages/HTTP/PnoHb/src/HTTP.jl:457
 [22] request(stack::Function, method::String, url::String, h::Nothing, b::Vector{UInt8}, q::Nothing)
    @ HTTP ~/.julia/packages/HTTP/PnoHb/src/HTTP.jl:455
 [23] #request#20
    @ ~/.julia/packages/HTTP/PnoHb/src/HTTP.jl:315 [inlined]
 [24] request(stack::Function, method::String, url::String, h::Nothing, b::Vector{UInt8}, q::Nothing) (repeats 2 times)
    @ HTTP ~/.julia/packages/HTTP/PnoHb/src/HTTP.jl:313 [inlined]
 [25] get
    @ ~/.julia/packages/HTTP/PnoHb/src/HTTP.jl:518 [inlined]
 [26] get_batches()
    @ Main ./REPL[111]:2
 [27] top-level scope
    @ ./timing.jl:395
Some type information was truncated. Use `show(err)` to see complete types.

I took a look at the Python implementation, which uses HTTP/1.1 chunked transfer encoding, and attempted to re-create it in Julia. I got the server to respond to the client without errors, and the client ends up with a byte count that seems OK; however, no data is loaded (we end up with zero rows):

Server

function get_stream()
    total_records = 100_000
    batch_len = 4096
    stream = Tables.partitioner(Iterators.partition(1:total_records, batch_len)) do indices
        nrows = length(indices)
        return (
            a = randint_nullable(nrows),
            b = randint_nullable(nrows),
            c = randint_nullable(nrows),
            d = randint_nullable(nrows)
        )
    end
    return stream
end

HTTP.listen("127.0.0.1", 8008) do http::HTTP.Stream
    HTTP.setstatus(http, 200)
    #HTTP.setheader(http, "Content-Type" => "application/vnd.apache.arrow.stream")
    HTTP.setheader(http, "Transfer-Encoding" => "chunked")

    batches = get_stream()
    HTTP.startwrite(http)
    for batch in batches
        buffer = IOBuffer()
        Arrow.write(buffer, batches, file=false)
        bytes = take!(buffer)
        # Frame each chunk manually: hex-encoded length, CRLF, payload, CRLF
        write(http, "$(string(length(bytes), base=16))\r\n")
        write(http, bytes)
        write(http, "\r\n")
    end
    # Terminating zero-length chunk
    write(http, "0\r\n\r\n")
end

Client REPL

julia> get_batches()
80187255 bytes received
0 record batches received
Arrow.Table[]

We may need some help from folks like @quinnj to shed some light on what we are doing wrong here (maybe I need to encode the bytes differently, or not send the hex string for the chunk sizes?). Or maybe the issue is on the other side (the client is somehow mangling the chunk-encoded response).

(Please note that for testing I reduced the target number of rows to 100,000)

Should you be using batch instead of batches?

Sorry, this was a leftover from me throwing code around to see if I got different results. I believe writing the individual batch values gave the same result. I will confirm once back at home.

Yep, I get the same result. Based on the byte count I would guess the client code probably only sees the last batch that the server writes:

julia> get_batches()
3214429 bytes received
0 record batches received
Arrow.Table[]

Is there a way to avoid that and use Arrow.write as a write to memory?

I tried this the other day to handle the received batches as they arrive. No idea how fast or slow it is in comparison:

client:

using Arrow, HTTP

function process_batches(f::Function)

    buffer = Vector{UInt8}()

    HTTP.open("GET", "http://localhost:8008") do io
        startread(io)

        while !eof(io)
            # The chunk length is sent as a hex string on its own line
            len = parse(Int, readline(io), base = 16)
            # A zero-length chunk marks the end of the stream
            len == 0 && break
            resize!(buffer, len)
            n_read = 0
            # readbytes! may return fewer bytes than requested, so loop until the chunk is complete
            while n_read < len
                n_read += readbytes!(io, @view(buffer[n_read+1:end]), len - n_read)
            end
            f(Arrow.Stream(buffer))
            readline(io) # empty line after each chunk
        end
    end

    return
end

n::Ref{Int} = Ref(0)
function accumulate(str)
    n[] += 1
end
# warmup run
process_batches(accumulate)
n[] = 0

execution_time = @elapsed process_batches(accumulate)
println("$(execution_time) seconds elapsed")
println("$(n[]) chunks received")

server:

using Arrow, HTTP, Random, Tables

function randint_nullable(n::Integer)
    v = Vector{Union{Missing, Int}}(undef, n)
    rand!(v, Int)
    return v
end

function get_stream()
    total_records = 100_000_000
    batch_len = 4096
    stream = Tables.partitioner(Iterators.partition(1:total_records, batch_len)) do indices
        nrows = length(indices)
        return (
            a = randint_nullable(nrows),
            b = randint_nullable(nrows),
            c = randint_nullable(nrows),
            d = randint_nullable(nrows)
        )
    end
    return stream
end

HTTP.listen("127.0.0.1", 8008) do http::HTTP.Stream
    HTTP.setstatus(http, 200)
    HTTP.setheader(http, "Transfer-Encoding" => "chunked")

    batches = get_stream()
    HTTP.startwrite(http)

    # Reuse a single IOBuffer across batches to avoid reallocating it each time
    buffer = IOBuffer()

    for batch in batches
        truncate(buffer, 0)
        Arrow.write(buffer, batch, file=false) # one small IPC stream per batch
        nbytes = position(buffer)
        seekstart(buffer)
        # Frame each chunk: hex-encoded length, CRLF, payload, CRLF
        write(http, "$(string(nbytes, base=16))\r\n")
        write(http, buffer)
        write(http, "\r\n")
    end
    # Terminating zero-length chunk
    write(http, "0\r\n\r\n")
end

@jules, I made a few small changes to your client code to get back an array of Arrow tables, and got it all to work.

@simsurace, it would be good to see if you could compare the runtime of this version with your original implementation; my MBP is not as powerful as yours (2020 Intel i5 vintage).

Client

using Arrow, HTTP, BufferedStreams

function process_batches()
    buffer = Vector{UInt8}()
    record_batches = []
    HTTP.open("GET", "http://localhost:8008") do http_io
        startread(http_io)
        buffered_io = BufferedInputStream(http_io)
        while !eof(buffered_io)
            # Get length of chunk
            len = parse(Int, readline(buffered_io), base=16)
            # We are done once we hit a zero-length chunk
            len == 0 && break
            resize!(buffer, len)
            n_read = 0
            while n_read < len
                n_read += readbytes!(buffered_io, @view(buffer[n_read+1:end]), len - n_read)
            end
            # We are getting streams with one record batch in them
            # So before we stack them in record_batches we grab the 
            # one and only record batch in the stream we just downloaded. 
            push!(record_batches, collect(Arrow.Stream(buffer))[1])
            readline(buffered_io) # empty line
        end
    end
    record_batches
end

# warmup run
batches = process_batches()
# benchmark run
execution_time = @elapsed batches = process_batches()

println("$(execution_time) seconds elapsed")
println("$(length(batches)) record batches received")

On my MBP, it takes ~18 seconds to download and deserialize all the record batches. This implementation can handle the 100-million-row dataset since the server returns individual chunks, each containing a small single-record-batch Arrow stream. Many thanks to @jules for showing us the way to the light.
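As a follow-up usage example (an assumption, not part of the benchmark above): since each element of batches is a Tables.jl-compatible Arrow.Table, the downloaded record batches can be concatenated into a single table, for example via DataFrames:

using DataFrames

# One DataFrame holding all downloaded rows
df = reduce(vcat, DataFrame.(batches))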

Just got a new MBP with an M3 Pro chip in it and the same code now takes 4.62 seconds on Julia 1.10.3.

@simsurace, @jules, et al., please let me know if you’re ok with me submitting a PR on GitHub for this. I can credit y’all in the request.
