Julia Distributed, AllReduce, and Distributed Training

A short update:

I got this bit of code working… I think.

# Standard-library distributed-computing primitives:
# addprocs, workers(), myid(), RemoteChannel, @everywhere, remotecall, ...
using Distributed
# Custom ClusterManager that launches workers through `aprun` (Cray ALPS).
# NOTE(review): project-local package, not in the stdlib — confirm availability.
using MyClusterManagerAprun

# Start 16 worker processes on the cluster via the aprun-backed manager.
addprocs_aprun(16)



@everywhere function RingAllReduce(channelTable, masterChannel)
    # Ring all-reduce demo: every worker repeatedly publishes its partial sum
    # to its own channel and consumes its right neighbor's partial, adding its
    # own `dat` each step.  After n_w - 1 steps each worker's accumulator holds
    # the element-wise sum of all workers' arrays.
    #
    # Arguments:
    #   channelTable  : Dict mapping worker id => RemoteChannel (capacity 1)
    #                   hosted on that worker; used as the worker's "outbox".
    #   masterChannel : RemoteChannel on the master process; this worker pushes
    #                   (worker id, elapsed seconds) when it finishes.
    m_id = myid()
    n_w = nworkers()
    # Demo payload: each worker reduces its own random 4x4x156x12 Int array.
    dat = rand(1:200, (4, 4, 156, 12))
    w_l = sort(workers())

    # Map worker ids <-> ring positions 0:(n_w - 1) so neighbors are cheap to
    # look up regardless of how pids were assigned.
    w_i_i_map = Dict(w_i => i for (i, w_i) in zip(0:(n_w - 1), w_l))
    i_w_i_map = Dict(i => w_i for (i, w_i) in zip(0:(n_w - 1), w_l))

    # Worker whose outbox we consume: the next position around the ring.
    right_id = i_w_i_map[(w_i_i_map[m_id] + 1) % n_w]

    _dat_accum = copy(dat)
    # Reuse the cached m_id / n_w instead of re-calling myid()/nworkers()
    # inside the loop (the original mixed both styles).
    time_el = @elapsed for _i in 1:(n_w - 1)
        put!(channelTable[m_id], _dat_accum)
        _dat_accum = take!(channelTable[right_id]) + dat
    end
    # One final exchange drains every channel so channelTable can be reused;
    # all accumulators already hold the full sum at this point.
    put!(channelTable[m_id], _dat_accum)
    _dat_accum = take!(channelTable[right_id])
    # Report completion and this worker's reduce time to the master.
    put!(masterChannel, (m_id, time_el))
    return nothing
end
# Result channel hosted on the master (pid 1); workers push (id, time) tuples.
masterChannel = RemoteChannel(()->Channel{Tuple}(20), 1)
# One capacity-1 "outbox" per worker, hosted on that worker's process.
channelTable = Dict(
    wid => RemoteChannel(()->Channel{Array{Int64,4}}(1), wid)
    for wid in workers()
)

# Kick off the ring all-reduce on every worker; keep the futures for later.
futures = map(wid -> remotecall(RingAllReduce, wid, channelTable, masterChannel), workers())

Then I can use masterChannel to monitor progress like this,

[take!(masterChannel) for i in 1:100 if isready(masterChannel)]

This pattern seems to be working for me at the moment. I’ll post again when I have a better test set up for this.

1 Like