Hello all,
I am in the process of incrementally optimizing an algorithm, having focused first on single-core performance, then threading, and now want to scale to Distributed workers.
One key element of my optimizations is reducing allocations by allocating a large buffer once and then reusing it for every iteration. However, I haven’t been able to set up “worker-local buffers” in the Distributed.jl setting.
To illustrate, I want to speed up something roughly like this:
using StatsBase: sample
using LinearAlgebra: svd!
# set up struct that holds the preallocated buffer
mutable struct MyMethod{T}
    buffer::Matrix{T}
end
# set up some work....
data = rand(100, 100_000);
# we will pick random columns from the data and work on them, by copying them to a buffer
column_index_batches = [sample(axes(data, 2), 1_000) for _ in 1:500]
method = MyMethod(zeros(size(data, 1), 1_000)) # <- preallocate buffer here once
# do the actual work
for c_inds in column_index_batches
    method.buffer .= data[:, c_inds] # <- I'm using the buffer here!
    res = svd!(method.buffer) # <- no more allocations, yay!
    # do something with the res, e.g.
    ratio = res.S[1] / res.S[end]
end
Note that this is not my actual code, but roughly outlines my approach.
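One detail in the loop above that may matter for allocation counts: `data[:, c_inds]` on the right-hand side builds a temporary matrix before its contents are copied into the buffer. Below is a minimal sketch of how one could check this. `fill_copy!` and `fill_view!` are hypothetical helper names introduced only for illustration, the sizes are arbitrary, and `rand` stands in for `sample`.

```julia
# Hypothetical helpers: both fill the preallocated buffer with selected columns.
function fill_copy!(buffer, data, c_inds)
    buffer .= data[:, c_inds]        # slicing materializes a temporary matrix first
    return buffer
end

function fill_view!(buffer, data, c_inds)
    buffer .= @view data[:, c_inds]  # reads through a view, no temporary matrix
    return buffer
end

data = rand(100, 10_000)
c_inds = rand(axes(data, 2), 1_000)
buffer = zeros(size(data, 1), length(c_inds))

# Call each function once so compilation is not counted, then measure.
fill_copy!(buffer, data, c_inds)
fill_view!(buffer, data, c_inds)
bytes_copy = @allocated fill_copy!(buffer, data, c_inds)
bytes_view = @allocated fill_view!(buffer, data, c_inds)
```

If `bytes_copy` comes out at roughly the size of the buffer while `bytes_view` stays small, the slice is the remaining per-iteration allocation in that line.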
Multi-threading also works fine with this approach. I can similarly preallocate a buffer for each thread (although you need to be wary of the recent discussion around thread-local state).
mutable struct MyMethodThreaded{T}
    buffers::Vector{Matrix{T}}
end
method_threaded = MyMethodThreaded(
    [zeros(size(data, 1), 1_000)
     for _ in 1:Threads.nthreads()]);
# Note :static scheduling to allow using Thread local storage.
Threads.@threads :static for c_inds in column_index_batches
    buffer = method_threaded.buffers[Threads.threadid()] # <- Grab the Thread local buffer here.
    buffer .= data[:, c_inds] # <- I'm using the buffer here!
    res = svd!(buffer)
    # do something with the res, e.g.
    return res.S[1] / res.S[end]
end
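Regarding the thread-local state caveat: a pattern that does not rely on `Threads.threadid()` is to split the batches into chunks and let each spawned task allocate its own buffer. This is a minimal sketch with smaller, arbitrary sizes. `condition_ratios` is a hypothetical name, and `rand` stands in for `sample`.

```julia
using LinearAlgebra: svd!

# Hypothetical helper: one task per chunk of batches, one buffer per task.
function condition_ratios(data, batches, batch_size)
    results = zeros(length(batches))
    chunk_len = cld(length(batches), Threads.nthreads())
    @sync for chunk in Iterators.partition(eachindex(batches), chunk_len)
        Threads.@spawn begin
            # This buffer belongs to the task, so it does not matter
            # which thread the task ends up running on.
            buffer = zeros(size(data, 1), batch_size)
            for i in chunk
                buffer .= @view data[:, batches[i]]
                S = svd!(buffer).S
                results[i] = S[1] / S[end]  # each task writes to distinct indices
            end
        end
    end
    return results
end

data = rand(20, 2_000)
batches = [rand(axes(data, 2), 50) for _ in 1:16]
results = condition_ratios(data, batches, 50)
```

The number of buffers is then tied to the number of tasks rather than to thread ids, at the cost of allocating the buffers inside the call instead of once up front.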
So far so good. Now to my actual question: how can I do this with Distributed.jl when using multiple compute nodes?
Naively, I would again try something similar, i.e. preallocate all the buffers on the main worker, and then use them on the other workers. However, this doesn’t work for two reasons:
- If I work with multiple compute nodes, the total buffer size may far exceed the memory of the main worker.
- This would require the buffers to be sent over the network somehow, which is something I don’t want.
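The second point can be seen in a small experiment. Arguments passed to `remotecall_fetch` are serialized and sent to the worker, so the worker mutates its own copy and the object on the calling process stays unchanged. This is a sketch assuming a single local worker added with `addprocs(1)`; the variable names are made up for illustration.

```julia
using Distributed
addprocs(1)  # assumption: one local worker process

buffers = Dict{Int, Vector{Float64}}()
w = first(workers())

# The do-block runs on worker `w` and receives a deserialized copy of `buffers`.
remote_len = remotecall_fetch(w, buffers) do remote_buffers
    remote_buffers[1] = zeros(3)
    length(remote_buffers)
end

# The dictionary on the calling process was never modified.
local_len = length(buffers)
```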
Indeed, I want the ownership of the buffers to be fully with the workers, and it is even fine if the main worker can never access the buffers. However, I haven’t found a reliable way to do this.
The target result would look something like this:
Distributed.@distributed for c_inds in column_index_batches
    buffer = rand(size(data, 1), 1_000) # <- How can we preallocate this?
    buffer .= data[:, c_inds] # <- Use it here.
    res = svd!(buffer)
    # do something with the res, e.g.
    return res.S[1] / res.S[end]
end
but with the buffers preallocated per worker somehow.
I have for example tried
@everywhere mutable struct MyMethodDistributed{T}
    buffers::Dict{Int, Matrix{T}}
end
method = MyMethodDistributed(Dict{Int, Matrix{Float64}}())
for w in workers()
    remotecall_fetch(w, method) do method
        method.buffers[myid()] = rand(size(data, 1), 1_000)
    end
end
@distributed for (c_inds, method) in collect(zip(column_index_batches,
                                                 Iterators.repeated(method)))
    buffer = method.buffers[myid()] # <- Grab the worker-local buffer here.
    buffer .= data[:, c_inds] # <- I'm using the buffer here!
    res = svd!(buffer)
    # do something with the res, e.g.
    return res.S[1] / res.S[end]
end
but for larger buffers, this totally crashes my computer, which makes me believe that the buffers are actually still sent around, or that there is some memory leak. However, I would imagine that the solution should look something like that.
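For comparison, a variant that never ships a buffer-holding struct from the main process could keep the buffer in a global on each worker and send only the column indices with each task. This is a sketch under assumptions, not a verified fix for the crash: `LOCAL_DATA`, `LOCAL_BUFFER`, `init_worker!`, and `process_batch` are hypothetical names, the sizes are reduced, `rand` stands in for `sample`, and the data is still copied to each worker once during setup.

```julia
using Distributed
addprocs(2)  # assumption: two local workers; a cluster would use its own manager

@everywhere begin
    using LinearAlgebra: svd!

    # Hypothetical per-process slots; every process gets its own independent Refs.
    const LOCAL_DATA = Ref{Matrix{Float64}}()
    const LOCAL_BUFFER = Ref{Matrix{Float64}}()

    # Runs on the worker: stores the data and allocates the buffer there.
    function init_worker!(data, batch_size)
        LOCAL_DATA[] = data
        LOCAL_BUFFER[] = zeros(size(data, 1), batch_size)
        return nothing  # return nothing so no matrix travels back to the caller
    end

    # Runs on the worker: only `c_inds` arrives with each task.
    function process_batch(c_inds)
        data, buffer = LOCAL_DATA[], LOCAL_BUFFER[]
        buffer .= @view data[:, c_inds]
        S = svd!(buffer).S
        return S[1] / S[end]
    end
end

data = rand(20, 2_000)
column_index_batches = [rand(axes(data, 2), 50) for _ in 1:16]

# One-time setup per worker.
for w in workers()
    remotecall_wait(init_worker!, w, data, 50)
end

ratios = pmap(process_batch, column_index_batches)
```

`pmap` is used here instead of `@distributed` only because it returns the per-batch results directly.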
Any insights are appreciated. Thanks!