Hi all,
I recently started working with Dagger.jl and have a few questions about controlling the scheduler and merging shards. In my workflow I am doing a mapreduce-like operation over hierarchical data. An abstracted version of the workflow, which prints the desired result, looks like this:
```julia
using Dagger

example_data = [
    [
        [:a=>1, :b=>2, :b=>3],
        [:b=>3, :b=>2, :a=>1],
    ], [
        [:a=>1, :c=>10, :d=>-1, :c=>10, :d=>-1],
        [:c=>11, :d=>-2, :d=>-2, :c=>11],
    ], [
        [:e=>3, :e=>3],
        [:e=>4, :e=>4, :a=>1],
    ]
]

# Keep track of the count and sum for every key
function accumulate_data(x, agg)
    for (name, val) in x
        n, s = get!(agg, name, (0, 0))
        agg[name] = (n+1, s+val)
    end
    nothing
end

function global_merge_and_flush_outputs(aggregators)
    merged_aggregator = reduce(aggregators) do d1, d2
        merge(d1, d2) do (n1, s1), (n2, s2)
            n1+n2, s1+s2
        end
    end
    for k in collect(keys(merged_aggregator))
        n, s = merged_aggregator[k]
        @assert n == 4
        println("$k: $s")
        delete!(merged_aggregator, k)
    end
end

function local_merge_and_flush_outputs(aggregator)
    # code should go here
    nothing
end

aggregator = Dagger.@shard per_thread=true Dict{Symbol,Tuple{Int,Int}}()
r = map(example_data) do group
    Dagger.spawn(group) do group
        r = map(group) do subgroup
            Dagger.@spawn accumulate_data(subgroup, aggregator)
        end
        fetch.(r)
        # The following line is the one in question: how can I make sure that
        # exactly the processors that participated in the computation above take
        # part in this reduction and are not scheduled to do other work at the
        # same time?
        local_merge_and_flush_outputs(aggregator)
    end
end
fetch(r)
global_merge_and_flush_outputs(fetch.(map(identity, aggregator)))
```
Of course, the actual problem is much larger, and the accumulation and I/O are more complicated, etc.
The first point is this: the data is structured such that some keys only occur within a certain group, and we know in advance how many times each key must occur (in this example, 4 times for every key). So after processing a group I would like to immediately merge all the aggregators that participated in that sub-task, write the output, and remove the corresponding keys from all aggregators. However, I have no idea how to write such a `local_merge_and_flush_outputs` function, so any hints would be appreciated.
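For what it is worth, here is a purely serial sketch of the semantics I have in mind. It operates on a plain `Vector` of `Dict`s rather than on the shard, so it side-steps the scheduling question entirely; `local_merge_and_flush_sketch` and `expected_count` are names I made up for illustration:

```julia
# Serial sketch of the intended semantics, ignoring scheduling entirely.
# `dicts` stands for the per-thread aggregator Dicts that took part in one
# group; `expected_count` (4 in the toy example) is a made-up parameter.
function local_merge_and_flush_sketch(dicts, expected_count)
    merged = reduce(dicts) do d1, d2
        merge(d1, d2) do (n1, s1), (n2, s2)
            n1+n2, s1+s2
        end
    end
    for (k, (n, s)) in collect(merged)
        if n == expected_count
            println("$k: $s")                   # flush the finished output
            foreach(d -> delete!(d, k), dicts)  # drop the key from every aggregator
        end
    end
end
```

What I cannot figure out is how to run something like this on exactly the shard pieces that were touched by the group's tasks.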
The second point: in my use case it is probably best to distribute workers breadth-first, i.e. in my example to first distribute them among the top-level groups and let each worker descend into its group's subgroups on its own, instead of assigning all workers to the sub-tasks of the first group. Is there a way to nudge the task scheduling in this direction to reduce communication?
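To make the question more concrete, the kind of manual pinning I could imagine as a workaround looks roughly like this, assuming `Dagger.scope(worker=w)` and the `scope=` option of `Dagger.@spawn` behave the way I read the scopes documentation (untested):

```julia
using Distributed, Dagger

# Untested idea: round-robin the top-level groups over the workers and pin each
# group's tasks to its worker via a scope, so that sub-tasks of different
# groups are not interleaved across workers.
process_group(group, agg) = foreach(sg -> accumulate_data(sg, agg), group)

r = map(enumerate(example_data)) do (i, group)
    w = workers()[mod1(i, nworkers())]
    Dagger.@spawn scope=Dagger.scope(worker=w) process_group(group, aggregator)
end
fetch.(r)
```

But this serializes the subgroups within each group, so I would much rather have the scheduler arrange this automatically.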
Finally, I realized that Dagger currently spawns one processor per thread on each worker. Is there a way to let Dagger spawn only one processor per worker? My real-world `accumulate_data` function can use multiple threads on its own, so I would prefer a more `Distributed.pmap`-like behavior where only one task runs per distributed worker, not one per thread.
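The closest workaround I could think of is again scope-based, assuming `Dagger.scope(thread=1)` means "thread 1 on whichever worker the scheduler picks" (untested):

```julia
# Untested workaround sketch: restrict tasks to thread 1 of whatever worker the
# scheduler chooses, so that at most one Dagger task runs per worker and
# accumulate_data is free to use the worker's remaining threads itself.
Dagger.@spawn scope=Dagger.scope(thread=1) accumulate_data(subgroup, aggregator)
```

But this only restricts where tasks may run; it does not stop Dagger from creating one processor per thread in the first place, which is why I am asking whether there is a proper switch for this.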
Thanks a lot in advance for your help!