How to execute tasks in parallel in a for loop

I’m quite new to Julia programming, and I’m looking for a better way to parallelize the execution of a for loop. Basically, the logic I have in the following code needs to be executed in parallel.

        data = []
        for point in points
            features_api = get_data_from_api(point)
            features_db = get_data_from_db(point)
            push!(data, [features_api, features_db])
        end

When I looked for options, I saw that there are different ways to achieve this, such as @threads, @spawn, etc. However, it seems like these are not parallel but rather concurrent utilities.

My expectation is to utilize all the cores in the CPU to run each iteration in parallel. Any thoughts are welcome.

        data = []
        for point in points
           do_task_in_parallel()
        end

        # Wait for all tasks to finish


Preallocate the array, then use LoopVectorization.jl.

Also, use tuple or struct to store each element of result instead of an array. An array will allocate new memory and add indirection.

I don’t understand the distinction you are making?

The julia manual adopts a certain nonstandard terminology, where “concurrent” means many cpu-threads living inside the same julia process / address space, and “parallel” means many cpu-threads that each live in separate processes (optionally: on different nodes of a cluster), and “async” means that somebody else can run while a julia Task waits on IO (“cooperative multi-tasking”).


The critical question is how expensive your tasks (get_data_from_api, get_data_from_db) are, how many of them you have, and how uniform their costs are.

If you need to parallelize cheap tasks, then you need to bunch them together into fewer larger ones, because of synchronization overhead. If your tasks are already largish, then you can create concurrent julia Tasks for them immediately.
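To make the "bunch cheap tasks into fewer larger ones" idea concrete, here is a minimal sketch using Iterators.partition. The function f, the chunk size, and the item range are made up for illustration; f stands in for your cheap per-item work.

```julia
# Sketch: batch many cheap calls into a few larger tasks to amortize
# the per-task synchronization overhead.
f(x) = x^2  # stand-in for cheap per-item work

items = 1:1_000
chunks = collect(Iterators.partition(items, 100))  # 10 chunks of 100 items

# One task per chunk instead of one task per item.
tasks = [Threads.@spawn map(f, chunk) for chunk in chunks]
results = reduce(vcat, fetch.(tasks))  # flatten chunk results back into one vector
```

Tuning the chunk size trades scheduling overhead against load balance: larger chunks mean fewer tasks but less ability to even out uneven work.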

If you have many many tasks, most of which are super cheap, but most of the time is taken by few expensive ones, and you don’t know which are the bad tasks, then you need a fancy work-stealing scheduler and are in for a world of pain.

If your situation is “fewish slowish tasks”, then you can just do

        futures = []
        for point in points
            features_api = Threads.@spawn get_data_from_api(point)
            features_db = Threads.@spawn get_data_from_db(point)
            push!(futures, (features_api, features_db))
        end
        data = [[fetch(api), fetch(db)] for (api, db) in futures]

You’d probably benefit from first optimizing the serial version of your code. We can’t reproduce what you want to do, but you already have arrays of type Any there, which are a basic performance issue.


Make sure to pass the -t auto option to Julia to launch your thread pool.


Thanks for the suggestions.

My tasks are somewhat long-running. Each task would take between 10 and 60 seconds to complete. If that is the case, what are the options I have? My expectation is to reduce the fetch_data function execution time while utilizing multiple cores to run each iteration in a separate core within the loop.

Also, I would like to know what the impact would be if I combine the above two separate API and DB calls into one method and try to parallelize them.

function fetch_data()
    futures = []
    for point in points
        features_api = Threads.@spawn get_data_from_api(point)
        features_db = Threads.@spawn get_data_from_db(point)
        push!(futures, (features_api, features_db))
    end
    data = [[fetch(api), fetch(db)] for (api, db) in futures]
end

function fetch_data(point)
    features_api = get_data_from_api(point)
    features_db = get_data_from_db(point)
    return features_api, features_db
end
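Regarding combining the two calls: if you spawn one task per point running the combined fetch_data(point), you keep point-level parallelism, but the API and DB calls for the same point then run back to back inside that task instead of overlapping. A sketch, with stub functions standing in for the real calls from the thread:

```julia
# Stubs standing in for the real calls in the thread (placeholders):
get_data_from_api(p) = ("api", p)
get_data_from_db(p)  = ("db", p)

fetch_data(point) = (get_data_from_api(point), get_data_from_db(point))

# One task per point; the two calls for a given point run sequentially
# inside that task, but different points run concurrently.
function fetch_all(points)
    tasks = [Threads.@spawn fetch_data(point) for point in points]
    return [fetch(t) for t in tasks]
end
```

So combining them halves the number of tasks but roughly doubles the latency of each one; with long-running IO calls, spawning the two calls separately (as in the earlier snippet) overlaps more work.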

Thanks for highlighting that @imiq, I do have type annotations, I just removed them for clarity.

I believe this may not be suitable for I/O-bound tasks?

Also, I tested the following code. I’ve created 4 threads, and the system has 8 CPU cores.

function simulate_api_call()
    println("API call started with Thread : $(Threads.threadid())")
    
    # Simulate a long-running API call
    sleep(30)
    
    response = HTTP.get("https://jsonplaceholder.typicode.com/todos/1")
    
    println("API call completed with Thread : $(Threads.threadid())")
    
    return response
end


function test_spawn()
    @sync for i in 1:Threads.nthreads()
        println("Iteration $i")
        Threads.@spawn begin
            simulate_api_call()
        end
    end
end

function test_threads()
    Threads.@threads for i in 1:4
        println("Iteration $i")
        simulate_api_call()
    end
end

And here are the results I got.

Looking at the results, it seems like both approaches take the same amount of time to process each function (i.e., ~30 s, equal to the sleep time), so all functions appear to be processed in parallel. The interesting thing is that when I use Threads.@spawn, it only uses a few of the 4 threads I have, and the results are a bit quicker compared to Threads.@threads; with Threads.@threads it always uses all 4 threads. I can’t understand why it behaves like that. Also, in my case, what would be the best approach? Ultimately, I need to wait for all the results after the for loop.

When you sleep, that thread becomes available to do other work. If your threads were actually busy, @spawn would use them all, too.


What is the output of Threads.nthreads()? Julia may think you are using hyperthreading and are only using four cores. You can specify julia -t 8 to force Julia to launch 8 threads.

Threads is actually a misnomer because in reality we are launching tasks, virtual green threads, that can migrate between threads. When you sleep, the thread becomes available to execute another Task.


@mkitti , I’m using only 4 threads to test the functionality. Yes I’ve used -t to launch 4 threads in this instance and, hence the output to Threads.nthreads() is 4.

I thought the same, but even if I remove sleep, the behaviour is still the same.

But this time, the performance of Threads.@threads is much better. In my case, do you think I should use Threads.@threads? I’m looking to parallelize a for-loop task, and those tasks are I/O-bound.

If you’re IO-bound then definitely start a separate Task for each IO operation that can run at the same time, up to maybe 10k-10M tasks at the same time.

Stay the hell away from @threads for – that is intended for the many-cheap-cpu-bound parallelism where synchronization overhead is the issue. It aggregates multiple jobs into the same julia Task / thread, and cannot unaggregate them if the first one blocks.
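A sketch of the task-per-IO-operation pattern, with sleep standing in for a blocking request (io_op and the integer results are made up for illustration):

```julia
# One Task per IO operation, collected with @sync so the function
# returns only after every task has finished.
io_op(i) = (sleep(0.01); i * 10)  # stand-in for an HTTP/DB request

function run_all(n)
    results = Vector{Int}(undef, n)
    @sync for i in 1:n
        Threads.@spawn results[i] = io_op(i)  # each write goes to its own slot
    end
    return results
end
```

Writing into a preallocated slot per task avoids both push! races and the untyped-array issue mentioned earlier.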


Try

data = asyncmap(x -> (get_data_from_api(x), get_data_from_db(x)), points)
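For reference, asyncmap also accepts an ntasks keyword to cap how many calls are in flight at once, which is useful when the API would be overwhelmed by unlimited concurrency. A sketch with a stand-in workload:

```julia
# asyncmap runs the calls as concurrent tasks on a single thread;
# `ntasks` caps how many are in flight at once.
work(x) = (sleep(0.01); x + 1)  # stand-in for an IO-bound call
data = asyncmap(work, 1:10; ntasks = 4)  # at most 4 concurrent tasks
```

Results come back in input order regardless of which task finishes first.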

@foobar_lv2

How do I do that, Would you be able to provide some sample code for that? Sorry about that, I’m pretty new to Julia.

I already did. 2 posts above. Did it work?

@Tarny_GG_Channie.

Yes, I tried. Look at the following code:

function simulate_api_call()
    println("API call started with Thread : $(Threads.threadid())")

    response = HTTP.get("https://randomuser.me/api/?results=50000")

    println("API call completed with Thread : $(Threads.threadid())")

    return response
end

function test_spawn()
    results = Vector{HTTP.Response}(undef, Threads.nthreads())

    @sync for i in 1:Threads.nthreads()
        Threads.@spawn begin
            results[i] = simulate_api_call()
        end
    end

    return results
end

function test_threads()
    results = Vector{HTTP.Response}(undef, Threads.nthreads())

    # Threads.@threads already waits for all iterations, so no @sync is needed.
    Threads.@threads for i in 1:Threads.nthreads()
        println("Iteration $i")
        results[i] = simulate_api_call()
    end

    return results
end

function test_asyncmap()
    results = asyncmap(_ -> simulate_api_call(), 1:Threads.nthreads())
    return results
end

And following are the results:

Using asyncmap:
[screenshot: asyncmap timing output]

Using Threads.@threads

Using Threads.@spawn:
[screenshot: Threads.@spawn timing output]

It seems that asyncmap uses a single thread to achieve the results. However, if that’s the case, there won’t be any parallel processing happening. A single HTTP call takes about 30 seconds in the above example, yet all of the methods take around three times that to process the requests. My use case requires running everything in parallel to reduce the method execution time.

Single HTTP Call:
[screenshot: single HTTP call timing]


As said before, the task is I/O-bound. The CPU uses a single thread, true, but it doesn’t wait for one request to finish before issuing the next one. That’s what actually speeds up the process.
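The overlap is easy to see with a plain @async sketch, with sleep standing in for a network request:

```julia
# Ten simulated 0.1 s "requests" run as concurrent tasks on one thread.
elapsed = @elapsed @sync for i in 1:10
    @async sleep(0.1)  # stands in for an HTTP/DB request
end
# The waits overlap, so the total is close to 0.1 s rather than 1 s.
```

This is concurrency, not parallelism, yet for IO-bound work it gives nearly the same speedup.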

Maybe the discussion here gives you some insights: Home · ChunkSplitters.jl