Some Dagger.jl beginner questions

Hi all,

I recently started working with Dagger.jl and have a few questions about controlling the scheduler and merging shards. In my workflow I am doing a mapreduce-like operation over hierarchical data. An abstract version of the workflow looks like this, which prints the desired result:

example_data = [
  [
    [:a=>1, :b=>2, :b=>3],
    [:b=>3, :b=>2, :a=>1],
  ], [
    [:a=>1, :c=>10, :d=>-1, :c=>10, :d=>-1],
    [:c=>11, :d=>-2, :d=>-2, :c=>11],
  ], [
    [:e=>3, :e=>3],
    [:e=>4, :e=>4, :a=>1],
  ]
]

#Keep track of (count, sum) per key
function accumulate_data(x,agg) 
  for (name,val) in x
    n,s = get!(agg,name,(0,0))
    agg[name] = (n+1,s+val)
  end
  nothing
end

function global_merge_and_flush_outputs(aggregator)
  merged_aggregator = reduce(aggregator) do d1, d2
    merge(d1,d2) do (n1,s1),(n2,s2)
      n1+n2,s1+s2
    end
  end
  for k in keys(merged_aggregator)
    n,s = merged_aggregator[k]
    @assert n==4
    println("$k: $s")
    delete!(merged_aggregator,k)
  end
end

function local_merge_and_flush_outputs(aggregator)
  #code should go here
  nothing
end



using Dagger
aggregator = Dagger.@shard per_thread=true Dict{Symbol,Tuple{Int,Int}}()
r = map(example_data) do group
  Dagger.spawn(group) do group
    r = map(group) do subgroup
      Dagger.@spawn accumulate_data(subgroup,aggregator)
    end
    fetch.(r)
    #The following line is the one in question: How can I make sure that exactly the
    #processors that participated in the last computation take part in this reduction,
    #and are not scheduled to do other work at the same time?
    local_merge_and_flush_outputs(aggregator)
  end
end
fetch(r)
global_merge_and_flush_outputs(fetch.(map(identity,aggregator)))

Of course, the actual problem is much larger and the accumulation and IO are more complicated, etc.

  1. The first point is this: the data is structured so that some keys only occur within a certain subgroup, and we also know how many times each key must occur (in this case 4 for every key). So after processing a group I would like to immediately merge all aggregators that participated in that sub-task, write the output, and remove the corresponding keys from all aggregators. However, I have no idea how to write such a local_merge_and_flush_outputs function (see the sketch after this list for the intended semantics), so any hints would be appreciated.

  2. I think in my use case it is probably best to distribute workers in a breadth-first way, i.e. in my example to first distribute them among the groups and let them individually descend into the subgroups, instead of assigning all workers to the sub-tasks of the first group. Is there a way to nudge the task scheduling in this direction to reduce communication?

  3. I realized that Dagger currently spawns one processor per thread on each worker. Is there a way to have Dagger spawn only one processor per worker? My real-world accumulate_data function can work on multiple threads on its own, so I would prefer a more Distributed.pmap-like behavior where only one task runs per distributed worker rather than one per thread.
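
For concreteness, here is a plain-Julia sketch of the semantics I have in mind for such a function, ignoring the Dagger/shard scheduling aspect (which is the actual question); the helper name and the expected keyword are only illustrative:

# Merge the participating aggregators, print every key that has reached its
# expected count, and remove that key from all participating aggregators.
function local_merge_and_flush_sketch(aggs::Vector{Dict{Symbol,Tuple{Int,Int}}}; expected=4)
  merged = reduce(aggs; init=Dict{Symbol,Tuple{Int,Int}}()) do d1, d2
    merge(d1,d2) do (n1,s1),(n2,s2)
      n1+n2,s1+s2
    end
  end
  for (k,(n,s)) in merged
    if n == expected
      println("$k: $s")
      foreach(a -> delete!(a,k), aggs)  # drop the finished key from every aggregator
    end
  end
  nothing
end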

Thanks a lot in advance for your help

Hey there! Thanks for the great MWE.

It sounds like you want to enforce sequential execution between groups? If so, this is actually quite easy to achieve with Dagger.spawn_sequential, together with Dagger.spawn_bulk as an “escape hatch” to get parallelism within each sequential step.
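
In the abstract, the pattern looks like this (a rough sketch with placeholder computations): tasks spawned inside spawn_sequential run one after another, while tasks grouped with spawn_bulk still run in parallel with each other.

Dagger.spawn_sequential() do
  # Step 1: a bulk-spawned group of tasks, parallel among themselves
  t1 = Dagger.spawn_bulk() do
    [Dagger.@spawn sum(rand(100)) for _ in 1:4]
  end
  fetch.(t1)
  # Step 2: runs only after step 1 has finished
  t2 = Dagger.@spawn println("step 2 runs after step 1")
  fetch(t2)
end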

I would think you can do something like this:

local_merge_and_flush_outputs(aggs...) = nothing # TODO: Do what you need to with each aggregator

# Execute the next region sequentially
Dagger.spawn_sequential() do

r = map(example_data) do group
  Dagger.spawn(group) do group
    # The outer sequential region doesn't pass through Dagger tasks, so this loop will still be parallelized:
    r = map(group) do subgroup
      Dagger.@spawn accumulate_data(subgroup,aggregator)
    end
    fetch.(r)
    return nothing # We don't care to return anything
  end
  # Copy aggregators in parallel, which will happen *after* the previous `spawn` call finishes
  # Note that the copy is required as each sub-aggregator may only execute on its owning thread (so we can't just pass them into `local_merge_and_flush_outputs` directly)
  aggregator_copies = Dagger.spawn_bulk() do
    map(copy, aggregator)
  end
  # Merge and flush all aggregator copies
  Dagger.@spawn local_merge_and_flush_outputs(aggregator_copies)
  # Force-empty all aggregators in parallel *after* the previous merge-and-flush
  Dagger.spawn_bulk() do
    foreach(empty!, aggregator)
  end
end

end # end spawn_sequential

As for your second question: I would generally avoid trying to force the scheduler to place tasks in some manual configuration. It will do its best to accurately model task execution and data-movement costs and pick a nearly optimal schedule, and when it doesn’t, that is a bug that should be reported. If you don’t get the parallelism or performance you expect, please reach out!

Regarding your third question (one processor per worker): this is not yet supported, unfortunately, as it’s hard to make that behavior compose with automatic multithreading of serial tasks (the scheduler would need to “force-stop” all other work to allow a single full-worker task to execute, which introduces all sorts of performance headaches).

It’s definitely on my TODO list to figure out, but in the meantime, if you must have this behavior, you’ll have to manually force Dagger to schedule one piece of work to each worker with Dagger.scope (see the docs for details: Scopes · Dagger.jl).
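
For illustration, a minimal sketch of that manual approach might look like this, where heavy_step is just a hypothetical stand-in for an inherently multithreaded function:

using Distributed, Dagger

heavy_step(x) = sum(x)  # hypothetical stand-in for an inherently multithreaded function

# Constrain each task's scope to a specific worker, so that exactly one piece of
# work lands on each worker; the task can then use that worker's threads itself.
tasks = map(workers()) do w
  Dagger.with_options(;scope=Dagger.scope(worker=w)) do
    Dagger.@spawn heavy_step(rand(10^6))
  end
end
fetch.(tasks)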

Thanks a lot for your reply and the corrections to the code. Thanks for pointing out the spawn_sequential and spawn_bulk combination; this might become very useful. I am currently still experimenting with your suggestions and trying to understand things better, and might come up with follow-up questions…

I just realized that Dagger does in fact have a way to support inherently multithreaded functions. You can simply spawn your tasks on only a single thread per worker, where they can then use all the threads they want:

using Dagger, Distributed  # myid() comes from Distributed

Dagger.with_options(;scope=Dagger.scope(thread=1)) do
    @sync for i in 1:10
        Dagger.spawn() do
            println("Hello from worker ", myid(), ", thread ", Threads.threadid())
        end
    end
end

It won’t stop Dagger from running tasks on other threads at the same time, but you could use spawn_sequential to prevent that.
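
For example, a small sketch combining the two (with my_step as a hypothetical stand-in for an inherently multithreaded function):

using Dagger

my_step(i) = (println("step $i, $(Threads.nthreads()) threads available"); i)

Dagger.with_options(;scope=Dagger.scope(thread=1)) do
  Dagger.spawn_sequential() do
    # Each task is restricted to thread 1 of a worker and, thanks to spawn_sequential,
    # is also ordered after the previous one, so the steps do not overlap.
    results = [Dagger.@spawn my_step(i) for i in 1:5]
    fetch.(results)
  end
end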

Thanks a lot for the hint; this is great for using multiple threads. Regarding your other comments:

It sounds like you want to enforce sequential execution between groups?

Actually not. The goal is still to allow parallelization across groups as well as subgroups, but to have an early reduction (and output) step for each group. My current solution looks like this: I create an extra shard for every group, spawn the Dagger tasks, and collect the processors they were run on. Then I reduce only over the processors that were active in processing this group. The extended MWE, which seems to work as expected, is here:

using Dagger

example_data = [
  [
    [:a=>1, :b=>2, :b=>3],
    [:b=>3, :b=>2, :a=>1],
  ], [
    [:a=>1, :c=>10, :d=>-1, :c=>10, :d=>-1],
    [:c=>11, :d=>-2, :d=>-2, :c=>11],
  ], [
    [:e=>3, :e=>3],
    [:e=>4, :e=>4, :a=>1],
  ]
]

#Keep track of (count, sum) per key
function accumulate_data(x,agg) 
  for (name,val) in x
    n,s = get!(agg,name,(0,0))
    agg[name] = (n+1,s+val)
  end
  nothing
end


function merge_and_flush_outputs(aggregator)
  if isempty(aggregator)
    return nothing
  else
    merged_aggregator = fetch(reduce(aggregator) do d1, d2
      merge(fetch(d1),fetch(d2)) do (n1,s1),(n2,s2)
        n1+n2,s1+s2
      end
    end)
    for k in keys(merged_aggregator)
      n,s = merged_aggregator[k]
      if n==4
        @info "$k: $s"
        delete!(merged_aggregator,k)
      end
    end
    merged_aggregator
  end
end

#Select only the shard pieces owned by processors that actually did work
function filter_aggregator(aggregator,used_procs)
  return (v for (k,v) in aggregator.chunks if k in used_procs)
end
#If no processor list is given, use all shard pieces
function filter_aggregator(aggregator,::Nothing)
  values(aggregator.chunks)
end
function merge_local_outputs(aggregator,procs)
  aggregator_copies = Dagger.spawn_bulk() do
    map(filter_aggregator(aggregator,procs)) do agg
      Dagger.spawn(copy,agg)
    end
  end
  # Merge and flush all aggregator copies
  Dagger.@spawn merge_and_flush_outputs(aggregator_copies)
end


aggregator = Dagger.shard(;per_thread=true) do 
  Dict{Symbol,Tuple{Int,Int}}()
end;
r = map(example_data) do group
  Dagger.spawn(group) do group
    Dagger.spawn_sequential() do
      localaggregator = Dagger.shard(;per_thread=true) do
        Dict{Symbol,Tuple{Int,Int}}()
      end
      r = Dagger.spawn_bulk() do
        map(group) do subgroup
          Dagger.spawn(accumulate_data,subgroup,localaggregator)
        end
      end
      #Collect the processors that the spawned tasks ran on
      procs = Dagger.processor.(fetch.(r,raw=true))
      # And pass them to the merge function to ignore empty shard pieces during reduction. 
      unflushed_data = merge_local_outputs(localaggregator,procs)
      wait(Dagger.spawn(unflushed_data,aggregator) do rem_data, agg
        merge!(agg,rem_data) do (n1,s1),(n2,s2)
          n1+n2,s1+s2
        end
      end)
    end
    true
  end
end;
fetch.(r)
wait(merge_local_outputs(aggregator,nothing));

Currently I am implementing and testing this for my real-world use case. Thanks a lot for the support.

Right, but when working with mutable data like your aggregators, you need at least one sequential point (with respect to the aggregators) where you collect their values and clear them; it’s not safe to access them concurrently, since a Dict cannot be safely mutated by multiple threads at the same time. Anyway, this example looks good to me as far as I can tell, although that last wait shouldn’t generally be necessary (except on the last iteration, if you want to ensure it has completed by the time the outer map call has completed).
