Training a Deep Neural Network Using Data Parallelism?

Hi,
I need to train some relatively simple networks on large datasets and would like to use multiple GPUs for training. Is there a tutorial or write-up on how to do this in Julia? I know about this one for PyTorch: Multi-GPU Examples — PyTorch Tutorials 1.12.1+cu102 documentation

Best, Ralph

Don’t know about anything exactly like the PyTorch example, but you can probably put something together.

This thread might be of interest:

It mentions the CUDA.jl docs, where you can find a bit about basic data handling across GPUs.
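
If you go the multi-process route, the basic pattern from the CUDA.jl docs is roughly one worker process per GPU, each pinned to its own device. A minimal, untested sketch (adapt to your setup):

using Distributed, CUDA

# One worker process per visible GPU.
addprocs(length(devices()))
@everywhere using CUDA

# Pin the i-th worker to device ordinal i-1 (ordinals are 0-based).
for (i, w) in enumerate(workers())
    remotecall_wait(CUDA.device!, w, i - 1)
end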

I have implemented data-parallel training on CPUs to better utilize multi-threading in Hierarchical Multiple-Instance Learning models. It is relatively easy to do by hand. You can check https://github.com/pevnak/PrayTools.jl/blob/master/src/threadedgrad.jl for inspiration; a rough sketch of the idea is below. Feel free to contact me for discussion.
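
The gist, as a rough untested sketch (not the actual PrayTools code; `loss` and `chunks` are hypothetical names): compute the gradient of each minibatch chunk on its own thread, then sum the per-chunk Grads on the main thread.

using Flux, Zygote

# loss(x, y) is the per-chunk loss; chunks is a vector of (x, y) tuples.
function threaded_grads(loss, ps::Flux.Params, chunks)
    gs = Vector{Zygote.Grads}(undef, length(chunks))
    Threads.@threads for i in eachindex(chunks)
        x, y = chunks[i]
        gs[i] = Zygote.gradient(() -> loss(x, y), ps)
    end
    # Accumulate the per-chunk gradients parameter by parameter.
    total = gs[1]
    for g in gs[2:end], p in ps
        total[p] = total[p] .+ g[p]
    end
    return total
end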

Tomas

@Tomas_Pevny Thanks, that’s helpful. I am not sure how to handle Zygote.Grads in the distributed setting. Here is my code:

# MNIST classifier using data-parallel on multiple GPUs

using Distributed
using CUDA

addprocs(length(devices()))

# Include Flux, Zygote, and defines calculation of gradients.
#@everywhere include("/global/homes/r/rkube/source/julia_test/mnist_class_distributed.jl")
@everywhere using Flux
@everywhere using Zygote
@everywhere function grads_now(x, y, mm, pp)
    # x:  Batch of images, size (28)x(28)x(1)x(batch_size)
    # y:  One-hot encoded labels of the images, 0-9
    # mm: Flux model
    # pp: Parameters of the Flux model
    loss, back = Zygote.pullback(pp) do
        Flux.Losses.binarycrossentropy(mm(x), y)  # prediction first, target second
    end
    grads = back(1f0)
    return grads
end

using MLDatasets
using Flux.Data: DataLoader
using Zygote
using Flux

num_gpus = 2 #length(devices())
examples_per_worker = 64;
batch_size = examples_per_worker * num_gpus

# Load data
xtrain, ytrain = MLDatasets.MNIST.traindata(Float32);
xtest, ytest = MLDatasets.MNIST.testdata(Float32);

xtrain = reshape(xtrain, (28, 28, 1, :)) |> gpu;
xtest = reshape(xtest, (28, 28, 1, :)) |> gpu;

# One-hot-encode the labels
ytrain = Flux.onehotbatch(ytrain, 0:9) |> gpu;
ytest = Flux.onehotbatch(ytest, 0:9) |> gpu;

# Batching
train_loader = DataLoader((xtrain, ytrain), batchsize=batch_size, shuffle=true);
test_loader = DataLoader((xtest, ytest), batchsize=batch_size);

# Optimizer
opt = ADAM(0.9);

# Define a simple MLP with sigmoid final layer of size 10 for probability 
model = Chain(Conv((3, 3), 1=>8, relu), 
              Conv((3, 3), 8=>16, relu), 
              Conv((5, 5), 16 =>16, relu), 
              Conv((5, 5), 16=>16, relu), 
              x -> flatten(x), 
              Dense(4096, 10, sigmoid)) |> gpu;

# Extract model params
ps = Flux.params(model);

(x,y) = first(train_loader);

# Split x and y into parts that are sent to each worker
xy_per_worker = [(x[:,:,:, (k-1) * examples_per_worker + 1: k * examples_per_worker],
                  y[:, (k-1) * examples_per_worker + 1: k * examples_per_worker]) for k ∈ 1:num_gpus]

# Calculate gradients over current example distributed on the worker pool
grads_from_workers = pmap(xy -> grads_now(xy[1], xy[2], model, ps), xy_per_worker)
# Average gradients that we just received from the workers
for p in ps
    # Hard-coded for num_gpus=2
    grads_from_workers[1][p] = (grads_from_workers[1][p] + grads_from_workers[2][p]) / 2.0
end

The last loop is inspired by your addgrad function. Julia throws an error when accessing the elements of grads_from_workers:

julia> for p in ps
           # Hard-coded for num_gpus=2
           grads_from_workers[1][p] = (grads_from_workers[1][p] + grads_from_workers[2][p]) / 2.0
       end
ERROR: KeyError: key [0.26950487 0.09152142 -0.25693536; -0.05512565 -0.0565367 0.26960212; -0.081430614 -0.18807764 -0.07629797;;;; -0.04641332 -0.10099426 0.039984215; 0.19868578 -0.059893914 0.13311733; -0.029146401 -0.15549782 0.13652915;;;; -0.26943153 0.24420884 0.22624107; 0.107807994 0.25059542 0.053223163; -0.19172552 0.11626054 -0.250573;;;; 0.09726549 -0.11461907 -0.21769923; 0.09831371 -0.16952133 -0.2452086; 0.012307492 -0.1203438 -0.20718424;;;; 0.07682779 -0.0055676983 0.008418384; -0.090335764 0.038072642 -0.23139168; -0.1753688 -0.17191374 0.07041458;;;; 0.1565108 0.25166494 0.05122282; 0.057927087 0.14263465 -0.2567391; 0.01940246 -0.09167486 0.2549128;;;; 0.00838662 0.12844574 0.2326228; 0.025148213 0.244805 -0.09040289; -0.05097455 -0.040772133 -0.017126724;;;; -0.13500807 0.00867979 -0.23491067; 0.10669534 0.14557783 -0.16608055; -0.032498617 0.0036792245 0.14004922] not found
Stacktrace:
 [1] getindex
   @ ./iddict.jl:108 [inlined]
 [2] getindex(gs::Zygote.Grads, x::CuArray{Float32, 4, CUDA.Mem.DeviceBuffer})
   @ Zygote ~/.julia/packages/Zygote/kUefI/src/compiler/interface.jl:274
 [3] top-level scope
   @ ./REPL[2]:3

When I run this code on a single CPU, accessing the gradients works fine like this:

julia> gg = grads_now(x, y, model, ps);

julia> gg[ps[1]]
3×3×1×8 Array{Float32, 4}:
[:, :, 1, 1] =
 0.0441429  0.0318292    0.0203954
 0.0292745  0.015554     0.0153562
 0.0194541  0.00176881  -0.00113646

...

The simplest and most effective approach is to convert the gradients to a plain vector on the workers and restore them into a Grads structure on the master. I have found this to work very well, because a plain vector serializes nicely, and serialization is how Julia sends data between processes.

The cause of the error requires a slightly deeper understanding of Zygote. A Grads object contains an IdDict whose keys are the parameter arrays themselves (compared by identity, essentially a pointer) and whose values are the gradients of those arrays. As you can imagine, sending such pointers to another process does not work well: after deserialization the keys are new array objects that no longer match the parameters on the master.
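
A tiny single-process illustration of the identity problem (hypothetical example, no Distributed involved):

using Flux, Zygote

W = rand(Float32, 2, 2)
ps = Flux.params(W)
gs = Zygote.gradient(() -> sum(W), ps)

gs[W]        # works: the key is this exact array object
gs[copy(W)]  # KeyError: an equal but distinct array is a different key

Sending a Grads through pmap is effectively such a copy, so on the master your parameter arrays are no longer the keys.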

You can write a variant of these functions, taken from Zygote:

function copy!(gs::Grads, x::AbstractVector, ps)
  # Restore a flat vector of gradients into the Grads structure,
  # keyed by the parameters in ps.
  i = 0
  for p in ps
    gs[p] = reshape(x[i+1:i+length(p)], size(p))
    i += length(p)
  end
  gs
end

function copy!(x::AbstractVector, gs::Grads, ps)
  # Flatten the Grads structure into the vector x. Parameters without a
  # gradient are written as zeros so the offsets stay aligned with the
  # reverse copy! above.
  i = 0
  for p in ps
    if haskey(gs, p) && gs[p] !== nothing
      x[i+1:i+length(p)] .= vec(gs[p])
    else
      x[i+1:i+length(p)] .= 0
    end
    i += length(p)
  end
  x
end

I have adapted them without testing, but note that you need to pass them the output of Flux.params(model) on both sides. Also, do not forget to move the gradients off the GPU with cpu if needed.
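
To make the round trip concrete, here is an untested sketch of how the pieces could fit together with your script above (grads_now, model, ps, opt, and xy_per_worker are yours; the grads_as_vector helper is hypothetical, and the gpu/cpu movement is an assumption you may need to adjust):

# Each worker returns a plain Float32 vector, which serializes cleanly.
@everywhere function grads_as_vector(x, y, mm, pp)
    gs = grads_now(x, y, mm, pp)
    # Flatten in the order of pp, moving each gradient off the GPU first.
    return reduce(vcat, [vec(cpu(gs[p])) for p in pp])
end

gvecs = pmap(xy -> grads_as_vector(xy[1], xy[2], model, ps), xy_per_worker)
gmean = reduce(+, gvecs) ./ length(gvecs)   # average the flattened gradients

# Restore into a Grads keyed by the master's own parameter arrays,
# then take an optimizer step.
local_gs = grads_now(xy_per_worker[1][1], xy_per_worker[1][2], model, ps)
copy!(local_gs, gpu(gmean), ps)
Flux.Optimise.update!(opt, ps, local_gs)

Whether each worker actually sees a GPU (and which one) depends on how the processes were set up, so treat the gpu/cpu calls as placeholders.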

I hope you get the idea of how to do it. I once had a student who implemented some distributed algorithms for his master's thesis, but I guess he never released the source code.

Best,

Tomas

Yeah, per my comments in that thread, FluxMPI is probably the most turnkey library available at present. Definitely worth checking out.