Multi-threading for nested loops with inner loop collection depending on outer loop

What is the most efficient way to parallelize an embarrassingly parallel problem with nested loops, where the inner loop's collection depends on the outer loop variable? A MWE is

using Base.Threads
using ThreadsX
using FLoops

xs = 4:8
ys = Dict(xs .=> [1:rand(1:3) for _ in xs])

for x in xs
    for y in ys[x]
        # f(x, y)   # an expensive function; most of its time is spent on file and database IO.
        @show threadid(), x, y
    end
end

The line f(x, y) calls an expensive function. The workload of f is highly unbalanced and depends on the specific (x, y). What f does is download data from ClickHouse, process it, and then save it to local CSV files.

It seems I cannot use ThreadsX.foreach or ThreadsX.map directly.

Also, there are many ways to place @threads, @spawn, or @floop before the two for loops. Which one is best?

I’ve been playing with

using Base.Threads

xs = 4:8
ys = Dict(xs .=> [1:rand(1:3) for _ in xs])

@threads for (x, y) in [(x, y) for x in xs for y in ys[x]]
    # f(x, y)   # an expensive function; most of its time is spent on file and database IO.
    @show threadid(), x, y
end

Not sure if this helps with your real task though.


I don’t know if this works for your problem, but one lazy option to consider is just pre-computing (x,y) and then doing a simple loop:

xypairs = reduce(vcat, map(x->[(x, yj) for yj in ys[x]], xs)) # or whatever
@threads for (x,y) in xypairs
  #[...]
end

Maybe that isn't possible, but whatever overhead it incurs might be more than made up for by better parallel execution in the loop.
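Since the per-pair workload is unbalanced, a dynamically scheduled variant of the same precompute-then-loop idea may also help. Here is a hedged sketch using ThreadsX.foreach over the flattened pairs; `basesize=1` hands out one pair per task, assuming the per-pair cost dwarfs the scheduling overhead (the `f` below is a stand-in, not the real workload):

```julia
using ThreadsX

xs = 4:8
ys = Dict(xs .=> [1:rand(1:3) for _ in xs])
f(x, y) = (x, y)  # stand-in for the real, expensive IO-bound function

xypairs = [(x, y) for x in xs for y in ys[x]]
# basesize=1 schedules each pair as its own task, so idle threads
# can pick up work when some (x, y) take much longer than others.
ThreadsX.foreach(xypairs; basesize=1) do (x, y)
    f(x, y)
end
```

With the default basesize, ThreadsX would split xypairs into a few large blocks, which defeats load balancing for skewed workloads like this one.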


Is this range realistic (only 4 steps)? If so, merging the loops is probably the way to go, one way or the other. If xs is long, then the story might be different.

This is a MWE. The actual length of xs will be around 200-1000, and each ys[x] has about 2000-5000 elements. f(x, y) takes roughly 500-1000 ms; when (x, y) points to missing data, it returns immediately (about 1 ms).
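At that scale (potentially a few million (x, y) pairs), spawning one task per pair adds noticeable scheduling and allocation overhead. One middle ground is to chunk the flattened pairs and spawn one task per chunk. A sketch, where the factor `4 * nthreads()` is a guess to trade load balance against task overhead, and `f` is a stand-in:

```julia
using Base.Threads

xs = 4:8
ys = Dict(xs .=> [1:rand(1:3) for _ in xs])
f(x, y) = (x, y)  # stand-in for the real workload

xypairs = [(x, y) for x in xs for y in ys[x]]
# A few chunks per thread gives the scheduler room to balance
# unequal chunks without creating millions of tiny tasks.
chunksize = max(1, cld(length(xypairs), 4 * nthreads()))
@sync for chunk in Iterators.partition(xypairs, chunksize)
    Threads.@spawn for (x, y) in chunk
        f(x, y)
    end
end
```

If most pairs hit the fast missing-data path, larger chunks are fine; if the slow pairs cluster together, smaller chunks balance better.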

If there are no concurrency issues, I think the most effective way is to use @spawn, like this:

@sync for x in xs
    for y in ys[x]
        Threads.@spawn f(x, y)   # an expensive function; most of its time is spent on file and database IO.
    end
end

which will run f(x,y) on any free thread available.

Yes, my experiment confirms it. However, the CPU load oscillates between 10% and 80% every 1-10 seconds. I have already set the number of threads to 32, which is 2x the number of cores on my machine. Maybe it is caused by a crazy number of allocations? I notice GC time takes up to 50%, and the total memory allocated is up to 1T.

It may be, but if what the tasks do is write stuff to disk, you may also have a bottleneck in contention for disk resources. (In general, avoiding allocations also allows threaded code to run faster, yes.)
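If the disk (or the ClickHouse connection) is the bottleneck, one way to keep the @spawn-per-pair pattern while capping how many tasks hit IO at once is a Base.Semaphore. A sketch; the limit of 8 is a hypothetical value to tune against your disk and network, and `f` is a stand-in:

```julia
using Base.Threads

xs = 4:8
ys = Dict(xs .=> [1:rand(1:3) for _ in xs])
f(x, y) = (x, y)  # stand-in for the real IO-heavy workload

sem = Base.Semaphore(8)  # hypothetical cap on in-flight downloads/writes
@sync for x in xs
    for y in ys[x]
        Threads.@spawn begin
            Base.acquire(sem)
            try
                f(x, y)   # at most 8 tasks run this section at once
            finally
                Base.release(sem)
            end
        end
    end
end
```

This still creates one task per pair, but the semaphore keeps the number of concurrent IO operations bounded, which may smooth out the CPU-load oscillation if tasks were piling up on disk writes.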