Parallel accumulation over a large dictionary

I have a large Dict D and a function f that takes a pair of dictionary entries as input and returns a float. I would like to compute f for all pairs in D and sum the results. f is also symmetric, f(a,b) = f(b,a), so I only need to iterate over unique unordered pairs, saving a factor of ~2 in runtime. Here is a basic MWE that works on a single thread:

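function pairsum(f, D)
    tot = 0.0
    for (i, a) in enumerate(D)
        for (j, b) in enumerate(D)
            if j < i
                continue              # (a, b) was already counted as (b, a)
            elseif j == i
                tot += f(a, b)        # diagonal pair, counted once
            else
                tot += 2 * f(a, b)    # stands in for both (a, b) and (b, a)
            end
        end
    end
    return tot
end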
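For reference, the weighting in this loop follows from the symmetry of f. Writing d_1, ..., d_n for the entries of D in iteration order (n = length(D)), the sum over all ordered pairs splits into the diagonal plus twice the strict upper triangle:

$$
\sum_{i=1}^{n}\sum_{j=1}^{n} f(d_i, d_j) = \sum_{i=1}^{n} f(d_i, d_i) + 2\sum_{i=1}^{n}\sum_{j=i+1}^{n} f(d_i, d_j)
$$

Note that the loop above still visits all n² index combinations; the j < i branch saves the evaluations of f, not the iterations themselves.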

f is fairly quick to calculate, so what makes this slow is the size of D: the number of pairs grows quadratically with length(D). Since the iterations are independent apart from the accumulation into tot, I have been trying to parallelize this function.

The problem is that most parallelization tools I have come across seem to require converting D into an array with something like collect, so that it can be split into segments and distributed across threads. Unfortunately, that is not an option here: D is very large, and I do not have the memory to make a copy.

Q: What are my options for parallelizing this accumulating function without making a copy of D?

One way to do this would be to access the internals of Dict, which has a simple array of “slots” under the hood and an isslotfilled function to check whether a slot actually holds an element (these are undocumented Base internals, so they may change between Julia versions). You could then use a standard parallelization method to partition the array dict.slots among your threads, and have each thread iterate over its slots, summing the pairs for the slots that are filled.

An additional advantage of this approach is that you can skip the j < i check: to iterate over unique pairs for a filled slot i, just loop over the slots j ≥ i.
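A minimal sketch of the slot-based approach follows. It is not tested against every Julia release: it reads the undocumented fields d.slots, d.keys and d.vals and calls the internal Base.isslotfilled, all of which may change. The name pairsum_slots is hypothetical. Each task handles a strided subset of slots (k, k + ntasks, ...) instead of a contiguous block, because slot i is paired with every later slot, so early slots carry more work. Striding spreads that triangular workload roughly evenly, and keeping one partial sum per task avoids any locking on the accumulator.

```julia
# Hypothetical helper: sum f over all unordered pairs of entries of a Dict
# without copying it. Relies on undocumented Base internals
# (d.slots, d.keys, d.vals, Base.isslotfilled); d must not be modified meanwhile.
function pairsum_slots(f, d::Dict)
    n = length(d.slots)
    ntasks = Threads.nthreads()
    tasks = map(1:ntasks) do k
        Threads.@spawn begin
            acc = 0.0                              # per-task partial sum, no locking needed
            for i in k:ntasks:n                    # strided split balances the triangular work
                Base.isslotfilled(d, i) || continue
                a = d.keys[i] => d.vals[i]
                acc += f(a, a)                     # diagonal pair, counted once
                for j in i+1:n                     # only slots after i: no j < i check needed
                    Base.isslotfilled(d, j) || continue
                    acc += 2 * f(a, d.keys[j] => d.vals[j])  # covers (a, b) and (b, a)
                end
            end
            acc
        end
    end
    return sum(fetch.(tasks))                      # combine the per-task partial sums
end
```

Start Julia with several threads (e.g. `julia --threads=auto`) to get any speedup; with a single thread it simply runs sequentially.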

Another option is to switch to another Dict-like collection. For example, the OrderedDict structure from OrderedCollections.jl has a simple array of keys that its iteration loops over, so you can parallelize a loop over dict.keys directly without worrying about “unfilled” slots.
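The same idea for OrderedDict might look like the sketch below. It assumes OrderedCollections.jl is installed and that OrderedDict keeps its entries in the internal vectors keys and vals, with a field ndel counting deletions not yet compacted away. These are implementation details, not documented API, and pairsum_ordered is a hypothetical name. Because the entries sit in plain vectors, there are no unfilled slots to skip once there are no pending deletions.

```julia
using OrderedCollections  # assumption: OrderedCollections.jl is installed

# Hypothetical helper: sum f over unordered pairs of an OrderedDict by indexing
# its internal keys/vals vectors (undocumented fields; may change).
function pairsum_ordered(f, od::OrderedDict)
    # After delete!, the internal vectors may hold stale entries until the
    # dict is compacted, so refuse to read them in that case.
    od.ndel == 0 || error("OrderedDict has pending deletions")
    ks, vs = od.keys, od.vals
    n = length(ks)
    ntasks = Threads.nthreads()
    tasks = map(1:ntasks) do k
        Threads.@spawn begin
            acc = 0.0
            for i in k:ntasks:n                    # strided split balances the triangular work
                a = ks[i] => vs[i]
                acc += f(a, a)                     # diagonal pair, counted once
                for j in i+1:n
                    acc += 2 * f(a, ks[j] => vs[j])  # covers (a, b) and (b, a)
                end
            end
            acc
        end
    end
    return sum(fetch.(tasks))
end
```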


It seems that Transducers.jl can parallelize even over iterators, so you could try something like the following:

using Transducers

d = Dict(i => rand() for i in 1:10_000)

f(a, b) = last(a) * last(b)

pipeline = Iterators.product(enumerate(d), enumerate(d)) |>
           Filter(xy -> let ((i, _), (j, _)) = xy; i >= j end) |>
           Map(xy -> let ((i, a), (j, b)) = xy; i == j ? f(a, b) : 2 * f(a, b) end)

# Sequential execution
foldl(+, pipeline)
# Threaded execution
foldxt(+, pipeline)