Threaded double loop

I am trying to write threaded code for the following calculation:

  1. given an opaque object x, and indices i, calculate y_i = g(x, i),

  2. given each y_i and indices j, calculate z_i_j = h(x, y_i, j),

  3. reduce all the z's with a bivariate function r, which is commutative and associative.

Here is a single-threaded MWE; I would appreciate some help with making it multi-threaded.

### elements of the calculation
g(x, i) = (; x, i)              # y_i = g(x, i)
function h(x, g_xi, j)          # z_i_j = h(x, g(x, i), j)
    (; i) = g_xi
    sleep(rand() / 100)
    Dict((x, i, j) => "$(i)_$(j)")
end
r(a, b) = merge(a, b)           # merge z_i_j

# single-threaded implementation
function single_threaded(g, h, r, x, all_i, all_j)
    function inner(i)
        g_xi = g(x, i)
        mapreduce(j -> h(x, g_xi, j), r, all_j)
    end
    mapreduce(inner, r, all_i)
end

# example
ALL_I = 1970:1980
ALL_J = 1:24
single = single_threaded(g, h, r, nothing, ALL_I, ALL_J)

The first thing that occurred to me is to parallelize inner and the outer loop above with e.g. OhMyThreads.tmapreduce, but that is slightly suboptimal: the calculation will wait for all j to finish when it could already proceed to the next g(x, i).

I have a very vague idea that I could have threads throwing g(x, i) into a Channel and other threads taking values from there and performing the h, but I have not succeeded in implementing it (frankly, I am at a loss how to start).
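To make the idea concrete, this is roughly the shape I have in mind, though I have not managed to get a working version (untested sketch; the name channel_based and all the details are just my guess at how this could look):

using Base.Threads: @spawn

function channel_based(g, h, r, x, all_i, all_j; nconsumers = Threads.nthreads())
    jobs = Channel{Any}(32)                # work queue of (g_xi, j) pairs
    # producer: compute each g(x, i) and enqueue one work item per j
    producer = @spawn begin
        for i in all_i
            g_xi = g(x, i)
            for j in all_j
                put!(jobs, (g_xi, j))
            end
        end
        close(jobs)
    end
    # consumers: take work items, compute h, and reduce locally with r
    consumers = map(1:nconsumers) do _
        @spawn begin
            acc = nothing
            for (g_xi, j) in jobs
                z = h(x, g_xi, j)
                acc = acc === nothing ? z : r(acc, z)
            end
            acc
        end
    end
    wait(producer)                         # surface any error from the producer
    partials = filter(!isnothing, fetch.(consumers))
    reduce(r, partials)                    # combine the per-consumer partial results
end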

Not quite sure I follow here. It seems to me that you could just write

using OhMyThreads
function multi_threaded(g, h, r, x, all_i, all_j)
    tmapreduce(r, all_i) do i
        g_xi = g(x, i)
        tmapreduce(j -> h(x, g_xi, j), r, all_j)
    end
end

i.e. make both tmapreduces parallel; then there's no waiting for anything, right?

It at least seems that everything is out of order:

julia> function h2(x, g_xi, j) 
           (; i) = g_xi
           @info "running" i j
           sleep(rand() / 1)
           Dict((x, i, j) => "$(i)_$(j)")
       end;

julia> multi_threaded(g, h2, r, nothing, ALL_I, ALL_J);
┌ Info: running
│   i = 1970
└   j = 2
┌ Info: running
│   i = 1970
└   j = 1
┌ Info: running
│   i = 1971
└   j = 3
┌ Info: running
│   i = 1971
└   j = 2
┌ Info: running
│   i = 1970
└   j = 3
┌ Info: running
│   i = 1973
└   j = 1
┌ Info: running
│   i = 1971
└   j = 1
┌ Info: running
│   i = 1973
└   j = 3
┌ Info: running
│   i = 1972
└   j = 3
┌ Info: running
│   i = 1973
└   j = 2
┌ Info: running
│   i = 1972
└   j = 2
┌ Info: running
│   i = 1972
└   j = 1

I'll just warn that AFAIK sleep does not block the thread, so one can get misleading results when micro-benchmarking or testing with it. Better to use a function that actually computes something and is designed to take some amount of time.
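For example, something along these lines keeps a thread genuinely busy (a minimal sketch; the name busywork and the iteration count are arbitrary, tune n to get the duration you want):

# busy loop that actually occupies the thread, unlike sleep
function busywork(n)
    s = 0.0
    for k in 1:n
        s += sin(k)       # some cheap floating-point work
    end
    s                     # return the accumulator so the loop is not optimized away
end

busywork(10^7)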


Since your aggregation function is commutative and associative, I would try to apply it only once, in a single reduction at the end (unless your data is large enough that materializing all the intermediate results would be expensive):

function single_threaded2(g, h, r, x, all_i, all_j)
    function inner(i)
        g_xi = g(x, i)
        map(j -> h(x, g_xi, j), all_j)
    end
    all_inners = map(inner, all_i)
    flat_inners = reduce(vcat, all_inners)
    reduce(r, flat_inners)
end

map can be straightforwardly parallelized with Threads.@spawn, so we can launch a task for each iteration of both the inner and outer loop:

function multi_threaded(g, h, r, x, all_i, all_j)
    function inner(i)
        g_xi = g(x, i)
        fetch.(map(j -> (Threads.@spawn h(x, g_xi, j)), all_j))
    end
    all_inners = fetch.(map(i -> (Threads.@spawn inner(i)), all_i))
    reduced_inners = reduce(vcat, all_inners)
    reduce(r, reduced_inners)
end

multi = multi_threaded(g, h, r, nothing, ALL_I, ALL_J)

Note that this doesn't parallelize the reduction step; @Mason's nested tmapreduce is better if you want to do that.


Doing the reduction this way doesn't incur fewer applications of the reducing operator r: reducing N values with a binary operator always takes N - 1 applications, however you group them. As far as I'm aware, what you're doing here is strictly slower.

Yeah, I didn't mean to imply it's faster; it's just easier to parallelize using Julia's built-in Threads.@spawn. But if you're using OhMyThreads anyway, it's a moot point.

Ah I see. In that case, if one is trying to cut down on dependencies, they could define their own tmapreduce like so:

using Base.Threads: nthreads, @spawn
function tmapreduce(f, op, itr; tasks_per_thread::Int = 2, kwargs...)
    chunk_size = max(1, length(itr) ÷ (tasks_per_thread * nthreads()))
    tasks = map(Iterators.partition(itr, chunk_size)) do chunk
        @spawn mapreduce(f, op, chunk; kwargs...)
    end
    mapreduce(fetch, op, tasks; kwargs...)
end

This'll still be less robust and slower than OhMyThreads.tmapreduce, but it's fine for quick-and-dirty applications.
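For example, it can be dropped into the same nested pattern as above (a minimal sketch; this assumes a session where only the hand-rolled tmapreduce is defined, since the name would otherwise clash with the one exported by OhMyThreads, and multi_threaded_diy is just a name picked for illustration):

# same nested pattern as before, but using the hand-rolled tmapreduce
function multi_threaded_diy(g, h, r, x, all_i, all_j)
    tmapreduce(r, all_i) do i
        g_xi = g(x, i)
        tmapreduce(j -> h(x, g_xi, j), r, all_j)
    end
end

multi_threaded_diy(g, h, r, nothing, ALL_I, ALL_J)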


Thanks for all the replies. I should have clarified: I am OK with any reasonable dependencies. I certainly did not want to roll my own solution if that could be avoided.

Yes, this seems to be correct: with the actual code (not the MWE) I am getting high CPU utilization, so nothing seems to be blocking anything. I had the wrong intuition about threads, and did not realize that something like tmapreduce would be composable (with the dynamic scheduler).


Yeah, in most languages that would be a good intuition, but in Julia our tasks are composable, so you won't get catastrophic performance losses when you nest them (as long as you use the dynamic scheduler).


I just realized that my computation is more complex than described above. Specifically, some parts depend on earlier parts, so the whole thing is best described by a DAG.

So I am wondering if I would be losing anything by working with e.g. Dagger.jl.

My understanding is that simple calculations like OhMyThreads.tmapreduce are special cases (describable by very regular DAGs) so I am not losing anything. Is this correct?

Or is Dagger.jl overkill for multithreaded calculations happening on a single machine? I anticipate that the bookkeeping overhead is small compared to my actual calculation blocks.
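To make the question concrete, my naive reading of the Dagger.jl docs is that something like this would express the MWE above, with dependencies inferred from which tasks are passed as arguments to other tasks (untested sketch; dagger_version is just a name I made up, and the final reduction here is a naive linear chain):

using Dagger

function dagger_version(g, h, r, x, all_i, all_j)
    # one task per y_i
    ys = Dict(i => (Dagger.@spawn g(x, i)) for i in all_i)
    # one task per z_i_j; passing ys[i] as an argument should make it depend on that task
    zs = [(Dagger.@spawn h(x, ys[i], j)) for i in all_i, j in all_j]
    # naive linear reduction; each application of r is itself a task
    fetch(reduce((a, b) -> Dagger.@spawn(r(a, b)), vec(zs)))
end

dagger_version(g, h, r, nothing, ALL_I, ALL_J)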