I’m quite new to Julia programming, and I’m looking for a better way to parallelize the execution of a for loop. Basically, the logic I have in the following code needs to be executed in parallel.
data = []
for point in points
    features_api = get_data_from_api(point)
    features_db = get_data_from_db(point)
    push!(data, [features_api, features_db])
end
When I looked for options, I saw that there are different ways to achieve this, such as @threads, @spawn, etc. However, it seems like these are not parallel but rather concurrent utilities.
My expectation is to utilize all the cores in the CPU to run each iteration in parallel. Any thoughts are welcome.
data = []
for point in points
    do_task_in_parallel()
end
# Wait for all tasks to finish
I don’t understand the distinction you are making.
The Julia manual adopts a somewhat nonstandard terminology: “concurrent” means many CPU threads living inside the same Julia process / address space, “parallel” means threads that each live in separate processes (optionally on different nodes of a cluster), and “async” means that somebody else can run while a Julia Task waits on IO (“cooperative multitasking”).
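In code, the three notions look roughly like this (a minimal sketch; the multi-process line is commented out because it needs worker processes added first):

```julia
using Distributed  # only needed for the multi-process ("parallel") case

# “async”: cooperative multitasking -- another Task runs while this one waits on IO.
t_io = @async sleep(0.1)

# “concurrent”: a Task scheduled onto the thread pool of this same Julia
# process (start Julia with `julia -t N` to get N threads).
t_thread = Threads.@spawn sum(rand(1_000))

# “parallel” (in the manual's sense): work shipped to a separate worker
# process, possibly on another machine; needs `addprocs(...)` first.
# t_proc = @spawnat 2 sum(rand(1_000))

wait(t_io)
fetch(t_thread)
```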
============
The critical question is how expensive your tasks (get_data_from_api, get_data_from_db) are, how many you have, and how uniform their costs are.
If you need to parallelize cheap tasks, then you need to bunch them together into fewer larger ones because of synchronization overhead. If your tasks are already largish, then you can create concurrent Julia Tasks for them immediately.
If you have many many tasks, most of which are super cheap, but most of the time is taken by few expensive ones, and you don’t know which are the bad tasks, then you need a fancy work-stealing scheduler and are in for a world of pain.
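Short of a real work-stealing scheduler, a common middle ground for nonuniform costs is a fixed pool of worker Tasks pulling jobs from a Channel, so one expensive job doesn’t hold up a batch of cheap ones. A hedged sketch, with process standing in for whatever your task does:

```julia
# Sketch of a Channel-based worker pool: nworkers Tasks drain a shared
# job queue, so cheap jobs aren't stuck behind an expensive one.
function pooled_map(process, jobs; nworkers = Threads.nthreads())
    input  = Channel{Tuple{Int,Any}}(length(jobs))
    output = Channel{Tuple{Int,Any}}(length(jobs))
    for (i, job) in enumerate(jobs)
        put!(input, (i, job))
    end
    close(input)  # workers stop once the queue is drained
    @sync for _ in 1:nworkers
        Threads.@spawn for (i, job) in input
            put!(output, (i, process(job)))
        end
    end
    close(output)
    results = Vector{Any}(undef, length(jobs))
    for (i, r) in output  # re-order results by original index
        results[i] = r
    end
    return results
end
```

For example, `pooled_map(x -> x^2, 1:5)` returns `[1, 4, 9, 16, 25]` regardless of which worker handled which job.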
If your situation is “fewish slowish tasks”, then you can just do
futures = []
for point in points
    features_api = Threads.@spawn get_data_from_api(point)
    features_db = Threads.@spawn get_data_from_db(point)
    push!(futures, (features_api, features_db))
end
data = [[fetch(api), fetch(db)] for (api, db) in futures]
You’d probably benefit from first trying to optimize the serial version of your code. We can’t reproduce what you want to do, but you already have arrays of type Any there, which is a basic performance issue.
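For example, if both calls are known to return Vector{Float64} (an assumption here; the placeholder functions below just stand in for the real ones), you can give the container a concrete element type instead of the Any default:

```julia
# Placeholder stand-ins for the real calls (assumed to return Vector{Float64}):
get_data_from_api(point) = Float64[point, point + 1]
get_data_from_db(point)  = Float64[point * 10]
points = 1:3

# Concretely typed container instead of `data = []` (which is Vector{Any}):
data = Vector{Vector{Float64}}[]
for point in points
    features_api = get_data_from_api(point)
    features_db  = get_data_from_db(point)
    push!(data, [features_api, features_db])
end
```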
My tasks are somewhat long-running: each one takes between 10 and 60 seconds to complete. If that is the case, what options do I have? My expectation is to reduce the fetch_data execution time by utilizing multiple cores, running each loop iteration on a separate core.
Also, I would like to know what the impact would be if I combined the two separate API and DB calls above into one method and tried to parallelize that.
function fetch_data()
    futures = []
    for point in points
        features_api = Threads.@spawn get_data_from_api(point)
        features_db = Threads.@spawn get_data_from_db(point)
        push!(futures, (features_api, features_db))
    end
    data = [[fetch(api), fetch(db)] for (api, db) in futures]
end
function fetch_data(point)
    features_api = get_data_from_api(point)
    features_db = get_data_from_db(point)
    return features_api, features_db
end
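On combining the two calls: spawning one Task per point halves the number of Tasks but serializes the API and DB call within each point, so per-point latency becomes their sum rather than their max (the two-spawn version overlaps them). A sketch with sleeping placeholders standing in for the real calls:

```julia
# Placeholders for the real API/DB calls (assumptions for this sketch):
get_data_from_api(point) = (sleep(0.1); point)
get_data_from_db(point)  = (sleep(0.1); point * 10)

# Combined method: the two calls run one after the other inside each Task.
function fetch_data(point)
    features_api = get_data_from_api(point)
    features_db  = get_data_from_db(point)
    return features_api, features_db
end

points = 1:4
futures = [Threads.@spawn fetch_data(p) for p in points]
data = [collect(fetch(f)) for f in futures]   # each entry: [api, db]
```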
Also, I tested the following code. I’ve created 4 threads, and my system has 8 CPU cores.
function simulate_api_call()
    println("API call started with Thread : $(Threads.threadid())")
    # Simulate a long-running API call
    sleep(30)
    response = HTTP.get("https://jsonplaceholder.typicode.com/todos/1")
    println("API call completed with Thread : $(Threads.threadid())")
    return response
end
function test_spawn()
    @sync for i in 1:Threads.nthreads()
        println("Iteration $i")
        Threads.@spawn begin
            simulate_api_call()
        end
    end
end
function test_threads()
    Threads.@threads for i in 1:4
        println("Iteration $i")
        simulate_api_call()
    end
end
Looking at the results, it seems both approaches take about the same time per function (~30 s, equal to the sleep time), so all calls appear to be processed in parallel. The interesting thing is that with Threads.@spawn, only a few of the 4 threads are used, and the results come back a bit quicker than with Threads.@threads; Threads.@threads, on the other hand, always uses all 4 threads. I can’t understand why it behaves like that. Also, in my case, what would be the best approach? Ultimately, I need to wait for all the results after the for-loop.
What is the output of Threads.nthreads()? Julia may think you are using hyperthreading and launch only four threads. You can specify julia -t 8 to force Julia to launch 8 threads.
Threads is actually a misnomer: in reality we are launching Tasks, virtual “green threads” that can migrate between OS threads. When a Task sleeps, its thread becomes available to execute another Task.
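That is also why more Tasks than threads can all sleep “at once”: a sleeping Task releases its thread back to the scheduler. A small sketch that behaves the same even with a single thread:

```julia
# Eight tasks sleeping 0.5 s each finish in roughly 0.5 s total, not 4 s,
# because sleep suspends the Task and frees its thread for other Tasks.
t0 = time()
@sync for _ in 1:8
    Threads.@spawn sleep(0.5)
end
elapsed = time() - t0
println("elapsed ≈ ", round(elapsed, digits = 2), " s")
```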
@mkitti, I’m using only 4 threads to test the functionality. Yes, I’ve used -t to launch 4 threads in this instance, and hence the output of Threads.nthreads() is 4.
But this time, the performance of Threads.@threads is much better. In my case, do you think I should use Threads.@threads? I’m looking to parallelize a for-loop task, and those tasks are I/O-bound.
If you’re IO-bound, then definitely start a separate Task for each IO operation that can run at the same time, up to maybe 10k–10M tasks at once.
Stay the hell away from @threads for – that is intended for the many-cheap-cpu-bound kind of parallelism where synchronization overhead is the issue. It aggregates multiple jobs into the same Julia Task / thread, and cannot unaggregate them if the first one blocks.
function simulate_api_call()
    println("API call started with Thread : $(Threads.threadid())")
    response = HTTP.get("https://randomuser.me/api/?results=50000")
    println("API call completed with Thread : $(Threads.threadid())")
    return response
end
function test_spawn()
    results = Vector{HTTP.Response}(undef, Threads.nthreads())
    @sync for i in 1:Threads.nthreads()
        Threads.@spawn begin
            results[i] = simulate_api_call()
        end
    end
    return results
end
function test_threads()
    results = Vector{HTTP.Response}(undef, Threads.nthreads())
    @sync Threads.@threads for i in 1:Threads.nthreads()
        println("Iteration $i")
        results[i] = simulate_api_call()
    end
    return results
end
function test_asyncmap()
    results = asyncmap(_ -> simulate_api_call(), 1:Threads.nthreads())
    return results
end
It seems that asyncmap uses a single thread to achieve the results. However, if that’s the case, there won’t be any parallel processing happening. A single HTTP call takes only 30 seconds in the above example, yet all of the methods take around three times that to process the requests. My use case requires running everything in parallel to reduce the method’s execution time.
As said before, the task is I/O-bound. The CPU uses a single thread, true, but it doesn’t wait for one request to finish before issuing the next one. That’s what actually speeds up the process.
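The effect is easy to see with sleep standing in for the HTTP call: asyncmap overlaps the waits on one thread, so the total time is about the longest single wait, not the sum:

```julia
# Simulated IO: each "request" sleeps 0.3 s. asyncmap overlaps the waits
# on a single thread, so eight of them finish in ~0.3 s, not ~2.4 s.
simulated_call(i) = (sleep(0.3); i * 2)

t0 = time()
results = asyncmap(simulated_call, 1:8)
elapsed = time() - t0
println("results = ", results, ", elapsed ≈ ", round(elapsed, digits = 2), " s")
```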