Setting up worker-local buffers when using Distributed.jl

Hello all,

I am in the process of incrementally optimizing an algorithm, having focused first on single-core performance, then threading, and now want to scale to Distributed workers.
One key element of my optimizations is reducing allocations by allocating a large buffer once and then reusing it for every iteration. However, I haven’t been able to set up “worker-local buffers” in the Distributed.jl setting.

To illustrate, I want to speed up something roughly like this:

using StatsBase: sample
using LinearAlgebra: svd!

# set up struct that holds the preallocated buffer
mutable struct MyMethod{T}
  buffer::Matrix{T}
end

# set up some work....
data = rand(100, 100_000);
# we will pick random columns from the data and work on them, by copying them to a buffer
column_index_batches = [sample(axes(data, 2), 1_000) for _ in 1:500]

method = MyMethod(zeros(size(data, 1), 1_000))   # <- preallocate buffer here once

# do the actual work
results = Float64[]
for c_inds in column_index_batches
  method.buffer .= @view data[:, c_inds]  # <- I'm using the buffer here! (@view avoids allocating a temporary)
  res = svd!(method.buffer)               # <- no input copy anymore, yay! (svd!'s outputs still allocate)
  # do something with the res, e.g.
  push!(results, res.S[1] / res.S[end])
end

Note that this is not my actual code, but roughly outlines my approach.

Multi-threading also works fine with this approach. I can similarly preallocate a buffer for each thread (although you need to be wary of the recent discussions around thread-local state).

mutable struct MyMethodThreaded{T}
  buffers::Vector{Matrix{T}}
end

method_threaded = MyMethodThreaded(
    [zeros(size(data, 1), 1_000)
     for _ in 1:Threads.nthreads()]);

results = Vector{Float64}(undef, length(column_index_batches))

# Note :static scheduling so that threadid() stays fixed within each iteration.
Threads.@threads :static for i in eachindex(column_index_batches)
  buffer = method_threaded.buffers[Threads.threadid()]   # <- Grab the thread-local buffer here.
  buffer .= @view data[:, column_index_batches[i]]       # <- I'm using the buffer here!
  res = svd!(buffer)
  # do something with the res, e.g.
  results[i] = res.S[1] / res.S[end]
end
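
As an aside, since indexing by threadid() can misbehave once tasks migrate between threads, handing out buffers through a Channel also works with the default scheduler. This is only a sketch of that alternative (same toy problem, variable names are my own):

# Buffer pool: any task takes a free buffer, uses it, and puts it back.
buffer_pool = Channel{Matrix{Float64}}(Threads.nthreads())
for _ in 1:Threads.nthreads()
    put!(buffer_pool, zeros(size(data, 1), 1_000))
end

results_pool = Vector{Float64}(undef, length(column_index_batches))
Threads.@threads for i in eachindex(column_index_batches)
    buf = take!(buffer_pool)   # blocks until a buffer is free
    try
        buf .= @view data[:, column_index_batches[i]]
        res = svd!(buf)
        results_pool[i] = res.S[1] / res.S[end]
    finally
        put!(buffer_pool, buf) # always return the buffer to the pool
    end
end

The try/finally makes sure a buffer goes back into the pool even if svd! throws.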

So far so good. Now, coming to my actual question: how can I do the same via Distributed.jl, when using multiple compute nodes?

Naively, I would again try something similar, i.e. preallocate all the buffers on the main worker, and then use them on the other workers. However, this doesn’t work for two reasons:

  1. If I work with multiple compute nodes, the total buffer size may far exceed the memory of the main worker.
  2. This would require the buffers to be sent over the network somehow, which is something I don’t want.
    Indeed, I want the ownership of the buffers to lie fully with the workers, and it is even fine if the main worker can never access the buffers. However, I haven’t found a reliable way to do this.

The target result would look something like this

Distributed.@distributed vcat for c_inds in column_index_batches
  buffer = rand(size(data, 1), 1_000) # <- How can we preallocate this per worker?
  buffer .= @view data[:, c_inds]     # <- Use it here.
  res = svd!(buffer)
  # do something with the res, e.g.
  res.S[1] / res.S[end]
end

but with the buffers preallocated per worker somehow.
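
I suppose worker-local globals set up via @everywhere might be one way to get there; here is an untested sketch of that idea (LOCAL_BUFFER and init_buffer! are names I just made up, so take it with a grain of salt):

# Give every process its own buffer slot; the actual allocation happens
# on each worker when init_buffer! runs there.
@everywhere const LOCAL_BUFFER = Ref{Matrix{Float64}}()

@everywhere function init_buffer!(nrows, ncols)
    LOCAL_BUFFER[] = zeros(nrows, ncols)
    return nothing   # return nothing so the buffer is never serialized back
end

for w in workers()
    remotecall_fetch(init_buffer!, w, size(data, 1), 1_000)
end

Inside the @distributed loop one would then use LOCAL_BUFFER[] instead of allocating a fresh buffer.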

I have for example tried

@everywhere mutable struct MyMethodDistributed{T}
    buffers::Dict{Int, Matrix{T}}
end

method = MyMethodDistributed(Dict{Int, Matrix{Float64}}())  # construct with an empty Dict on the main process

for w in workers()
    remotecall_fetch(w, method) do method
        method.buffers[myid()] = rand(size(data, 1), 1_000)
    end
end

@distributed vcat for (c_inds, method) in collect(zip(column_index_batches,
                                                      Iterators.repeated(method)))
  buffer = method.buffers[myid()]   # <- Grab the worker-local buffer here.
  buffer .= @view data[:, c_inds]   # <- I'm using the buffer here!
  res = svd!(buffer)
  # do something with the res, e.g.
  res.S[1] / res.S[end]
end

but for larger buffers, this totally crashes my computer, which leads me to believe that the buffers are actually still sent around, or that there is some memory leak. However, I would imagine that the solution should look something like this.
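
One thing I notice in the setup loop above: remotecall_fetch returns the value of the closure's last expression, and in Julia an assignment evaluates to its right-hand side, so every freshly allocated buffer gets serialized straight back to the main process. Ending the closure with nothing should avoid at least that round trip:

for w in workers()
    remotecall_fetch(w, method) do method
        method.buffers[myid()] = rand(size(data, 1), 1_000)
        nothing   # <- don't ship the new buffer back to the main process
    end
end

Even then, I am not sure the worker-side copies of method survive between calls, since arguments are re-serialized on every remotecall.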

Any insights are appreciated. Thanks!

On second thought, maybe I can try once more using DistributedArrays.jl. I have tried it unsuccessfully in the past, but looking at the documentation again, it might work the second time around :wink:

Indeed, this seems to do the trick!

using Distributed
addprocs(4)
@everywhere using DistributedArrays, LinearAlgebra
@everywhere mutable struct MyMethodDistributed{T}
    buffers::DArray{T}
end

using StatsBase: sample       # needed again in the fresh session
data = rand(100, 100_000);    # have to run this again after adding procs
column_index_batches = [sample(axes(data, 2), 1_000) for _ in 1:500]

method = MyMethodDistributed(
    dzeros((size(data, 1), 1_000*nworkers()),   # one 100×1_000 block per worker
           workers(), [1, nworkers()])          # distribute along the second dimension
)

ratios = @distributed vcat for c_inds in column_index_batches
  buffer = localpart(method.buffers)   # <- Grab the worker-local buffer here.
  buffer .= @view data[:, c_inds]      # <- use the buffer here!
  res = svd!(buffer)
  # do something with the res, e.g.
  res.S[1] / res.S[end]
end
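
As a quick sanity check (my own addition, not needed for the computation), each worker can report the size of its localpart; only the small size tuple travels back over the wire:

for w in workers()
    @show w, remotecall_fetch(() -> size(localpart(method.buffers)), w)
end

Each worker owns a 100×1_000 block, and since the DArray is distributed over workers() only, the main process never holds any part of the buffers.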