What is the best practice for parallel dictionary comprehensions?

I’m trying to work out a good template for the parallel part of my experiments. I’m looking for rock-solid example code to copy and extend, based on the current ecosystem.

Usually, I’m in the embarrassingly || space, which makes it all the more embarrassing that I’m not sure about the best practice. I’m faced with a wide array of options: Threads, Distributed, ThreadsX, Dagger, FLoops, Transducers, etc.

On a single machine (but might go on a cluster later) I want to save things in a dictionary.

Dict(n => expensive_calculation(n) for n in nlist)

Right now I’ve hit on the following pattern which I show for constructive criticism.

using Distributed
using BangBang
using MicroCollections
addprocs(4) 

@everywhere function expensive_calculation(n)
    sleep(3)
    return n => n^2
end

function parallel_dict_computation(n)
    result_dict = EmptyDict() # from MicroCollections
    tasks = @sync [@spawn expensive_calculation(i) for i in 1:n]
    for task in tasks
        result_dict = push!!(result_dict, fetch(task))   # BangBang's push!! promotes to the concrete type.
    end
    return result_dict
end

result_dictionary = parallel_dict_computation(10)
println(result_dictionary)

Here are my questions:
1.) How would YOU write this code? Is there any benefit to adding in Transducers or something like that?

2.) Is it elegant to put all the tasks in an array and fetch them later? Is it better to build the Dict directly inside @sync with a lock somehow?

3.) Is the @sync necessary? It doesn’t seem to do anything bad to omit it here. How do I know whether a @spawn is in a @sync’s scope?

4.) I keep accidentally rerunning addprocs. Is there a way to manage the number of procs automatically? Is there a risk of having too many procs?

5.) How do threads and procs interact? Does it matter how many threads I start Julia with?

6.) And while I’m here, more sophisticated reductions: if I have an associative operator expensive_op(x, y), what is the best way to parallelize the reduction piece as well? Like a parallel mapreduce(expensive_fun, expensive_op, list).

In the typical case, I can safely assume the scheduler overhead is negligible compared to the expensive calls, but the distribution of their execution times follows a power law.

Thanks very very much! (Also is anything happening with JuliaFolds2?)

Just some quick comments/questions regarding 1.) and 4.) from somebody with probably less experience.

  • ad 4.) You could use Julia’s CLI to start directly with the right number of workers, e.g. julia -p 4 or julia -p auto (there is also a small guard sketch at the end of this post for the addprocs case): Command-line Interface · The Julia Language

  • ad 1.) Regarding the strategy: it seems much easier to construct an array first and then create the dictionary in a reduce step, for example:

data = pmap(expensive_calculation, 1:n)
result_dict = Dict(data)

(EDIT: I just noticed that your expensive_calculation already returns a Pair suitable for building a Dict, so I changed the example accordingly.)

That way you can use all the pre-existing functions (like pmap or more specialised variants). And you could even use a suitable array type to pre-allocate the memory, e.g.

# with Threads
data = Array{Pair{Int,Int}}(undef, n)
Threads.@threads for i in eachindex(data)
    data[i] = expensive_calculation(i)
end

Is there a particular reason in your application why you need dictionaries already during the parallel stage?
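
Also regarding 4.): if you do keep addprocs inside the script, a small guard avoids piling up workers every time the block is re-run. Just a sketch of the idea (the worker count of 4 is arbitrary):

using Distributed

# only add workers if this session doesn't have any yet, so re-running
# the block doesn't keep growing the worker pool
if nprocs() == 1
    addprocs(4)
end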

1 Like

Is there a particular reason in your application why you need dictionaries already during the parallel stage?

Not here :sweat_smile:. Indeed for this project I’m using pmap and rolling everything up at the end.

But I wanted to understand @spawn and @sync better so I can balance the load better in deeper loops, and work out how to write this reduction “properly”. All I can rely on right now is pmap, and I’m looking to up my || game, since it’s leaving a lot of CPU on the table because of load imbalance.
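
For what it’s worth, here is the threaded version of the pattern I was aiming at, just as a sketch (threaded_dict_computation is a made-up name mirroring parallel_dict_computation above, and it assumes expensive_calculation returns the n => n^2 Pair as before):

function threaded_dict_computation(nlist)
    # one task per item: the dynamic scheduler hands long and short items
    # to whichever thread is free, which helps with power-law run times
    tasks = [Threads.@spawn expensive_calculation(n) for n in nlist]
    return Dict(fetch(t) for t in tasks)
end

threaded_dict_computation(1:10)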

Update re 4.): I’m using the VS Code setting for the number of threads when running a script block by block. I’m trying to check whether the concurrency of pmap is limited by nthreads() or not.
I found the pattern from this post helpful:

np = addprocs(4)
…
rmprocs(np)

for controlling the worker count and freeing RAM at the end of a distributed run.
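
As for the pmap question: pmap schedules over worker processes, so its concurrency is set by nworkers() rather than Threads.nthreads(). A quick timing sketch to see this (slow is just a stand-in for an expensive call):

using Distributed
nprocs() == 1 && addprocs(4)

@everywhere slow(x) = (sleep(1); x)   # stand-in for an expensive call

@time pmap(slow, 1:8)   # roughly 2 s with 4 workers, independent of Threads.nthreads()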

1 Like

Depends a bit on the pattern of parallelism you need/want to support, but for map-reduce type computations, Transducers is a nice fit. In particular, you can use (almost) the same code for sequential, threaded, or distributed execution:

using Transducers, BangBang, MicroCollections

# Sequential
foldl(merge!!, Map(Dict ∘ expensive_calculation), 1:10; init = EmptyDict())
# Threaded
foldxt(merge!!, Map(Dict ∘ expensive_calculation), 1:10; init = EmptyDict())
# Distributed
foldxd(merge!!, Map(Dict ∘ expensive_calculation), 1:10; init = EmptyDict())

It seems that the threaded/distributed versions currently require a monoid for combining, i.e., merging two dictionaries with merge!! rather than pushing an element into a Dict with push!!. (The latter works sequentially via `foldl(push!!, Map(expensive_calculation), 1:10; init = EmptyDict())`, but not in parallel … could be worth an issue in Transducers.)
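
For the mapreduce part of 6.), the same machinery applies: since expensive_op is associative, you can pass it directly as the reducing function, so both the map and the reduction run in parallel. A sketch using the names from the original question:

using Transducers

foldxt(expensive_op, Map(expensive_fun), list)   # threaded mapreduce
foldxd(expensive_op, Map(expensive_fun), list)   # distributed mapreduce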

2 Likes