Greetings,
I’m working on a project that is a candidate for parallel map/reduce, so I wrote an MWE of a framework for doing that below, using RemoteChannels and remote_do(), and I’m interested in feedback. I’m just coming over to Julia from R and am still getting my sea legs here…
For context, the program’s intent is to read and analyze FFT data from radio telescopes. In production, I’d send a small set of pulse data from a single FFT to each worker instead of rand() and sample(). That’s a small, quick data transfer compared to the long processing time of each worker job. The full run will cover thousands of FFTs and over a billion signal patterns, and I’m hoping to get it all done on my 32-core machine. No way R could do this!
Documentation was a little thin on parallel map/reduce - I based this on https://docs.julialang.org/en/stable/manual/parallel-computing#Channels-and-RemoteChannels-1, but removed a number of things that seemed extraneous.
In particular, these are the changes that I made - am I on the right track?
- The example kept the RemoteChannels in a const; I just declared them inside main() so they wouldn’t stick around in the module.
- Job scheduling is through a single RemoteChannel ch_jobs - workers pick up new work FIFO.
- The example used @schedule to feed the jobs to the RemoteChannel - I just did them in a plain loop. Why did they @schedule? (My guess is sketched just after this list.)
- Workers emit key-value findings to a single RemoteChannel ch_emit. The real job will have maybe 8 different key types to capture counts, sums, and histograms.
- I run the reducer code in the main() loop after sending out the jobs. Is there a reason I’d want to schedule this out instead?
- I chose to bin the histograms in the reducer, not in the mapper. I could do either? (A mapper-side alternative is sketched as comments in the worker code below.)
- We could have a separate Task for each emit type, but a single reducer seemed easier, especially for shutdown coordination.
- I put in a nifty progress meter, which isn’t possible in pmap().
- I was concerned about detecting the end, and there may still be a small race condition in there: the final “done” may arrive before its data. Any suggestions?
- I didn’t find much guidance on housekeeping to leave the workers ready for other tasks. I make them exit their loops when the channel closes, but should I also clear!() them?
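On the @schedule question above, my guess is that the docs feed the channel from a Task so that put!() never blocks main(): with a buffer smaller than the number of jobs, a plain loop would stall until workers drain the channel. A minimal, untested sketch (the 32-element buffer is my invention for illustration):

# sketch: feed jobs from a Task so a full buffer only blocks the Task
ch_jobs = RemoteChannel(() -> Channel{Tuple{Int, Int}}(32))  # buffer < nfft
@schedule for (jobid, npulses) in enumerate(fft_dist)
    put!(ch_jobs, (jobid, npulses))  # blocks here when the buffer is full
end
# main() continues immediately, e.g. straight into the reducer loop

In my MWE the buffer holds all nfft jobs, so the plain loop never blocks and the Task seemed unnecessary.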
Another approach is pmap(), but that operates on return values rather than multiple map/reduce emits. Likewise, this example scheduler https://docs.julialang.org/en/stable/manual/parallel-computing#Scheduling-1 is in the same boat. Am I missing something there?
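For comparison, here is an untested sketch of how the same computation might look with pmap(): each job returns one (count, sum, histogram) tuple and the reduction happens on the master afterwards, at the cost of the streaming emits and the incremental progress meter:

# sketch: one tuple of partial results per job, reduced on the master
# (StatsBase must be loaded on the workers for this to run)
results = pmap(fft_dist) do npulses
    pulses = StatsBase.sample(1:Int(12e6), npulses, replace = false)
    hist = zeros(Int, 13)
    for x in pulses
        hist[Int(floor(x / 1e6) + 1)] += 1
    end
    (npulses, sum(pulses), hist)
end
pulse_cnt  = sum(r[1] for r in results)
pulse_sum  = sum(r[2] for r in results)
pulse_hist = reduce(+, (r[3] for r in results))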
Thanks for any ideas from the team, and I hope that the code might help someone else doing the same at a later date.
Best,
Pasha.
MWE Code:
using Distributions
using StatsBase     # needed for the qualified StatsBase.sample() call in do_work()
using ProgressMeter
function main(nfft::Int = 100)
    # TODO the RemoteChannels were on a const, but why not here?
    ch_jobs = RemoteChannel(() -> Channel{Tuple{Int, Int}}(nfft))
    ch_emit = RemoteChannel(() -> Channel{Tuple{String, Int}}(3 * nfft))

    ### MAP phase
    # the number of pulses per FFT is Poisson distributed;
    # feed the jobs channel with (jobid, pulse count) tuples
    fft_dist = rand(Distributions.Poisson(16.7), nfft)
    for (jobid, npulses) in enumerate(fft_dist)
        put!(ch_jobs, (jobid, npulses))
    end
    info("Created $nfft jobs with average length $(mean(fft_dist))")

    # start a task on each worker to process requests in parallel
    for wkr in workers()
        @async remote_do(do_work, wkr, ch_jobs, ch_emit)
    end
    info("Started $(nworkers()) / $(Sys.CPU_CORES) workers")

    ### REDUCE phase
    # having sent Tasks to run the mapper jobs,
    # now sit back, receive, and tally the emitted messages
    # TODO reduce was on a separate Task, but why not here in main?
    ndone = 0
    pulse_cnt = 0
    pulse_sum = 0
    pulse_hist = zeros(Int, 13)
    prog = Progress(nfft, 1)
    while ndone < nfft || isready(ch_emit)
        # take a (key, value) pair from the emit channel and aggregate it
        # below, keeping the channel drained
        (e_key, e_val) = take!(ch_emit)
        #= println("reducing message $e_key: $e_val") =#
        if e_key == "pulse_cnt"
            pulse_cnt += e_val
        elseif e_key == "pulse_sum"
            pulse_sum += e_val
        elseif e_key == "pulse_hist"
            hbin = Int(floor(e_val / 1e6) + 1)  # bin the incoming value by millions (bins 1-13) - ok for an MWE
            pulse_hist[hbin] += 1
        elseif e_key == "done"
            ndone += 1
            update!(prog, ndone)  # update the progress bar
        else
            error("Unknown emit key: $e_key")
        end
    end
    ## OUTPUT results
    println("Total of $pulse_cnt pulses over $nfft FFTs, so average FFT length was $(pulse_cnt / nfft).")
    println("Pulse sum was $pulse_sum, so average bin value was $(pulse_sum / pulse_cnt)")
    println("This is the pulse histogram:")
    println(pulse_hist)

    ## CLEANUP phase
    @assert !isready(ch_jobs)
    @assert !isready(ch_emit)
    close(ch_jobs)
    close(ch_emit)
    # TODO do I need to clear!() the workers?
end
@everywhere function do_work(ch_jobs, ch_emit)
    # it would be nice to have isopen(ch_jobs)
    try
        while true
            jobid, npulses = take!(ch_jobs)
            #= println("starting job $jobid with $npulses pulses") =#
            # this FFT has 12 million bins; draw pulse positions uniformly without replacement
            pulses = StatsBase.sample(1:Int(12e6), npulses, replace = false)
            # do the real work here... then emit results to the separate reducers as below
            # emit the FFT length to the length reducer
            put!(ch_emit, ("pulse_cnt", npulses))
            # emit the FFT sum to the sum reducer
            put!(ch_emit, ("pulse_sum", sum(pulses)))
            # emit each pulse to the histogram reducer
            foreach(x -> put!(ch_emit, ("pulse_hist", x)), pulses)
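            # alternative (re the binning bullet above): bin in the mapper and
            # emit one partial histogram per job instead of one message per
            # pulse; ch_emit would then need a wider element type such as
            # Tuple{String, Any}. Untested sketch:
            #   hist = zeros(Int, 13)
            #   for x in pulses
            #       hist[Int(floor(x / 1e6) + 1)] += 1
            #   end
            #   put!(ch_emit, ("pulse_hist", hist))
            # and the reducer would do pulse_hist .+= e_val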
            # mark this job done
            put!(ch_emit, ("done", jobid))
        end
    catch err
        if isa(err, InvalidStateException) || isa(err, RemoteException)
            # the jobs channel was closed - treat as a normal shutdown
            #= info("Worker $(myid()) exiting loop") =#
        else
            warn("Worker $(myid()) exiting with error $err")
        end
    end
end
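To run the MWE, note that the workers have to exist before the @everywhere line is evaluated, so I load it like this (assuming the file is saved as mwe.jl - that name is just for illustration):

addprocs(Sys.CPU_CORES - 1)  # or start Julia with e.g. julia -p 31
include("mwe.jl")            # @everywhere needs the workers to exist first
main(100)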