How to plumb together download -> uncompress -> untar without writing full downloaded file

I would like to extract a particular file from a large (say, dozens of GB) gzipped tar archive located somewhere on the internet. I don’t want to write that whole archive to local disk; instead I want to stream the bytes to /dev/null, with just one specified archive member skimmed off and written to disk.

I am already able to do this for small files with a plain IOBuffer plus Downloads.download, Tar.jl, and TranscodingStreams. However, for very large files the IOBuffer simply grows too large for my RAM, and when I provide a maxsize argument to IOBuffer, Tar.extract complains about a premature EOF, so I’m assuming the download filled the buffer and returned.
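For reference, the small-file version looks roughly like this (the URL and member path here are placeholders):

import Downloads, Tar
using CodecZlib

buf = IOBuffer()
Downloads.download("https://example.com/archive.tar.gz", buf)
seekstart(buf)  # download leaves the buffer positioned at the end
loc = Tar.extract(hdr -> hdr.path == "member_I_want", GzipDecompressorStream(buf))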

So I think I could go to some effort using @async or maybe Channels, but I thought I’d ask here first, as I have a suspicion that this shouldn’t be that hard with the right sort of stream object.


One idea would be to use an anonymous memory map:
https://docs.julialang.org/en/v1/stdlib/Mmap/#Mmap.Anonymous

Technically, this may still write some of the data to disk (via swap).
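Something like this, perhaps (the mmap call and the 64 GiB size here are untested guesses on my part):

using Mmap

# Reserve anonymous (swap-backed) memory and wrap it in an IOBuffer
mem = Mmap.mmap(Mmap.Anonymous(), Vector{UInt8}, (64 * 2^30,), 0)
p = IOBuffer(mem; read=true, write=true)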

However, I think you may need to access the stream API of zlib.

Currently CodecZlib.jl only exposes part of that API via ccall here:
https://github.com/JuliaIO/CodecZlib.jl/blob/master/src/libz.jl

You may need functions not currently bound by CodecZlib.jl. In that case, you may need to ccall them yourself, referring to the C header:
https://github.com/madler/zlib/blob/master/zlib.h
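For example, a minimal direct ccall looks like this (using the libz library handle from Zlib_jll, which CodecZlib itself builds on):

using Zlib_jll: libz

# zlibVersion() is part of zlib's public C API
version = unsafe_string(ccall((:zlibVersion, libz), Cstring, ()))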

Perhaps the problem is actually with Tar.jl?

Anyways, perhaps showing us your code would help?

Here is some simple code that fails. I’m using a local file for testing, and we can leave gzip out of it for now for simplicity:

import Downloads, Tar

p = IOBuffer(maxsize=100000)
Downloads.download("file:./local_archive_for_testing.tar", p)
loc = Tar.extract(x -> x.path == "file_I_want", p)

What I’m asking for is for p to work like a pipe between download and Tar.extract.

import Downloads, Tar
io = PipeBuffer()
Downloads.download("file://localhost/path/to/file.tar", io)
loc = Tar.extract(x -> x.path == "file_I_want", io)

(but I think this downloads the whole file into the buffer before the Tar.extract, which is not what you want)

I think you’ll want some @async here, because you want the decoding to overlap in time with the download. (In principle you could avoid the @async by implementing a custom stream to pass to download; that stream would have to drive the selection of your desired files out of the archive from within its own write() method. But this seems fairly messy, unless someone happens to have implemented it already; a skeleton of the idea is sketched below.)
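For what it’s worth, the skeleton of that custom-stream idea would look something like this (ExtractingSink is a hypothetical type, and nothing here actually parses tar):

mutable struct ExtractingSink <: IO
    # tar-parsing state: current header, bytes remaining in entry, ...
end

function Base.write(sink::ExtractingSink, data::Vector{UInt8})
    # Incrementally parse tar headers from `data`, stream the one member
    # you want to disk, and discard everything else.
    # (A real implementation would probably also need unsafe_write.)
    return length(data)
end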

For an async solution, you want a buffered blocking stream instead. There is one such stream, the undocumented Base.BufferStream, which can be used for this purpose:

import Downloads, Tar
using CodecZlib

# Some necessary piracy - BufferStream doesn't have
# an implementation for `Base.skip()`
function Base.skip(io::Base.BufferStream, n)
    if n > 0
        read(io, n)
    else
        error("Can't skip backward in BufferStream")
    end
end

io = Base.BufferStream()
@sync begin
    @async begin
        Downloads.download("file://localhost/home/chris/tmp/testdata.tgz", io)
        @info "Download complete"
        close(io)
    end
    @async begin
        loc = Tar.extract(x -> x.path == "1.dat" ? (@info("Extracting", x); true) : (@info("Ignoring", x); false), GzipDecompressorStream(io))
        @info "Untar complete" loc
    end
end

Testing, we can see that extraction happens in a streaming manner:

┌ Info: Ignoring
└   x = Tar.Header("10.dat", :file, 0o644, 100000000, "")
┌ Info: Extracting
└   x = Tar.Header("1.dat", :file, 0o644, 100000000, "")
┌ Info: Ignoring
└   x = Tar.Header("2.dat", :file, 0o644, 100000000, "")
┌ Info: Ignoring
└   x = Tar.Header("3.dat", :file, 0o644, 100000000, "")
┌ Info: Ignoring
└   x = Tar.Header("4.dat", :file, 0o644, 100000000, "")
┌ Info: Ignoring
└   x = Tar.Header("5.dat", :file, 0o644, 100000000, "")
┌ Info: Ignoring
└   x = Tar.Header("6.dat", :file, 0o644, 100000000, "")
┌ Info: Ignoring
└   x = Tar.Header("7.dat", :file, 0o644, 100000000, "")
┌ Info: Ignoring
└   x = Tar.Header("8.dat", :file, 0o644, 100000000, "")
┌ Info: Ignoring
└   x = Tar.Header("9.dat", :file, 0o644, 100000000, "")
[ Info: Download complete
┌ Info: Untar complete
└   loc = "/tmp/jl_p3kvwB"

This looks like what I want. So I tried it out on a real internet file, but it failed: I got no ‘Ignoring’ messages after the first file in the archive, although the download proceeded to completion. It looks like Tar.extract ended up somewhere it shouldn’t in the stream and stopped consuming the buffer, while download kept filling the buffer and eating my RAM.

julia> progress(total,now) = begin
       print("\r$now")
       end
progress (generic function with 1 method)

julia> io = Base.BufferStream()
BufferStream() bytes waiting:0, isopen:true

julia> @sync begin
       @async begin
       Downloads.download("https://data.proteindiffraction.org/ssgcid/3lls.tar",io,progress=progress)
       @info "download complete"
       close(io)
       end
       @async begin
       loc = Tar.extract(x -> x.path == "3lls/series/200873f12_x0181.img.bz2" ? (@info("Extracting",x); true) : (@info("Ignoring", x); false), io)
       @info "Untar complete" loc
       end
       end
┌ Info: Ignoring
└   x = Tar.Header("3lls/", :directory, 0o775, 0, "")
┌ Info: Ignoring
└   x = Tar.Header("3lls/series/", :directory, 0o755, 0, "")
┌ Info: Ignoring
└   x = Tar.Header("3lls/series/200873f12_x0152.img.bz2", :file, 0o664, 1250416, "")
499578880[ Info: download complete
ERROR: TaskFailedException
   nested task error: This does not appear to be a TAR file/stream — invalid version string for tar file: "\x1c>". Note: Tar.jl does not handle decompression; if the tarball is compressed you must use an external command like `gzcat` or package like CodecZlib.jl to decompress it. See the README file for examples.

Note that if I download the file to local disk, then run the same Tar.extract command, everything works correctly.

julia> oo = open("3lls.tar","r")
IOStream(<file 3lls.tar>)

julia> loc = Tar.extract(x -> x.path == "3lls/series/200873f12_x0181.img.bz2" ? (@info("Extracting",x); true) : (@info("Ignoring", x); false), oo)
┌ Info: Ignoring
└   x = Tar.Header("3lls/", :directory, 0o775, 0, "")
┌ Info: Ignoring
└   x = Tar.Header("3lls/series/", :directory, 0o755, 0, "")
┌ Info: Ignoring
└   x = Tar.Header("3lls/series/200873f12_x0152.img.bz2", :file, 0o664, 1250416, "")
┌ Info: Ignoring
└   x = Tar.Header("3lls/series/200873f12_x0318.img.bz2", :file, 0o664, 1248919, "")
#truncated
┌ Info: Extracting
└   x = Tar.Header("3lls/series/200873f12_x0181.img.bz2", :file, 0o664, 1246912, "")
┌ Info: Ignoring
└   x = Tar.Header("3lls/series/200873f12_x0056.img.bz2", :file, 0o664, 1246759, "")
#truncated

You may need to modify the definition of skip() I gave you to ensure exactly n bytes are read. Try the following?

function Base.skip(io::Base.BufferStream, n)
    if n > 0
        read(io, n, all=true)
    else
        error("Can't skip backward in BufferStream")
    end
end

So that didn’t work; it turns out that all is not a valid option for read when io is a BufferStream.

I suspect it is indeed the skip method that is causing the problem. The code paths in stream.jl for readbytes! are different depending on how large the read is.

I’ve also verified that everything works for a large tar file “downloaded” via the file: protocol from local storage; it only fails when downloading across the network, which sounds like some problem with Tar waiting for the buffer to fill.


which sounds like some problem with Tar waiting for the buffer to fill

Yes, I thought there was some short read happening here. If all doesn’t work, I was hoping that just calling read again with the remaining size would be OK, but this doesn’t seem to work either.

The main trouble we’re having here is that Base.BufferStream isn’t a public API, so it’s not that well tested: some things like skip are missing, and the precise blocking behavior isn’t really documented. (I noticed something a bit weird about blocking: BufferStream seems to only block on read, not write, so the internal buffer can grow indefinitely. Which is fine in this case, because the Tar reader is likely faster than the writer doing the download, but really not great if you were tarring and uploading!)
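You can see the unbounded buffering directly (observed behavior of this internal type, so it may change between Julia versions):

io = Base.BufferStream()
write(io, zeros(UInt8, 10^8))  # returns immediately, nothing blocks the writer
bytesavailable(io)             # ~10^8 bytes now sitting in memory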

Another alternative is to use Pipe, which is a public API and widely used. IIUC the cost of blocking on a read or write to a Pipe will be a lot higher than for BufferStream, because the Pipe goes through the operating system kernel. But the pipe has a fixed-size buffer, so it should block on both the read and write sides, which is a lot more sensible in general.

The following seemed to work for me:

using Tar, Downloads

function Base.skip(io::Union{Base.BufferStream,Pipe}, n)
    if n > 0
        while n > 0 && isopen(io)
            buf = read(io, n)
            n -= length(buf)
            #if n > 0
            #    @info "Short read" length(buf)
            #end
        end
    else
        error("Can't skip backward in Pipe or BufferStream")
    end
end

io = Pipe()
# Initialize the pipe. I'm not sure there's a public API for this ??
Base.link_pipe!(io)
# Alternatively, use BufferStream... which should work
# but seems to get stuck for some reason
# io = Base.BufferStream()
@sync begin
    @async try
        Downloads.download("https://data.proteindiffraction.org/ssgcid/3lls.tar", io)
        @info "Download complete"
    catch exc
        @error "Caught exception" exc
    finally
        close(io)
    end
    @async try
        loc = Tar.extract(x -> x.path == "3lls/series/200873f12_x0181.img.bz2" ? (@info("Extracting", x); true) : (@info("Ignoring", x); false), io)
        @info "Untar complete" loc
    catch exc
        @error "Caught exception" exc
    finally
        close(io)
    end
end

If you wanted to abort the download once the particular file of interest has been read and extracted, you may be able to do close(io).
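An untested sketch of that early-abort idea (the predicate below is hypothetical, and Tar.extract will probably still throw when it hits the closed stream, so you’d want a try/catch around it):

found = Ref(false)
pred = hdr -> begin
    found[] && (close(io); return false)  # past the target: abort the download
    found[] = hdr.path == "3lls/series/200873f12_x0181.img.bz2"
    return found[]
end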


There’s an issue for this, by the way: Document `Base.BufferStream` · Issue #42424 · JuliaLang/julia · GitHub


Yes, the solution with Pipe works, thanks for figuring that out for me! Extra time in the OS is not an issue, as the download speed will dominate. My diagnosis with BufferStream had got to the point where I was pretty sure the download task wasn’t filling the buffer fast enough for the Tar task, and that instead of waiting, the Tar task was doing something else. That’s why the same code “downloading” from a local file works: a file read from local disk can keep up with Tar.


What should ideally happen is that the tar Task blocks indefinitely, waiting for read() to complete with the requested bytes. While it’s blocked, the download Task should be running, slowly write()ing to the stream. Internally it seems that Base.readbytes!() can sometimes return short reads, but I don’t know in which situations this can happen, or whether it’s ever applicable to BufferStream. Also, the fact that BufferStream doesn’t seem to block write() at some maximum buffer size seems concerning.

Probably @jameson would have some insight into these issues.

Yes, feel free to submit PRs to improve it.


See also https://github.com/staticfloat/SimpleBufferStream.jl.


Nice suggestion, I didn’t know about this package.

@frtps it looks like SimpleBufferStream.jl is ideal and can be used instead of Pipe(). And certainly instead of Base.BufferStream as it currently exists.

I guess this should eventually replace the Base.BufferStream implementation, but for now it’s nice to have in a separate package.
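A sketch of how that swap might look (this assumes SimpleBufferStream’s exported BufferStream supports the usual write/read/close protocol; if Tar still needs skip, the shim above would apply to it too):

import Downloads, Tar
using SimpleBufferStream: BufferStream

io = BufferStream()
@sync begin
    @async try
        Downloads.download("https://data.proteindiffraction.org/ssgcid/3lls.tar", io)
    finally
        close(io)  # signal EOF to the reader
    end
    @async begin
        loc = Tar.extract(hdr -> hdr.path == "3lls/series/200873f12_x0181.img.bz2", io)
        @info "Untar complete" loc
    end
end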


I’ll try that too then, although Pipe works fine. My current problem is that raising an error in the Tar.extract predicate function before complete traversal of the tar file (to save download time) seems to make the extracted file disappear from the filesystem, but that’s probably a new topic.