HTTP.jl async is slow compared to python+aiohttp

So @fabiangans and I have been having discussions about improving the performance of Zarr.jl. The goal is to reduce the time it takes to load data from a remote zarr store (e.g. over HTTP, GCS, or S3). The current implementation is very fast in zarr-python because it uses async, so we have been trying to load chunks from remote storage asynchronously as well. However, we have found that retrieving the chunks asynchronously via HTTP.jl is several times slower than through Python using aiohttp. On my connection, which is fairly fast (~3 Gbps), the difference is barely noticeable for very small files but quickly widens for larger ones. In this example each file I download is about ~28 MB.

via aiohttp:

In [23]: import asyncio, aiohttp

In [24]: keys = [f'analysed_sst/{i}.1.0' for i in range(100)]
In [25]: async def get_items(keys):
    ...:     async with aiohttp.ClientSession() as session:
    ...:         tasks = []
    ...:         for ckey in keys:
    ...:             url = f"https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1/{ckey}"
    ...:             tasks.append(asyncio.create_task(get_cdata(session, url)))
    ...:         cdatas = await asyncio.gather(*tasks)
    ...:         return cdatas
    ...: 

In [26]: async def get_cdata(session, url):
    ...:     async with session.get(url) as resp:
    ...:         cdata = await resp.read()
    ...:         return cdata
    ...: 

In [27]: %time cdatas = asyncio.run(get_items(keys))
CPU times: user 3.27 s, sys: 1.49 s, total: 4.76 s
Wall time: 8.44 s

via HTTP.jl:

julia> using HTTP

julia> urls = map(i->"https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1/analysed_sst/$i.1.0", 0:99);

julia> @time asyncmap(url->HTTP.request("GET", url, status_exception=false).body, urls);
 30.422709 seconds (1.35 M allocations: 4.911 GiB, 1.18% gc time, 0.11% compilation time)

I wanted to post this here in case people know of better ways to do async HTTP requests in Julia, but this (or using the @sync/@async macros) seems to be the standard way people do it.

I am no expert on this subject, but my hunch is that the main reason this is much slower is that aiohttp natively implements even some of the lower-level operations (including reading the response bytes) asynchronously, as explained here. What I would like to know is whether there are any alternative native Julia libraries that could be competitive, or whether there is a better way to do this in HTTP.jl.


I’m curious about this as well.

Julia uses a fork of libuv:

An alternative to try might be Downloads.download or Base.download. Downloads.download seems to be a bit faster.
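
For instance, a minimal sketch of the Downloads.jl variant (hypothetical, assuming the same urls vector as in the original post):

using Downloads

# Sketch: fetch each URL concurrently via Downloads.jl; download returns
# the output stream it was given, so an IOBuffer per URL collects the bytes.
bufs = asyncmap(url -> Downloads.download(url, IOBuffer()), urls)
cdatas = take!.(bufs)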

Some other thoughts:

  1. Have you tried this with Threads.@spawn and threads rather than @async?
  2. A lot of memory is being allocated. I wonder if supplying a response_stream might help; a sketch combining both ideas follows below.
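
A minimal sketch of what I mean (hypothetical; one preallocated IOBuffer per URL, each request spawned as a task on the threadpool):

using HTTP

# Sketch: stream each response into its own IOBuffer instead of letting
# HTTP.jl allocate a fresh body, with Threads.@spawn for the concurrency.
function fetch_spawned(urls)
    buffers = [IOBuffer() for _ in urls]
    @sync for (url, buf) in zip(urls, buffers)
        Threads.@spawn HTTP.request("GET", url; response_stream=buf, status_exception=false)
    end
    return take!.(buffers)
end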

I think Scaling IO with number of threads to achieve high throughput · Issue #48232 · JuliaLang/julia · GitHub is related as well; my understanding is that the RelationalAI folks have worked on pushing up network throughput, found that OpenSSL outperformed MbedTLS, and were then bottlenecked by libuv. The linked issue discusses this in more detail.


I don’t think it’s a libuv limitation in this case. My Python script actually used the uvloop backend for asyncio (https://uvloop.readthedocs.io/). With the default asyncio event loop it’s maybe around 2 seconds slower.

I have also tried Downloads.jl and found it to be just as slow, if not slower. Threads.@threads did make a noticeable improvement, but this comes at the cost of much higher CPU usage, and it’s still twice as slow as the aiohttp version. I think it would be best to stick with async if we can rather than resorting to threads for this.

Do you have an example using a response_stream for large batches of async requests like this?


Introduction

I went through several steps of optimization of the Julia code. Basically we need to make sure we are running compiled code and that we have optimized memory usage as much as possible.

I used bmon to check my network usage. The first thing I noticed below was that the peak receiving bandwidth was higher with asyncio. This made me wonder if the connection limit was holding HTTP.jl back.

The effective wall times are now comparable for me after optimization. Your situation may require further tuning with a higher bandwidth connection.

Python / asyncio

Here’s what I see with Python asyncio.run:

In [11]: %time cdatas = asyncio.run(get_items(keys))
CPU times: user 18.7 s, sys: 3.81 s, total: 22.5 s
Wall time: 37.7 s

[bmon screenshot: network usage during the asyncio run]

Initial Julia code

Here is what I see with your Julia code:

julia> const urls = map(i->"https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1/analysed_sst/$i.1.0", 0:99);

julia> @time asyncmap(url->HTTP.request("GET", url, status_exception=false).body, urls);
 73.768142 seconds (12.42 M allocations: 5.409 GiB, 0.81% gc time, 15.31% compilation time)

[bmon screenshot: network usage during the initial Julia run]

Compiled function

Putting this into a function and making sure it gets compiled, I then get the following results via Julia:

julia> function f()
           asyncmap(url->HTTP.request("GET", url, status_exception=false).body, urls);
       end

julia> @time f()
 51.969691 seconds (3.01 M allocations: 4.816 GiB, 0.42% gc time)

[bmon screenshot: network usage with the compiled function]

Julia optimization with preallocated buffers

Optimizations:

  1. Use a function so the code gets compiled
  2. Make all the globals const
  3. Increase the connection_limit
  4. Preallocate the buffers
  5. Use Threads.@spawn so tasks can run on any thread

julia> const urls = map(i->"https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1/analysed_sst/$i.1.0", 0:99);

julia> const buffers = [IOBuffer(; sizehint = 64*1024*1024, maxsize=64*1024*1024) for x in 1:100]

julia> function f()
           seekstart.(buffers)
           @sync map(urls, buffers) do url, buffer
               Threads.@spawn HTTP.request("GET", url, status_exception=false, connection_limit=25, response_stream=buffer)
           end
       end

julia> @time f()
 35.779649 seconds (5.80 M allocations: 176.242 MiB, 0.21% compilation time)

julia> seekstart.(buffers); read.(buffers)
100-element Vector{Vector{UInt8}}:
 [0x02, 0x01, 0x21, 0x02, 0x60, 0x38, 0xdc, 0x03, 0x00, 0x00  …  0x0f, 0x02, 0x00, 0x08, 0x50, 0xb5, 0xb5, 0xb5, 0xb4, 0xb4]
...

[bmon screenshot: network usage with preallocated buffers]

Discussion

Part of the optimization above is general Julia practice. Making your globals const, or at least binding them to a type, helps the compiler. Wrapping the work in a function is also helpful for compilation. Managing memory matters too, and I suspect it accounts for some of the difference.

Above we preallocated a lot of memory partially based on prior knowledge. This prior knowledge could be obtained via a single HTTP request to the following URL and then parsing the returned XML:

https://mur-sst.s3.us-west-2.amazonaws.com/?prefix=zarr-v1/analysed_sst

This uses the Amazon S3 ListObjectsV2 API:
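
For example, a rough sketch of that size query (hypothetical; a regex stands in for a real XML parser, list-type=2 selects the V2 API, and pagination of long listings is ignored):

using HTTP

# Sketch: list the objects under the prefix once and pull out the <Size>
# entries, so the buffers can be sized before any chunk is requested.
function chunk_sizes(bucket_url, prefix)
    resp = HTTP.get("$bucket_url/?list-type=2&prefix=$prefix")
    body = String(resp.body)
    return [parse(Int, m.captures[1]) for m in eachmatch(r"<Size>(\d+)</Size>", body)]
end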

We may not have to preallocate all of the memory up front; we just need enough buffers to cover the number of concurrent connections. We could copy the bytes out on completion and then reuse the IOBuffers, along the lines of the sketch below.
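
A sketch of that reuse strategy (hypothetical names; a Channel serves as the buffer pool):

using HTTP

# Sketch: share a small pool of IOBuffers through a Channel, so we only
# hold as many buffers as there are in-flight requests.
function fetch_pooled(urls; npool=10)
    pool = Channel{IOBuffer}(npool)
    foreach(_ -> put!(pool, IOBuffer()), 1:npool)
    results = Vector{Vector{UInt8}}(undef, length(urls))
    @sync for (i, url) in enumerate(urls)
        Threads.@spawn begin
            buf = take!(pool)   # blocks until a buffer is free
            HTTP.request("GET", url; response_stream=buf, status_exception=false)
            results[i] = take!(buf)   # copy the bytes out; resets the buffer
            put!(pool, buf)           # hand the buffer back
        end
    end
    return results
end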


Thanks for taking the time to investigate this more thoroughly. I do know that wrapping everything in a function and labeling global variables const is an often-mentioned Julia performance best practice, but unlike in your benchmarks it made no difference in any of mine (I am not sure why; perhaps this has improved in more recent versions of Julia?), so I got into the habit of timing the single call to asyncmap out of convenience.

Anyway, the more important issue remains that even after applying most of your suggested optimizations (I avoided global variables entirely and made them local to the function, since this is what we would want to do in Zarr.jl), it still doesn’t reach parity with the python+asyncio version, at least not for me.

julia> function f()
           urls = map(i->"https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1/analysed_sst/$i.1.0", 0:99);
           buffers = [IOBuffer(; sizehint = 64*1024*1024, maxsize=64*1024*1024) for x in 1:100]
           seekstart.(buffers)
           @sync map(urls, buffers) do url, buffer
               Threads.@spawn HTTP.request("GET", url, status_exception=false, connection_limit=10, response_stream=buffer)
           end
           seekstart.(buffers)
           read.(buffers)
       end

julia> @time f();
 16.865996 seconds (1.35 M allocations: 9.039 GiB, 8.98% gc time)

So for me at least this is still 2x slower than python+asyncio. When including the buffer reads at the end, memory usage is now higher than in my previous pure-async version. I tried tinkering with the connection_limit and got similar results regardless of the number. I also got identical numbers from a simpler version with no manual stream buffers at all, just Threads.@threads (something like the sketch below). In both cases CPU usage is much higher than with python+asyncio, since the latter does everything on one thread.
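
For reference, the simpler threaded variant I mean (a hypothetical sketch, not my exact code):

using HTTP

# Sketch: no preallocated buffers, just a threaded loop collecting bodies.
function fetch_threads(urls)
    results = Vector{Vector{UInt8}}(undef, length(urls))
    Threads.@threads for i in eachindex(urls)
        results[i] = HTTP.request("GET", urls[i]; status_exception=false, connection_limit=10).body
    end
    return results
end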

I suspect our discrepancies arise because the machine I am running this on has a few times more bandwidth than yours (probably 3-4x based on your numbers). As another point of reference, I have confirmed that I can do this with PythonCall.jl with little to no overhead through fsspec:

julia> using PythonCall

julia> function get_cdatas(url, keys)
           fsspec = pyimport("fsspec")
           fs = fsspec.filesystem("http")
           m = fs.get_mapper(url)
           cdatas = m.getitems(keys)
           return map(k->PyArray(cdatas[k]), keys)
       end
get_cdatas (generic function with 1 method)

julia> url = "https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1"
"https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1"

julia> keys = ["analysed_sst/$i.1.0" for i in 0:99];

julia> @time get_cdatas(url, keys);
  9.373391 seconds (74.59 k allocations: 3.771 MiB, 0.54% compilation time)

I notice that you are timing the allocation of the buffers. I purposely removed this from the timing since that takes a considerable amount of time.

As I mentioned in the discussion, the actual implementation should consider using a better strategy for buffer use. aiohttp is preallocating and reusing its buffers.

I don’t think that’s a significant factor here. This time I ran everything exactly as you laid it out but don’t see a substantial difference. Allocating the buffers takes only around 2 s; the rest averages anywhere from 16-20 s.

julia> using HTTP

julia> const urls = map(i->"https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1/analysed_sst/$i.1.0", 0:99);

julia> @time const buffers = [IOBuffer(; sizehint = 64*1024*1024, maxsize=64*1024*1024) for x in 1:100];
2.016907 seconds (83.07 k allocations: 6.254 GiB, 23.55% gc time, 1.51% compilation time)

julia> function f()
           seekstart.(buffers)
           @sync map(urls, buffers) do url, buffer
               Threads.@spawn HTTP.request("GET", url, status_exception=false, connection_limit=25, response_stream=buffer)
           end
       end
f (generic function with 1 method)

julia> @time f();
 18.316000 seconds (17.40 M allocations: 967.361 MiB, 48.65% compilation time)

The timing reports a significant amount of compilation time (48.65%). What happens when you run @time f() again? I noticed that I had to wait between runs to get the best times. Also, how many threads are you using, and on what OS?

julia> 18.316000 * (1 - 0.4865)
9.405266000000001

So, excluding compilation time, we get about the same time as you get with Python (or calling your Python version). This, combined with using threading instead of single-threaded tasks (asyncmap does not use multithreading; the tasks it spawns are bound to the thread they are spawned on), makes up the difference.
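
A quick way to see the asyncmap point (a sketch; the exact ids depend on your session and thread count):

using Base.Threads

# Tasks created by asyncmap all run on the spawning thread, while
# Threads.@spawn tasks are free to run on any thread in the pool.
ids_async = unique(asyncmap(_ -> threadid(), 1:32))                   # typically just [1]
ids_spawn = unique(fetch.([Threads.@spawn threadid() for _ in 1:32]))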

The preallocation of the buffers, while good practice in general, is a red herring here.

Ok, that did the trick! Now it takes anywhere from 7-10 s.

However, I also still get the same running time without resorting to manually allocated stream buffers, which is considerably less complex. I can see that perhaps improving memory efficiency in the long run, but this is code we are considering placing into Zarr.jl to download chunks remotely, and chunk size depends on the array slices being read, which vary by workflow. So I don’t think the added complexity would really be worth it; most of the performance gains seem to have come from using threads and increasing the connection limit.

And all of this still goes back to my main question. Ideally it would be better to avoid threads and stick to async: the point I am making is that I can get these numbers in Python with far lower CPU usage, since only one thread is used. What I was hoping to find is a way to do the same thing in Julia.


Async and threads are not very different concepts in Julia: in both cases we create a Task. The only difference is whether the task sticks to the current thread. If Julia is started with only one thread (no -t option is given), the two situations are essentially the same, with some small differences in overhead, especially when concurrency issues are not directly involved. Take a look at the macro expansion of @async versus Threads.@spawn:

julia> @macroexpand @async println(5)
quote
    #= task.jl:517 =#
    let
        #= task.jl:518 =#
        local var"#229#task" = Base.Task((()->begin
                            #= task.jl:514 =#
                            println(5)
                        end))
        #= task.jl:519 =#
        if $(Expr(:islocal, Symbol("##sync#48")))
            #= task.jl:520 =#
            Base.put!(var"##sync#48", var"#229#task")
        end
        #= task.jl:522 =#
        Base.schedule(var"#229#task")
        #= task.jl:523 =#
        var"#229#task"
    end
end

julia> @macroexpand Threads.@spawn println(5)
quote
    #= threadingconstructs.jl:343 =#
    let
        #= threadingconstructs.jl:344 =#
        local var"#231#task" = Base.Threads.Task((()->begin
                            #= threadingconstructs.jl:340 =#
                            println(5)
                        end))
        #= threadingconstructs.jl:345 =#
        (var"#231#task").sticky = false
        #= threadingconstructs.jl:346 =#
        ccall(:jl_set_task_threadpoolid, Base.Threads.Cint, (Base.Threads.Any, Base.Threads.Int8), var"#231#task", 0)
        #= threadingconstructs.jl:347 =#
        if $(Expr(:islocal, Symbol("##sync#48")))
            #= threadingconstructs.jl:348 =#
            Base.Threads.put!(var"##sync#48", var"#231#task")
        end
        #= threadingconstructs.jl:350 =#
        Base.Threads.schedule(var"#231#task")
        #= threadingconstructs.jl:351 =#
        var"#231#task"
    end
end

Rather than contemplating the nuances of @async versus Threads.@spawn, I think the main question is really how many Tasks to launch at a time. Above we launched one per chunk at the outset. What if we ran only four Tasks at a time, reusing four buffers, instead of launching all one hundred tasks at once? As one task completes, we launch another. Another approach would be to divide the URLs into four groups and have each task loop over its partition, as in the sketch below.
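
A sketch of that worker-style approach (hypothetical names; a fixed number of tasks, each with its own reused buffer, pulling URLs from a shared Channel):

using HTTP

# Sketch: worker tasks drain a Channel of (index, url) jobs; each worker
# reuses a single IOBuffer for all of its downloads.
function fetch_with_workers(urls; nworkers=4)
    jobs = Channel{Tuple{Int,String}}(length(urls))
    foreach(job -> put!(jobs, job), enumerate(urls))
    close(jobs)
    results = Vector{Vector{UInt8}}(undef, length(urls))
    @sync for _ in 1:nworkers
        Threads.@spawn begin
            buf = IOBuffer()              # one buffer per worker, reused
            for (i, url) in jobs
                HTTP.request("GET", url; response_stream=buf, status_exception=false)
                results[i] = take!(buf)   # copy out; take! resets the buffer
            end
        end
    end
    return results
end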

aiohttp is a huge project consisting of 30K lines of code. We just combined an off-the-shelf asynchronous mechanism with a standard HTTP package and pulled off nearly the same performance. I think we can afford a little additional complexity to do some active resource management.


aiohttp is a huge project consisting of 30K lines of code. We just combined an off-the-shelf asynchronous mechanism with a standard HTTP package and pulled off nearly the same performance.

I think this statement has a lot of caveats and isn’t a fair comparison. The key point is that with pure async on one thread, HTTP.jl + @async (or Threads.@spawn with threads=1) is slower than asyncio+aiohttp by a factor of 3-4. Pointing out that the performance can be matched by using multiple threads doesn’t hold much meaning, considering I could easily do the same in Python with requests like so:

In [1]: import requests

In [2]: from concurrent.futures import ThreadPoolExecutor

In [3]: def get_cdatas_threads(keys, url):
    ...:     session = requests.Session()
    ...:     f = lambda url: session.get(url).content
    ...:     with ThreadPoolExecutor(50) as executor:
    ...:        return list(executor.map(f, [url+'/'+key for key in keys]))
    ...: 

In [4]: url = 'https://mur-sst.s3.us-west-2.amazonaws.com/zarr-v1'

In [5]: keys = [f'analysed_sst/{i}.1.0' for i in range(100)]

In [6]: %time cdatas = get_cdatas_threads(keys, url)
CPU times: user 14.3 s, sys: 3.31 s, total: 17.6 s
Wall time: 6.33 s

requests has been around longer than aiohttp and, like HTTP.jl, has a much simpler API. So if matching the running time for this particular use-case were all we cared about, why would anyone use asyncio+aiohttp when the performance can be matched with enough threads?

Despite this, the enterprise web development world has heavily embraced async over pure multi-threading in the past decade for use-cases that require processing many requests at once. That’s because async scales better as concurrency grows, and a vast body of literature on the internet has stated this time and time again. Here is one such example:

With threads you are also at the mercy of how many CPU cores you have available. In this example I found that I need anywhere from 10-20 threads to consistently match single-threaded async performance. Not only could this make it harder to deploy workflows on systems where CPU usage is billed, but in the cloud bandwidth is relatively plentiful while the cost of compute scales with the number of CPUs. Many big-data workflows in Earth Science with zarr are bottlenecked by I/O, networking, and instance costs, so being able to process a batch of data in the same amount of time with far fewer CPUs is highly preferable: it means I can do the work at several times lower cost. Also consider how well this would scale if I instead needed to request 1000 very small chunks rather than 100 larger ones.

I apologize for not making this clearer in my original post, since it made it seem that I was just seeking to match performance by any possible means. What I really want to emphasize is that matching single-threaded async performance is what we are after here. My main purpose in starting this thread was to see if people knew of better ways to do this with currently available tools in Julia. If not, what I am really hoping for out of this discussion is to help kickstart some motivation to either refactor HTTP.jl to be better optimized for async, or perhaps motivate the development of a pure-async Julia HTTP library that exists alongside HTTP.jl, much as aiohttp does with requests.

I think we can afford a little additional complexity to do some active resource management.

Can you explain in what way this additional complexity would be worth it, particularly in the context of Zarr.jl, now that we also know memory allocation is not the main bottleneck for running time in this example? You would not only need an automated method to figure out the right amount of memory (which will differ by storage type), but the size of the buffers would also need to change dynamically. We also wouldn’t want the memory used for downloading the chunks to stick around, since it is only needed temporarily: these are just compressed data that are eventually passed to Blosc to be decompressed.

I would rather not have a separate HTTP.jl for async. If your goal is to change HTTP.jl, then this really should be an issue on that repository. So far we have not involved @quinnj in this discussion, so you might try to reach him here:

I suggest looking into the aiohttp source for how they mitigate the overhead of launching an HTTP request. The point of providing an IOBuffer is to minimize what needs to be done to create an HTTP Request and Response. The optimal approach might be reuse.

Most of the literature on async vs. multithreading isn’t applicable to Julia (or Go, or the most recent version of Java). These languages have a feature called tasks (also known as green threads in some of the research) which allows the language to use dramatically more threads than you have cores. For example,

function fib(n)
    n<1 && return 1
    t1 = Threads.@spawn fib(n-1)
    t2 = Threads.@spawn fib(n-2)
    return fetch(t1)+fetch(t2)
end
fib(30)

will spawn about 2 million tasks and finish in about 3 seconds.


Thanks, this is the sort of explanation I was looking for. Can you clarify, then, whether I should expect the scaling to match asyncio even if I have access to just a small number of cores? That would be my main concern, as mentioned in my previous post, since there are use cases where it makes sense to batch jobs onto small instances (in terms of CPU cores) with relatively high bandwidth.

I can test this later tonight on an AWS EC2 instance with only 2 cores and see what happens.

The easy way to test this is to launch Julia with two threads (julia --threads=2), which will limit usage to two cores.

In case it was not clear above, Threads.@spawn does not spawn a thread; it spawns a Task that can run on another thread. The ability to create new threads only arrives in Julia 1.9. Thus, you may be able to limit CPU activity by controlling when you use Threads.@spawn.

You may be interested in

Also see information about the relatively new dynamic scheduler here:

https://docs.julialang.org/en/v1/base/multi-threading/#lib-multithreading

Ok, ran the same code again with threads=2.

Now it takes 30 seconds, about as slow as just using @async. So I am still not sure whether I am getting it, or whether this is really a fair comparison. Would it be more apt to run this with a higher thread count on a machine with far fewer cores?

(To clarify, I ran everything before with threads=auto the entire time on a machine with 96 cores).