What is the recommended way to run N workers asynchronously?

Given this code for a worker

using HTTP, JSON3, BenchmarkTools

function worker(name, n)
    println("worker-$name")
    response = HTTP.request("GET", "https://qrng.anu.edu.au/API/jsonI.php?length=$(n)&type=uint16")
    json = JSON3.read(String(response.body))
    sum(json["data"])
end

How should I run 10 of these and combine the results into an array?
Can I do it with Channels, 10 workers writing to a channel, and one reader reading from it?

You can use the “Task farm” pattern from this tutorial.

using HTTP, JSON3
using Base.Threads: @spawn

# Convenience method: default the element type to Any.
umap(f, xs; kwargs...) = umap(f, Any, xs; kwargs...)

# Task farm: `ntasks` workers all pull jobs from the input channel `xs`
# and push results into the output channel `ys`.
function umap(f, TY::Type, xs::Channel; ntasks = Threads.nthreads(), buffersize = ntasks)
    return Channel{TY}(buffersize) do ys
        @sync for i in 1:ntasks
            @spawn for x in xs
                put!(ys, f(i, x))
            end
        end
    end
end

function calc(nworkers = 10, nreqs = 100)
    xs = Channel{Int}(Inf)
    for _ in 1:nreqs
        put!(xs, 1)
    end
    close(xs)

    res = umap(Int, xs; ntasks = nworkers, buffersize = nreqs) do name, n
        println("worker-$name")
        response = HTTP.request("GET", "https://qrng.anu.edu.au/API/jsonI.php?length=$(n)&type=uint16")
        json = JSON3.read(String(response.body))
        sum(json["data"])
    end

    collect(res)
end

calc() # prints a long list of worker names, then returns the vector of numbers.

Here @spawn is used, which can be overkill; you can change it to @async, but it really doesn’t matter in this case.
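For reference, a minimal sketch of the same farm with @async (umap_async is a hypothetical name, not part of the tutorial); the task-spawning macro is the only change:

# Hypothetical single-threaded variant of the umap above:
# swapping @spawn for @async is the only change needed.
function umap_async(f, TY::Type, xs::Channel; ntasks = 4, buffersize = ntasks)
    return Channel{TY}(buffersize) do ys
        @sync for i in 1:ntasks
            @async for x in xs
                put!(ys, f(i, x))
            end
        end
    end
end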

Thanks! It took some time to read it properly. There are a lot of details, but for me the novelties were that you can fill a Channel and then close it before reading from it, and that a Channel can take a function as its first argument, which it runs in a task; in this case that function @spawns the workers.
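Both patterns in a minimal, self-contained sketch (the values are chosen just for illustration):

# Pattern 1: fill, close, then drain. A closed Channel still yields its
# buffered values; close only forbids further put!s.
ch = Channel{Int}(Inf)
foreach(i -> put!(ch, i), 1:3)
close(ch)
collect(ch)   # [1, 2, 3]

# Pattern 2: the Channel constructor takes a producer function and runs
# it in a task; the channel is closed automatically when it returns.
ch2 = Channel{Int}(2) do c
    for i in 1:3
        put!(c, i)
    end
end
collect(ch2)  # [1, 2, 3]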

I was trying to solve this yesterday, but I couldn’t make the code work. I have now been able to fix that code; it is a lot simpler, although not as powerful.

using HTTP, JSON

function worker(sums, name, n)
    println("worker-$name")
    r = HTTP.request("GET", "https://qrng.anu.edu.au/API/jsonI.php?length=$(n)&type=uint16")
    r = JSON.parse(String(r.body))
    put!(sums, sum(r["data"]))
end

function runner()
    sums = Channel{Int}(Inf)  # concrete element type; the abstract Integer works too but boxes each value
    @sync for i in 1:10
        @async worker(sums, i, 2)
    end
    close(sums)
    collect(sums)
end

If you are using @async only, there is no need for a Channel as the output. Since everything is executed on the same thread, there are no race conditions and a simple Vector is more than enough.
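For example, a minimal sketch of the Vector version (runner_vec is a hypothetical name):

using HTTP, JSON

# Hypothetical Vector-based variant: safe because all @async tasks run
# on one thread, so the push! calls never race.
function runner_vec()
    sums = Int[]
    @sync for i in 1:10
        @async begin
            println("worker-$i")
            r = HTTP.request("GET", "https://qrng.anu.edu.au/API/jsonI.php?length=2&type=uint16")
            push!(sums, sum(JSON.parse(String(r.body))["data"]))
        end
    end
    sums
end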

With that said, Channel is extremely useful and is rather easy to use. So in the long run, it’s better to use it, since you can switch from async to multithreading almost without changing your code.

With regard to your first comment, the most surprising thing for me was that a for loop over a Channel is safe for asynchronous and multithreaded consumers by default: each iteration is a synchronized take!, so several tasks can pull from the same Channel at once. One can understand how it works, but really, this makes writing multithreaded code as simple as writing the usual synchronous kind.
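A minimal sketch of several tasks sharing one Channel (the job values are arbitrary):

# Each value is take!n exactly once, so the two tasks split the jobs
# between them without any explicit locking.
jobs = Channel{Int}(Inf)
foreach(i -> put!(jobs, i), 1:6)
close(jobs)
@sync for t in 1:2
    Threads.@spawn for job in jobs
        println("task $t got job $job")
    end
end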

The difference between asynchronous and threaded is a bit hazy for me in Julia. I know what it means generally.
If I understand correctly, @spawn starts Tasks, which are an asynchronous concept. If Julia has one thread, everything runs asynchronously on that thread. If Julia has more threads, the tasks may run on multiple threads. Correct?

Maybe there is more depth to it, but as far as I know, you are correct. So I guess if you have a single thread, then from the user’s point of view it doesn’t matter whether you use @spawn or @async. But if you have more than one thread, you’ll see the difference.
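One quick way to see it (a minimal sketch, assuming Julia was started with more than one thread, e.g. julia -t 4):

using Base.Threads: @spawn, threadid

# With multiple threads the printed ids can differ;
# with a single thread they are all 1.
@sync for i in 1:4
    @spawn println("task $i ran on thread $(threadid())")
end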

You can compare them with the help of the @macroexpand macro.

@macroexpand @async begin end
@macroexpand Threads.@spawn begin end

They have the same structure, but the second one uses threaded tasks and the Threads scheduler.
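In practice, the visible difference in the expansion is the task’s sticky flag (a minimal sketch; the field is an internal detail of Task):

t_async = @async nothing
t_spawn = Threads.@spawn nothing
t_async.sticky   # true: pinned to the thread that created it
t_spawn.sticky   # false: the scheduler may run it on any thread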

The performance of this code (the task farm version) is somewhat baffling. I started a Julia REPL on the command line and pasted that code there. It has just one thread available to it, so everything runs asynchronously.

I ran @btime calc(10, 10) and got 610.483 ms (2990 allocations: 180.42 KiB).

If I then run @time calc(10, 10), the runtime is either about 13.6 seconds or about 0.3 seconds.

An example log, a bit long, sorry.

julia> @time calc(10, 10)
worker-1
worker-2
worker-3
worker-4
worker-5
worker-6
worker-7
worker-8
worker-9
worker-10
 13.640024 seconds (4.46 k allocations: 1.604 MiB)
10-element Vector{Int64}:
 26171
 18846
  7835
  2331
 16983
  8173
 12383
 48503
 53424
 51733

julia> @time calc(10, 10)
worker-1
worker-2
worker-3
worker-4
worker-5
worker-6
worker-7
worker-8
worker-9
worker-10
  0.316631 seconds (2.97 k allocations: 307.250 KiB)
10-element Vector{Int64}:
 15911
 17659
 43273
  5561
 48779
  7519
 16323
 36958
  6299
 38578

julia> @time calc(10, 10)
worker-1
worker-2
worker-3
worker-4
worker-5
worker-6
worker-7
worker-8
worker-9
worker-10
 13.688530 seconds (4.45 k allocations: 1.604 MiB)
10-element Vector{Int64}:
 40780
 38798
 27300
 24115
 20972
 39100
 35428
 10316
  1090
  9087

julia> 

It’s because of the allocations, right? They vary a lot for some reason.

I do not think the timing is related in any way to the approach itself. It seems to be related to the URI you are using, since its response time differs wildly. Just as a sanity check, you can change the calc function to this

function calc(nworkers = 10, nreqs = 100)
    xs = Channel{Int}(Inf)
    for _ in 1:nreqs
        put!(xs, 1)
    end
    close(xs)

    res = umap(Int, xs; ntasks = nworkers, buffersize = nreqs) do name, n
        println("worker-$name")
        sleep(0.1)  # stand-in for the network request
        1
    end

    collect(res)
end

then you can see that @time calc() always takes approximately one second (100 requests across 10 workers at 0.1 s each, plus some scheduling overhead). You can run @time HTTP.request("GET", "https://qrng.anu.edu.au/API/jsonI.php?length=1&type=uint16") manually and measure the response time. It looks like the server caches responses, so it can take either less than a second or a few seconds, depending on your luck.
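For example, a quick check of the variance (a minimal sketch):

using HTTP

# Run the same request a few times; the reported times should vary from
# well under a second to several seconds.
for _ in 1:5
    @time HTTP.request("GET", "https://qrng.anu.edu.au/API/jsonI.php?length=1&type=uint16")
end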

The allocations you see are related to the internal mechanisms of multithreading, so it is reasonable that the longer you wait, the more allocations you see. But this does not affect the performance of the code itself.

That sounds like a reasonable explanation. I made some assumptions about the server, which apparently don’t hold. I did find some code in HTTP.jl that seemed to allocate too much. I discussed that in Slack, and then opened an issue in HTTP.jl.

The reason it’s using that URL is that it’s based on this Python tutorial: Hands-On Python 3 Concurrency With the asyncio Module – Real Python. The asyncio code isn’t available unless you subscribe.