Short Update,
I got this bit of code working… I think.
using Distributed
using MyClusterManagerAprun
addprocs_aprun(16)
@everywhere function RingAllReduce(channelTable, masterChannel)
m_id = myid()
n_w = nworkers()
dat = rand(1:200, (4,4,156,12));
w_l = sort(workers())
w_i_i_map = Dict(w_i => i for (i,w_i) in zip(0:(n_w-1), w_l))
i_w_i_map = Dict(i => w_i for (i,w_i) in zip(0:(n_w-1), w_l))
right_id = i_w_i_map[(w_i_i_map[ m_id] + 1) % n_w]
_dat_accum = copy(dat)
time_el = @elapsed for _i in 1:(nworkers()-1)
put!(channelTable[myid()], _dat_accum)
_dat_accum = take!(channelTable[right_id]) + dat
end
put!(channelTable[myid()], _dat_accum)
_dat_accum = take!(channelTable[right_id])
put!(masterChannel, (m_id, time_el))
end
masterChannel = RemoteChannel(()->Channel{Tuple}(20), 1)
channelTable = Dict(w_i => RemoteChannel(()->Channel{Array{Int64,4}}(1), w_i) for w_i in workers())
futures = [remotecall(RingAllReduce, w_i, channelTable, masterChannel) for w_i in workers()]
Then I can use masterChannel
to monitor progress like this,
[take!(masterChannel) for i in 1:100 if isready(masterChannel)]
This pattern seems to be working for me at the moment. I’ll post again when I have a better test set up for this.