Parallel Map/Reduce with RemoteChannels - comments on MWE?


#1

Greetings,

I’m working on a project that is a candidate for parallel map/reduce, so I wrote an MWE of a framework for doing that below, using RemoteChannels and remote_do(), and I’m interested in feedback. I’m just coming over to Julia from R and am getting my sea legs here…

For context, the program’s intent is to read and analyze FFT data from radio telescopes. In production, I’d send a small set of pulse data from a single FFT to each worker instead of rand() and sample(). This is a small, quick data transfer compared to the long processing time of each worker job. The full run will cover thousands of FFTs, over a billion signal patterns, and I’m hoping to get it all done on my 32-core machine. No way R could do this!

Documentation was a little thin on parallel map/reduce - I based this off of https://docs.julialang.org/en/stable/manual/parallel-computing#Channels-and-RemoteChannels-1, but removed a number of things that seemed extraneous.

In particular, these are the changes that I made - am I on the right track?

  • The example put the RemoteChannels in a const; I just put them in main so they wouldn’t stick around in the module.
  • Job scheduling is through a single RemoteChannel ch_jobs - workers pick up new work FIFO.
  • The example used @schedule to feed the jobs to the RemoteChannel - I just did them in a loop. Why did they @schedule?
  • Workers emit key-value findings to a single RemoteChannel ch_emit. The real job will have maybe 8 different key types to capture counts, sums, and histograms.
  • I run the reducer code in the main() loop after sending out the jobs. Is there a reason I’d want to schedule this out instead?
  • I chose to bin the histograms in the reducer, not in the mapper. Could do either??
  • We could have a separate Task for each emit type. A single reducer seemed easier, especially on shutdown coordination?
  • I put in a nifty progress meter, which isn’t possible in pmap().
  • I was concerned about detecting the end, and there still may be a little race condition in there. The final “done” may arrive before its data. Any suggestions?
  • I didn’t find much guidance on housekeeping to leave the workers ready for other tasks. I make them exit their loops when the channel closes, but should I also clear!() them?

Another approach is pmap(), but that operates on return values rather than multiple map/reduce emits. Likewise, this example scheduler https://docs.julialang.org/en/stable/manual/parallel-computing#Scheduling-1 is in the same boat. Am I missing something there?
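For comparison, pmap() can emulate multiple emits if each job returns a tuple of partial results that the caller folds together afterwards. This is only a sketch under assumptions: `map_fft`, the 12-million-bin layout, and the job sizes are all invented for illustration, and it uses Julia ≥ 0.7 syntax (`using Distributed`).

```julia
using Distributed

# Hypothetical per-job mapper: return one tuple of partial results
# (count, sum, histogram) instead of emitting key-value pairs to a channel.
function map_fft(npulses::Int)
    pulses = rand(1:12_000_000, npulses)       # stand-in for the real pulse data
    hist = zeros(Int, 12)
    for p in pulses
        hist[div(p - 1, 1_000_000) + 1] += 1   # bin by millions
    end
    return (npulses, sum(pulses), hist)
end

# pmap gathers one tuple per job; a fold afterwards plays the reducer role.
results = pmap(map_fft, [10, 20, 30])
pulse_cnt  = sum(r[1] for r in results)
pulse_sum  = sum(r[2] for r in results)
pulse_hist = reduce(+, (r[3] for r in results))
```

The trade-off versus the channel version is that nothing is tallied until a job returns, and there is no obvious place to hang a progress meter.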

Thanks for any ideas from the team, and I hope that the code might help someone else doing the same at a later date.

Best,

Pasha.

MWE Code:

using Distributions
using ProgressMeter
@everywhere using StatsBase  # workers call StatsBase.sample in do_work

function main(nfft::Int = 100)

  # TODO the RemoteChannels were on a const, but why not here?
  ch_jobs = RemoteChannel(()->Channel{Tuple{Int, Int}}(nfft))
  ch_emit = RemoteChannel(()->Channel{Tuple{String, Int}}(3 * nfft))

  ### MAP phase 
  # number of pulses per fft are poisson distributed.
  # feed jobs channel with jobnumber, pulse count
  fft_dist = rand(Distributions.Poisson(16.7), nfft)
  for (jobid, npulses) in enumerate(fft_dist)
    put!(ch_jobs, (jobid, npulses))
  end

  info("Created $nfft jobs with average length $(mean(fft_dist))")

  # start tasks on the workers to process requests in parallel
  # (remote_do is already asynchronous, so no @async wrapper is needed)
  for wkr in workers()
    remote_do(do_work, wkr, ch_jobs, ch_emit)
  end

  info("Started $(nworkers()) / $(Sys.CPU_CORES) workers")

  ### REDUCE phase
  # having sent Tasks to run the mapper jobs,
  # now sit back, receive and tally emitted messages
  # TODO reduce was on a separate Task, but why not here in main?
  ndone = 0
  pulse_cnt = 0
  pulse_sum = 0
  pulse_hist = zeros(Int, 13)
  prog = Progress(nfft, 1)

  while ndone < nfft || isready(ch_emit)
    # take key, value from emitter channel, agg them below to keep channel clean
    (e_key, e_val) = take!(ch_emit)
    #= println("reducing message $e_key: $e_val") =#

    if e_key == "pulse_cnt"
      pulse_cnt += e_val

    elseif e_key == "pulse_sum"
      pulse_sum += e_val

    elseif e_key == "pulse_hist"
      hbin = Int(floor(e_val / 1e6) + 1) # bin the incoming value by millions (bins 1-13) - ok for MWE
      pulse_hist[hbin] += 1

    elseif e_key == "done"
      ndone += 1
      update!(prog, ndone) # update progress bar

    else
      error("Unknown emit key: $e_key")
    end
  end

  ## OUTPUT results
  println("Total of $pulse_cnt pulses over $nfft fft, so average fft length was $(pulse_cnt/nfft).")
  println("Pulse sum was $pulse_sum, so average bin value was $(pulse_sum/pulse_cnt)")
  println("This is the pulse histogram")
  println(pulse_hist)

  ## CLEANUP phase
  @assert ! isready(ch_jobs)
  @assert ! isready(ch_emit)
  close(ch_jobs)
  close(ch_emit)
  # TODO do I need to clear!() the workers?
end

@everywhere function do_work(ch_jobs, ch_emit)
  # it would be nice to have isopen(ch_jobs)
  try
    while true
      jobid, npulses = take!(ch_jobs)
      #= println("starting job $jobid with $npulses pulses") =#

      # this FFT has 12 mil bins, uniform distribution
      pulses = StatsBase.sample(1:Int(12e6), npulses, replace = false) 

      # do real work here... emit results to separate reducers as below

      # emit the fft length to the length reducer
      put!(ch_emit, ("pulse_cnt", npulses))
      # emit the fft sum to the sum reducer
      put!(ch_emit, ("pulse_sum", sum(pulses)))
      # emit each pulse to the histogram reducer
      # (foreach avoids collecting the unused results that map would return)
      foreach(x->put!(ch_emit, ("pulse_hist", x)), pulses)
      # mark this job done
      put!(ch_emit, ("done", jobid))
    end
  catch err
    if isa(err, InvalidStateException) || isa(err, RemoteException)
      #= info("Worker $(myid()) exiting loop") =#
    else
      warn("Worker $(myid()) exiting with error $err")
    end
  end
end

#2

Lesson learned - make my post shorter/simpler and ask one question.

The most important question in this MWE is “Why should I put the RemoteChannels on a const?” They seem to work fine as a variable in main, stay in scope for the @async tasks that use them, and I like the fact that when it’s all done, they fall out of scope and get cleaned up. Why const?


#3

Your way of doing things is fine.

The examples were written in an interactive style for demoing, working in the global scope of a REPL.
There, you’re unlikely to want to pass the channels around as function arguments on each call.
Using const gives you both type stability and interactivity.

You have the former in main, but not the latter unless you don’t mind the overhead of repeatedly creating and destroying channels. Given that it’s main, I’m guessing you only call main once, so that isn’t an issue either.
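To make that trade-off concrete, here is roughly what the tutorial’s global-scope pattern looks like. This is a sketch: the channel type, buffer size, and the helper `has_jobs` are arbitrary choices for illustration, using Julia ≥ 0.7 syntax.

```julia
using Distributed

# const fixes the global binding's type, so functions that reference the
# channel compile to type-stable code, and the same channel survives
# across interactive calls at the REPL.
const ch_jobs = RemoteChannel(() -> Channel{Tuple{Int,Int}}(32))

# reads a typed global: no dynamic lookup on each call
has_jobs() = isready(ch_jobs)
```

A local variable in main() gets the same type stability from inference; the difference is just that the channel is recreated (and later garbage-collected) on every call, which is harmless if main() runs once.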


#4

Putting the feeder into a Task (via @schedule) allows the workers to start processing before all the data is ready. This is frequently important (so good to show in a tutorial example), but apparently not for your case. Also, the loop would deadlock if you didn’t make the buffer big enough for everything.
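For reference, the feeder-as-a-task pattern looks roughly like this. It is a sketch under assumptions: `@schedule` became `@async` in Julia 0.7+, the buffer is deliberately smaller than the job count to show the overlap, and the consumer runs locally (rather than on a worker) for brevity.

```julia
using Distributed

# buffer much smaller than the 100 jobs, so put! will block repeatedly
ch_jobs = RemoteChannel(() -> Channel{Tuple{Int,Int}}(4))

# Feeding from a task means put! can block on the full buffer without
# stalling the main task; consumers drain jobs as they are produced.
@async begin
    for jobid in 1:100
        put!(ch_jobs, (jobid, jobid % 7))
    end
    close(ch_jobs)   # signal end-of-stream to consumers
end

# Consumer: take! throws once the channel is closed and empty,
# which cleanly ends the loop.
function drain(ch)
    n = 0
    try
        while true
            take!(ch)
            n += 1
        end
    catch
    end
    return n
end

consumed = drain(ch_jobs)
```

With an unbuffered or small channel like this, the plain loop in main() would deadlock before any worker had started, which is the other reason the tutorial feeds from a task.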