What is the most efficient way to parallelize an embarrassingly parallel problem with nested loops, where the inner loop's collection depends on the outer loop variable? A MWE is:
using Base.Threads
using ThreadsX
using FLoops
xs = 4:8
ys = Dict(xs .=> [1:rand(1:3) for _ in xs])
for x in xs
    for y in ys[x]
        # f(x, y)  # an expensive function; most of its time goes to file I/O and database I/O
        @show threadid(), x, y
    end
end
The line f(x, y) calls an expensive function. Its workload is highly unbalanced and depends on the specific (x, y). What f does is download data from ClickHouse, process it, and then save it to local CSV files.
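For context, a rough sketch of the shape of f (fetch_from_clickhouse and process are placeholder names here, not the real code):

```julia
using CSV, DataFrames

function f(x, y)
    rows = fetch_from_clickhouse(x, y)     # hypothetical helper wrapping the ClickHouse query
    isempty(rows) && return nothing        # missing data: returns almost immediately
    df = process(rows)                     # hypothetical processing step producing a table
    CSV.write("data_$(x)_$(y).csv", df)    # save the result to a local CSV file
    return nothing
end
```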
It seems I cannot use ThreadsX.foreach or ThreadsX.map directly.
Also, there are many ways to put @threads, @spawn, or @floop in front of the two for loops. Which one is best?
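For reference, one workaround for the ThreadsX case (a sketch, assuming the flattened pair list fits comfortably in memory) is to materialize the (x, y) pairs first and iterate over those:

```julia
using ThreadsX

pairs = [(x, y) for x in xs for y in ys[x]]   # flatten the nested structure up front

# basesize = 1 hands each pair to its own task, which helps when f's cost is very uneven
ThreadsX.foreach(pairs; basesize = 1) do (x, y)
    f(x, y)
end
```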
using Base.Threads
xs = 4:8
ys = Dict(xs .=> [1:rand(1:3) for _ in xs])
@threads for (x, y) in [(x, y) for x in xs for y in ys[x]]  # flatten the nested loops into one list of pairs
    # f(x, y)  # an expensive function; most of its time goes to file I/O and database I/O
    @show threadid(), x, y
end
Not sure if this helps with your real task though.
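If the cost per pair is very uneven, another option (again just a sketch, not tested on your real task) is to spawn one task per pair and let Julia's scheduler do the load balancing:

```julia
using Base.Threads

@sync for x in xs, y in ys[x]       # the comma form nests the loops, so ys[x] can depend on x
    Threads.@spawn f(x, y)          # one task per (x, y); idle threads pick up pending tasks
end
```

With hundreds of thousands to millions of pairs this creates a lot of tasks, so some chunking (for example via basesize in ThreadsX, or batching pairs manually) may be gentler on the scheduler.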
Is this range realistic (only a few steps)? If so, merging the loops is probably the way to go, one way or the other. If xs is long, the story might be different.
This is a MWE. The actual length of xs will be around 200-1000, and each ys[x] has about 2000-5000 elements. f(x, y) takes roughly 500-1000 ms; when (x, y) points to missing data, it returns immediately (about 1 ms).
Yes, my experiment confirms it. However, the CPU load oscillates between 10% and 80% every 1-10 seconds. I have already set the number of threads to 32, which is 2x the number of cores on my machine. Maybe it is caused by a crazy number of allocations? I notice that GC time takes up to 50%, and the total memory allocated is up to 1 TB.
It may be, but also, if what the tasks do is write stuff to disk, you may have a bottleneck in the concurrency of the disk resources. (But in general, avoiding allocations also lets threaded code run faster, yes.)
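If you suspect the disk, one way to probe it (a sketch; download_and_process stands in for the ClickHouse-plus-processing part of f) is to cap how many tasks may write a CSV at the same time with a Base.Semaphore:

```julia
using CSV

const csv_slots = Base.Semaphore(4)        # allow at most 4 concurrent CSV writers; tune for your disk

function f_limited(x, y)
    df = download_and_process(x, y)        # hypothetical helper: ClickHouse query + processing
    df === nothing && return nothing       # missing data: return immediately, nothing to write
    Base.acquire(csv_slots)
    try
        CSV.write("data_$(x)_$(y).csv", df)   # only the disk write is throttled
    finally
        Base.release(csv_slots)
    end
    return nothing
end
```

If throttling the writes smooths out the CPU oscillation, the disk (or the allocations made while writing) is the likely bottleneck; if not, GC pressure is the more probable culprit.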