How to execute tasks in parallel in a for loop

All the discussion of nthreads is a red herring.

You don’t care about multi-threading or parallel on your CPU.

You would do exactly the same thing I posted if you had only a single CPU core.

This is because your bottleneck is not on your CPU. It is in IO, which can mean your SSD controller, or can mean the network (a long wire with many routers and buffered queues on the way, that ends at some load-balancer in front of some server with many many CPUs).

Hence you just want to fire off your requests as fast as possible, and then wait until they are completed and responses are available. You should not handle any responses until all requests have left your hands. (OK, up to some sane limit of probably 10k - 10M outstanding requests)

In Julia, the standard way to do this is `future = Threads.@spawn ...` to send something off, and `res = fetch(future)` to wait until the result is ready.
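As a minimal sketch of that pattern (using `sleep` to stand in for the IO wait; `slow_io_call` is a made-up name for illustration):

```julia
# `slow_io_call` is a stand-in for e.g. HTTP.get; `sleep` simulates IO latency.
function slow_io_call(i)
    sleep(0.1)      # the task yields here, so other tasks can run meanwhile
    return i * 2
end

# Fire off all requests before fetching anything:
tasks = [Threads.@spawn slow_io_call(i) for i in 1:20]

# Only now block until each result is available:
results = fetch.(tasks)   # [2, 4, 6, ..., 40]
```

Because every task yields while it waits, the whole batch completes in roughly the time of one call, even on a single thread.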


Thanks for the detailed answer @foobar_lv2; I have one thing I'd like clarified.

What's the difference between `@async` and `Threads.@spawn`? Yesterday I ran a few tests using both. Regardless of whether `@async` or `Threads.@spawn` is used in the code you posted, I got almost identical execution times. I just want to understand which one is more suitable in which case.

Following are some of the tests I ran.

function simulate_api_call()
    println("API call started with Thread : $(Threads.threadid())")

    # Simulate a long-running API call
    # response_task = @async HTTP.get("https://randomuser.me/api/?results=50000")
    response_task = Threads.@spawn HTTP.get("https://randomuser.me/api/?results=50000")

    # Note: this prints as soon as the task is spawned, not when the request finishes
    println("API call dispatched with Thread : $(Threads.threadid())")

    return response_task
end

function test_asyncmap()
    tasks = [simulate_api_call() for _ in 1:Threads.nthreads()]

    # Extract the results
    results = fetch.(tasks)

    return results
end


function test_spawn()
    results = Vector{Task}(undef, Threads.nthreads())

    @sync for i in 1:Threads.nthreads()
        Threads.@spawn begin
            results[i] = simulate_api_call()
        end
    end

    responses = fetch.(results)

    return responses
end

function test_threads()
    results = Vector{Task}(undef, Threads.nthreads())

    @sync Threads.@threads for i in 1:Threads.nthreads()
        println("Iteration $i")
        results[i] = simulate_api_call()
    end

    responses = fetch.(results)

    return responses
end

The boring answer is that `@async` marks the `Task` as sticky, which prevents it from being run on a different OS thread. I think the real answer is that `@async` predates `@spawn`, and generally predates multi-threading becoming stable and reliable in Julia. So `@async` is simply legacy, effectively deprecated, and should not be used. The manual says:

  │ Warning
  │
  │  It is strongly encouraged to favor Threads.@spawn over @async always even when no parallelism is required especially in publicly distributed libraries. This is because a use of @async disables the migration
  │  of the parent task across worker threads in the current implementation of Julia. Thus, seemingly innocent use of @async in a library function can have a large impact on the performance of very different
  │  parts of user applications.
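The stickiness difference is visible directly on the `Task` object (the `sticky` field is an implementation detail, but it illustrates the point):

```julia
t_async = @async 1 + 1          # sticky: pinned to the OS thread that spawned it
t_spawn = Threads.@spawn 1 + 1  # not sticky: free to migrate between threads

t_async.sticky   # true
t_spawn.sticky   # false

fetch(t_async), fetch(t_spawn)  # both tasks still compute the same result
```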

No, no, no. This is bad code. It should be

    tasks = [simulate_api_call() for _ in 1:100]

or something. You don’t care about nthreads().

Your parallelism is not between different threads; it is spread along the kilometers of wire between you and the remote host. (At 400 Mbit/s, roughly 2 bits are in flight per kilometer of fiber, so a long route holds hundreds of bytes per connection: the bandwidth-delay product is what you are trying to fill.)
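To see that the task count need not match `nthreads()`, here is a sketch where 100 IO-bound tasks overlap even on a single thread (`fake_request` is a made-up name; `sleep` stands in for the network wait):

```julia
function fake_request(i)
    sleep(0.2)   # simulated network latency; the task yields while it waits
    return i
end

start = time()
tasks = [Threads.@spawn fake_request(i) for i in 1:100]
results = fetch.(tasks)
elapsed = time() - start

# Serially this would take ~20 s; because the tasks overlap while waiting,
# it finishes in roughly 0.2 s regardless of how many OS threads exist.
```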


Amazing! Thanks for all the detailed answers. I really appreciate the effort and time you’ve put into explaining some of these things.

@foobar_lv2, one last question; sorry for asking so many questions.

I came across this library: https://github.com/tro3/ThreadPools.jl/tree/master

Just wondering whether you've come across this library; it has methods like the one below:

function test_spawn()
    results = Vector{Task}(undef, 4)

    ThreadPools.qbforeach(1:4) do i
        results[i] = simulate_api_call()
    end

    responses = fetch.(results)

    return responses
end

Do you think it will do the same? I just wanted to check in case you've come across it. I tried to understand the source code, but it's a little too hard as I'm quite new to Julia; sorry about this question. Also, note that I'm trying to use this with a Julia-based API: I want to keep thread 1 (the main thread) always free to accept new requests. That's why I looked into this library; otherwise `Threads.@spawn` will use all the threads, including thread 1 (the main thread).

I believe you’re overthinking it, and should just use Threads.@spawn.

You're right that there used to be (rare) issues where something interactive was pinned to the same OS thread as some compute-heavy, long-running Task.

As far as I know, this was partially or fully addressed by the task-migration work.

But I’m not up-to-date on that part of julia internals, sorry.

Someone else needs to take over here; I cannot say definitively whether there are problems that the ThreadPools package solves and that Base doesn't (in Julia 1.9+).
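For the record, Julia 1.9 added an `:interactive` threadpool that addresses exactly this concern without an external package. A sketch, assuming Julia is started with e.g. `julia --threads 3,1` (three default threads plus one interactive thread):

```julia
# Latency-sensitive work goes to the interactive pool, away from
# long-running tasks that run in the :default pool:
t = Threads.@spawn :interactive begin
    1 + 1   # stand-in for latency-sensitive work
end

# Long-running compute goes to the default pool as usual:
Threads.@spawn :default sum(rand(10^6))

fetch(t)
```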


Thanks @foobar_lv2

Yes, I'm keen to use `Threads.@spawn` as it aligns with what I want to achieve. However, when benchmarking, I noticed that the API gets stuck and stops serving other requests if the main thread is occupied by long-running tasks. That's what I'm trying to avoid. Another reason is that our APIs run on Julia 1.6, which may lack some of the newer scheduler improvements.

To overcome that, I tried to build a macro that avoids using the main thread (thread 1), but it was a failure; I'm missing something here. I'd appreciate it if someone could help:

The macro:

macro run_in_thread(expr)
    quote
        result = Threads.@spawn begin
            current_thread = Threads.threadid()
            if current_thread != 1
                return $(expr)
            else
                return run_in_thread(expr)
            end
        end
        result
    end
end

Usage:

function test_spawn()
    results = Vector{Task}(undef, 4)

    for i in 1:4
        results[i] = @run_in_thread simulate_api_call()
    end

    responses = fetch.(results)

    return responses
end

You may be interested in ThreadPinning.jl: