Distributed Conjugate Gradient Numerical Errors

Hi All,

I was implementing a distributed conjugate gradient method in the course of my research and have run into a numerical issue when distributing the code across multiple processes. Algorithmically, my single-process and multi-process versions of the code should produce exactly the same values, but on my main work desktop (Intel 5930k, Ubuntu 16.04, Julia 0.6) there are numerical differences when I run the code asynchronously in one process vs. in two processes.

The test code is as follows. It is a little long, but I cannot reduce it further without affecting reproducibility.

NTEST = 1
addprocs(NTEST; topology=:master_slave)

@everywhere function par_cg_solver{T<:AbstractFloat}(fim::Matrix{T},
                                                     vpg::Vector{T},
                                                     toworkers,
                                                     tomaster,
                                                     cg_iters::Integer=10,
                                                     id::Int=max(myid()-1, 1))
   put!(tomaster[id], vpg) # send vpg
   x = zeros(T, size(vpg))
   vpg[:] = take!(toworkers[id])

   r = copy(vpg)
   p = copy(vpg)
   rdr = dot(vpg, vpg)  # outputs a scalar

   z = fim*p
   put!(tomaster[id], copy(z))

   for i=1:cg_iters
      z = take!( toworkers[id] )
      v = rdr/dot(p, z)      # scalar

      x += v*p
      if i == cg_iters
         break
      end

      r -= v*z

      rdr_new = dot(r, r)    # scalar
      ratio = rdr_new/rdr
      rdr = rdr_new

      p .*= ratio
      p += r

      A_mul_B!(z, fim, p)

      put!(tomaster[id], copy(z))
   end
   put!(tomaster[id], x) # send npg for sync
   x[:] = take!(toworkers[id])
   return x
end
function scatter{T<:AbstractFloat}(tomaster, val::Vector{T}, nworkers::Int)
   for i=1:nworkers
      put!(tomaster[i], copy(val))
   end
end
function gather!{T<:AbstractFloat}(toworkers, val::Vector{T}, nworkers::Int)
   val[:] .= T(0.0)
   for i=1:nworkers
      val[:] += take!(toworkers[i])
   end
   val[:] /= nworkers
end
function par_cg_manager(nparam::Int,
                        nworkers::Int,
                        toworkers,
                        tomaster,
                        cg_iters::Int=10)
   vpg = zeros(nparam)
   z = zeros(nparam)

   gather!(tomaster, vpg, nworkers)
   scatter(toworkers, vpg, nworkers)

   for i=1:cg_iters
      gather!(tomaster, z, nworkers)
      scatter(toworkers, z, nworkers)
   end

   gather!(tomaster, z, nworkers)
   scatter(toworkers, z, nworkers)
   return vpg, z # z == npg
end

function cpu_cg_solve{T<:AbstractFloat}(fim::Matrix{T},
                                        vpg::Vector{T},
                                        cg_iters::Int=10)
   # Initialize cg variables
   r = copy(vpg)
   p = copy(vpg)
   x = zeros(T, size(vpg))
   rdr = dot(vpg, vpg)  # outputs a scalar
   z = fim*p

   for i=1:cg_iters
      v = rdr/dot(p, z)      # scalar

      x += v*p
      r -= v*z

      rdr_new = dot(r, r)    # scalar
      ratio = rdr_new/rdr
      rdr = rdr_new

      p[:] .*= ratio
      p[:] += r

      A_mul_B!(z, fim, p)
   end
   return x
end  

nparam = 512 #128

T = Float32
iters = 10 #100

const get_cg_z  = [ RemoteChannel(()->Channel{Vector{T}}(1)) for i=1:(NTEST) ] 
const send_cg_z = [ RemoteChannel(()->Channel{Vector{T}}(1)) for i=1:(NTEST) ] 

# start work threads for par cg
srand(12345)
A = rand(T, nparam, nparam)
b = rand(T, nparam)
x1 = zeros(T, nparam, NTEST)
x2 = zeros(T, nparam, NTEST)
A = A*A' # make A matrix psd
x = cpu_cg_solve(A, b, iters) # test cpu

println("1 Process, Async")
@sync begin
   @async par_cg_manager(nparam, NTEST, get_cg_z, send_cg_z, iters)
   for i=1:NTEST
      @async x1[:, i] = par_cg_solver(A, b, get_cg_z, send_cg_z, iters, i)
   end
end
x1 = mean(x1, 2)
println("   Maximum Diff: ", maximum(x-x1) )
diff = maximum(x-x1)
if diff != 0.0
   print_with_color(:red, "  BAD Value\n")
end

println("$(NTEST+1) Process")
@sync begin
   @async par_cg_manager(nparam, NTEST, get_cg_z, send_cg_z, iters)
   for i=1:NTEST
      @async x2[:, i] = remotecall_fetch(par_cg_solver, i+1, A, b, get_cg_z, send_cg_z, iters)
   end
end
x2 = mean(x2, 2)
println("   Maximum Diff: ", maximum(x-x2) )
diff = maximum(x-x2)
if diff != 0.0
   print_with_color(:red, "  BAD Value\n")
end

On the affected machine, this produces the following output:

1 Process, Async
   Maximum Diff: 0.0
2 Process
   Maximum Diff: 0.018099353
  BAD Value

I would expect no difference between the two procedures, considering they call the same functions, just in different ways. (A single-process, no-communication version of CG is included as well.)

I cannot reproduce this bug on my laptop or on a few other servers, but a different machine with the same 5930k processor, as well as a machine with a 3930k processor, both show the same issue. At this point I’m not sure whether this is a bug in the math libraries affecting these processors, or some issue with multi-process serialization. When I change the nparam variable to much smaller values, the bug is not triggered on my main machine, which is a clue, but it still doesn’t suggest where to look. Julia 0.5 on the same machine does not trigger this, and the same code on a Mac (different processor) also does not trigger this.

Any guidance or help would be appreciated; this part of the code is affecting the rest of my research, so I’m glad to have narrowed it down to this point. It seems critical enough that it is producing erroneous results when, algorithmically, this shouldn’t be the case…

To summarize my current thinking, the issue is caused by:

  1. linear algebra functions that call processor-dependent functionality,
  2. a remote-process serialization issue, but only on specific processors, or
  3. a combination of the two, or something else?

I’ve made a significantly smaller test case, but it still only triggers on certain computers:

addprocs(1)

@everywhere function par_cg_solver{T<:AbstractFloat}(fim::Matrix{T},
                                                     vpg::Vector{T})
   x = zeros(T, size(vpg))
   z = fim*vpg
   v = T(0.5)

   x += v*z
   return x
end

nparam = 128

T = Float32

srand(12345)
A = rand(T, nparam, nparam)
b = rand(T, nparam)
x1 = zeros(T, nparam)
x2 = zeros(T, nparam)
x3 = zeros(T, nparam)
A = A*A' # make A matrix psd

x1[:] = par_cg_solver(A, b)
x2[:] = remotecall_fetch(par_cg_solver, 2, A, b)
x3[:] = remotecall_fetch(par_cg_solver, 1, A, b)

println("   Max Diff local vs proc 1: ", maximum(x1-x3) )
println("   Max Diff local vs proc 2: ", maximum(x1-x2) )
println("   Index of max diff       : ", findmax(x1-x2))
@printf "%8.16f\n" x1[20]
@printf "%8.16f\n" x2[20]

This, on the offending machine, produces:

   Max Diff local vs proc 1: 0.0
   Max Diff local vs proc 2: 0.00036621094
   Index of max diff       : (0.00036621094f0, 20)
1218.9730224609375000
1218.9726562500000000

The only difference, unless I’m missing something tremendously obvious, is that I am remote-calling the function par_cg_solver on process 2. This, to me, is utterly bizarre.

That’s only a few ULPs, so it could arise from different operation orders between different environments for BLAS. Can you try it with BLAS.set_num_threads(1) everywhere? That might make things more consistent.
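
For reference, the two values printed above differ by exactly three ULPs at that magnitude:

a = 1218.9730224609375f0   # local result, x1[20]
b = 1218.9726562500000f0   # remote result, x2[20]
println(eps(a))            # 0.00012207031, one ULP near 1219
println(a - b)             # 0.00036621094
println((a - b)/eps(a))    # 3.0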

This is pretty interesting. For addprocs(1) on the local machine, setting BLAS.set_num_threads to fewer than 8 fixes the issue, but it still persists if I add a process on a remote machine. If I keep everything on one machine and wrap everything with a consistent number of BLAS threads (via @everywhere), it seems to produce consistent results. It is still odd that different processes would use different numbers of BLAS threads, but I can manually rectify this for the time being.
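
For completeness, the workaround is roughly along these lines (a minimal sketch, assuming the workers are already added; it just applies the suggestion above on every process before any of the CG code runs):

# Force the master and all workers onto one BLAS thread so every process takes
# the same threading path inside gemm/gemv.
@everywhere BLAS.set_num_threads(1)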

The remote machine I tested with does not have exactly the same processor, although it does have the same number of hardware cores. The trick that made it work on one machine doesn’t extend to a remote machine, and I’m still getting numerical issues. I’m running with julia -O0 to avoid any processor-specific optimizations, but would BLAS really not produce the same results with the same number of threads on different processors? Thanks to your advice, I’m relatively confident now that it’s not a remote-process serialization issue, but I’m not sure how to guarantee my conjugate gradient code is actually doing what CG expects if BLAS is at fault here. Any thoughts?

Well, addition is not associative in floating point, so if you distribute a dot product by having every processor sum its own part of the array and then summing the partial results, you will get a different result than from a single straight sum. You should always expect a small amount of (possibly non-deterministic) noise when running a parallel algorithm.
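
As a minimal sketch of that point (a made-up Float32 vector, chunk size chosen arbitrarily), chunked partial sums, which is effectively what a distributed dot product does, can differ from a single-pass sum by a few ULPs:

srand(1)
v = rand(Float32, 10_000)
s_single  = sum(v)                                        # one pass over the whole array
s_chunked = sum(sum(v[i:i+999]) for i = 1:1000:10_000)    # "per-worker" partial sums
println(s_single == s_chunked)       # frequently false
println(abs(s_single - s_chunked))   # on the order of eps(s_single)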

In this formulation we are trying to solve Ax = b for x, where each process has its own A matrix and b vector. We first average the b values, and then average the Hessian-vector product (z in my code) across all processes. This means that in each CG iteration, every process should be doing the same thing with the same values, up until each process computes its own Hessian-vector product again. Your point about non-associative floating point is valid, but would two different processes really produce different values for the dot product of the same vectors?
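
To make that setup concrete, here is a minimal single-process sketch of the scheme (the data, sizes, and the averaged_cg name are made up for illustration; the update order mirrors the CG code above): b is averaged once, z = A_k*p is averaged every iteration, and every "worker" applies identical updates from the averaged quantities.

function averaged_cg(As, bs, cg_iters::Int=10)
   b_avg = mean(bs)                     # averaged right-hand side
   x   = zeros(b_avg)
   r   = copy(b_avg)
   p   = copy(b_avg)
   rdr = dot(r, r)
   for i = 1:cg_iters
      z_avg = mean([A*p for A in As])   # averaged Hessian-vector product
      v = rdr/dot(p, z_avg)
      x += v*p
      i == cg_iters && break
      r -= v*z_avg
      rdr_new = dot(r, r)
      p = r + (rdr_new/rdr)*p
      rdr = rdr_new
   end
   return x
end

srand(1)
As = [rand(64, 64) for k = 1:2]
As = [M*M' for M in As]        # make each "worker's" matrix PSD
bs = [rand(64) for k = 1:2]
x_avg = averaged_cg(As, bs)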

More interestingly, the cumulative effect of this within my larger code base is that the distributed version of my CG can sometimes produce an x that has a negative inner product with the input b. This is bizarre because I can guarantee each process’s A matrix is PSD, and I’m averaging the b vector; each parallel process’s x vector is the same and produces the same negative inner product, so even if the algorithm is consistent, it is consistently wrong. The alternative is to average all of the A’s and b’s and do CG on the result, which produces the correct value, but that is more expensive bandwidth-wise than the distributed version.

  1. Is there a way to encourage consistency between parallel processes?
  2. Is floating-point arithmetic really so sensitive that the math can break algorithmically, as in my second paragraph?

EDIT: as an additional thought, if floating point were that sensitive, wouldn’t we also see inconsistencies in a single-process CG similar to the parallel version? I’m still inclined to think it’s more likely that a weird calculation is happening, rather than floats alone breaking the algorithm…

Ah, then I had a similar bug in a Fortran/MPI code a few years ago, which took ages to debug: different processors doing exactly the same work ended up slightly out of sync when computing a preconditioner. This turned out to destabilize an iterative solver using that preconditioner. We ended up synchronizing the preconditioner, which fixed the bug, but I never really understood what was going on. That bug only occurred with one specific compiler (I believe ifort, but I can’t be sure), on one specific machine.

It’s surprising, however, that CG should be that sensitive. What’s the condition number of A?

For matrices of size ~2500x2500 the condition number is approximately 2e6 (but it can get larger… this is a high-variance kind of problem). That does seem kind of high, but for the most part the distributed algorithm works. I have been running my actual code with 64-bit floats to avoid other issues. I did run into synchronization issues previously, but those were fixed by using a RemoteChannel for each worker node (only 4 nodes), which helped. Now I’m faced with two issues: one is scientific reproducibility and the other is algorithmic correctness.

If I could guarantee that the algorithm was doing a correct CG, just slightly off (I’m using it to calculate a natural gradient), then that wouldn’t be a problem, as I’m taking many gradient steps over time. However, if these numerical issues prevent algorithmic stability, then I could be getting a bad gradient direction: this kills my work (RL for robotics). I can fall back to a synchronous version of this, but then I’m not using Julia for all it can do :slight_smile:

If matrix multiplication is not reproducible machine to machine, then reproducing work seems difficult. Take the following code as an example:

#addprocs(1)
addprocs(["hulk"]) # remote machine
@everywhere function foo(n)
   BLAS.set_num_threads(1)
   srand(12345)
   NUM = n #128
   x = rand(Float32, NUM, NUM)
   y = x*x'
   return y
end

for i=1:12
   #n = i
   n = 2^i
   v  = foo(n)
   v1 = remotecall_fetch(foo, 1, n)
   v2 = remotecall_fetch(foo, 2, n)
   d1 = v-v1
   d2 = v-v2
   if d1 != d2
      println("$n")
      @printf "L: %8.16f\n" maximum(d1)
      @printf "R: %8.16f\n" maximum(d2)
      println()
   end
end

We have a function that does a little bit of matrix math. We compare a direct call to that function with the same function called through remotecall_fetch, and we’d expect the results to be the same. However, if the remote process is on a different machine in my cluster (unfortunately with a different processor than my main desktop), we get back a different value v2. This triggers when the matrix size is as small as 4x4, so I’m guessing there’s some BLAS call with vector instructions affecting this, but at the end of the day, which result do we trust, v1 or v2? Same data, different returned values (growing with matrix size), at least in my configuration.

4
L: 0.0000000000000000
R: 0.0000002384185791

8
L: 0.0000000000000000
R: 0.0000002384185791

16
L: 0.0000000000000000
R: 0.0000004768371582

32
L: 0.0000000000000000
R: 0.0000019073486328

That is indeed weird, but still in the range of machine precision (which is relative, hence the fact that the error grows with n). The conservative answer to “which is the correct one, v1 or v2?” would be “neither, but they’re both within O(eps) of the correct one”. A condition number of 1e6 in Float32 is cutting it quite close, so it’s not that surprising if CG breaks down…
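
To put a number on “relative”: one ULP of a Float32 scales with the magnitude of the value, and the entries of x*x' grow with n for uniform [0,1) data, so the absolute size of a reordering difference grows too (the magnitudes below are illustrative, chosen to match the scale of the printed diffs):

# One ULP of a Float32 at a few magnitudes; compare with the R values above.
for m in (1f0, 2f0, 4f0, 8f0)
   println("one ULP near ", m, " is ", eps(m))   # 1.19e-7, 2.38e-7, 4.77e-7, 9.54e-7
end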

I’ve been running with Float64 to try to avoid issues; the condition number of ~1e6 is for that. When running the synchronous version (I average all processes’ A’s and b’s), CG is fine, but having to ship the A matrices is costly in bandwidth. Instead, I average the b vectors, and each CG iteration starts with an averaged z vector. The error seems to occur when different processes take the same z vector and produce different results in the intermediate computations; this causes the processes to start diverging from each other when they should remain identical.
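
One way I can pin down exactly where they diverge (a debugging sketch; the fingerprint helper, the check_consistency function, and the extra debug channel are hypothetical additions, not part of the code above) is to have each worker report a bit-level fingerprint of its local state every iteration and let the manager compare them:

# Debugging sketch (hypothetical helper): a bit-level fingerprint of a worker's
# local vector, so the manager can flag the first iteration at which any worker's
# copy of p (or r, x) stops matching the others.
@everywhere fingerprint(v::Vector{Float64}) = hash(reinterpret(UInt64, v))

# Manager side: compare the fingerprints gathered from each worker over an
# extra debug channel each iteration.
function check_consistency(fps, iter::Int)
   if any(fps .!= fps[1])
      warn("worker-local CG state diverged at iteration $iter")
   end
end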

Thus the problem is not so much my CG code as how to deal with different processes producing different outputs for the same input. I think I can do a more centralized version of my CG with a small rewrite and still get most of the bandwidth savings, but it makes me concerned about all my other reinforcement learning code…