Preallocating large matrices in all cores is slower than allocating every time

Hi everyone,

I am trying to improve a piece of code of mine (numerical simulations for a scientific research project) to scale it up.

It essentially involves running one computationally and memory heavy function for several trials and then reducing across trials. A pseudocode example:

using Distributed
addprocs()

@everywhere hard_function(x) # allocates big matrices, does stuff with them and returns an array

results = @distributed (+) for _ = 1:K # we don't want the individual results, just their sum
    hard_function(x)
end

This was my initial code, which was a bit slow, so I thought the fact that I am allocating big matrices (e.g. 100 \times 10^6) for every one of the K trials would be a problem. So I decided to try something else:

@everywhere matrices = preallocate(matrices)
@everywhere hard_function!(matrices, x) # defines same function, but using preallocated matrices

results = @distributed (+) for _ = 1:K
    hard_function!(matrices, x)
end

My reasoning being that in this way I would allocate things only once, for every worker.

It turns out it made things slower and allocates more memory (according to @time) :confused:

I tried looking for other sources to this discrepancy, but couldn’t find an explanation so far, so maybe thought I’d ask here and check if I’m not trying anything stupid or if my original idea was wrong from the start.

Thanks a lot!
Lucas

Maybe local matrices are transferred to the workers and you are seeing the cost of de/serialization? Maybe you can do

@everywhere begin
    matrices = ...
    getmatrices() = matrices
end

results = @distributed (+) for _ = 1:K
    hard_function!(getmatrices(), x)
end

Here is a demo. Let us instrument serialize function first:

julia> @everywhere begin
       using Serialization: Serialization, AbstractSerializer

       struct PrintSerialize
           x
       end

       function Serialization.serialize(ser::AbstractSerializer, x::PrintSerialize)
           value = x.x
           @info "Serializing:" value
           return invoke(Serialization.serialize, Tuple{AbstractSerializer,Any}, ser, x)
       end
       end

Define a global and a getter function:

julia> @everywhere begin
           GLOBAL = PrintSerialize([123])
           getglobal() = GLOBAL
       end

If I use getglobal function, GLOBAL is not serialized

julia> @distributed (+) for x in 1:3
           @show (myid(), objectid(getglobal()))
           x
       end
      From worker 2:    (myid(), objectid(getglobal())) = (2, 0x2ef8e7048e9c0500)
      From worker 2:    (myid(), objectid(getglobal())) = (2, 0x2ef8e7048e9c0500)
      From worker 3:    (myid(), objectid(getglobal())) = (3, 0xd5452e7af34c434c)
6

However, if I use GLOBAL variable, it is serialized

julia> @distributed (+) for x in 1:3
           @show (myid(), objectid(GLOBAL))
           x
       end
┌ Info: Serializing:
│   value =
│    1-element Vector{Int64}:
â””     123
┌ Info: Serializing:
│   value =
│    1-element Vector{Int64}:
â””     123
      From worker 3:    (myid(), objectid(GLOBAL)) = (3, 0x2c14a9407614be5b)
      From worker 2:    (myid(), objectid(GLOBAL)) = (2, 0x66a2f372e6cb38ae)
      From worker 2:    (myid(), objectid(GLOBAL)) = (2, 0x66a2f372e6cb38ae)
6
3 Likes

FYI, as of FLoops.jl v0.1.10, the private variables created with @init are not serialized when used with Distributed. So, this should just work:

@floop DistributedEx() for x in 1:K
    @init matrices = ...
    y = hard_function!(matrices, x)
    @reduce result += y
end
use(result)

Ref: https://github.com/JuliaFolds/FLoops.jl/pull/80

2 Likes

Hi @tkf,

Thank you for your suggestions. I tried both of them:

  • the first one didn’t help (maybe my fault as I’m not quite sure of what is going on - I had the impression that I was already accounting for the effect of “re-sending matrices” when I preallocated them inside the worker with @everywhere ? what is the point of calling getmatrices()?)
  • the second one gave me the error LoadError: syntax: invalid syntax in "local" declaration around /home/lucas/.julia/packages/FLoops/yli1G/src/reduce.jl:615

Sorry for the delayed reply.

As I demonstrated in my first comment, it wasn’t working the way you expected. Distributed.jl tries to send the globals like matrices to remote workers, if the globals are mentioned in @distributed body. The function getmatrices is a way to workaround this problem; i.e., it let us not mention the globals in @distributed.

It’s hard to tell what went wrong without an MWE.

See also: Please read: make it easier to help you

1 Like

Sorry, you’re right that I haven’t been helping myself very much…

I just referred to that error in case you knew (from experience) where it might have come from.

Regarding the MWE, my original code is long and confusing, hence why I’ve been trying to go with pseudocode, but you’re right, let me try that:

using Distributed, FLoops

addprocs(3)

@everywhere begin
    using Random
    simulate!(stats) = randn!(stats)
end 

M = 10; K = 10;
aggregated_stats = zeros(M+1, 10);

@floop DistributedEx() for i in 1:K
    @init stats = zeros(M+1, 10)

    simulate!(stats)
    @reduce(aggregated_stats += stats)
end

this does not give the issue I had before, but another one: UndefVarError: #__##oninit_function#261 not defined (please find the full stacktrace here)
Even though it is a different error than before, I’m saying it because I think it might help me to understand what is going on.

Thanks for taking the time to help me.

Thank you, I understood that from your comment (although for some mistake of mine it didn’t translate in the same way on my code…)

My confusion comes from not understanding why that is the default behaviour. Don’t we already have $ to signal when we want the variable to be sent over? Wouldn’t it be better to have as default just using the local variable?

You’d need @everywhere to define the loop first. Also, the initialization of aggregated_stats is not FLoops-compatible. The easiest way is to let FLoops initialize it (mwe). If you want to re-use the memory for aggregated_stats, you can do it like mwe2.

using Distributed

addprocs(3)

@everywhere using FLoops

@everywhere begin
    using Random
    simulate!(stats) = randn!(stats)
end

@everywhere function mwe(; M = 10, K = 10)
    @floop DistributedEx() for i in 1:K
        @init stats = zeros(M+1, 10)

        simulate!(stats)
        @reduce(aggregated_stats += stats)
    end
    return aggregated_stats
end

@everywhere function mwe2(; M = 10, K = 10)
    @floop DistributedEx() for i in 1:K
        @init stats = zeros(M+1, 10)
        simulate!(stats)
        @reduce() do (aggregated_stats = zeros(M+1, 10); stats)
            aggregated_stats .+= stats
        end
    end
    return aggregated_stats
end

mwe()
mwe2()

For more information, see Parallel loops · FLoops and How to do X in parallel? · FLoops

Yeah, I think that’s a legitimate assumption but it’s also useful to know that there are a lots of “magics” (implicit handling) in Distributed.jl when it comes to serialization. The issue we are seeing is that globals and constants referenced from closure are automatically serialized:

julia> X = 0;  # no @everywhere

julia> remotecall_fetch(2) do
           @show X
       end
      From worker 2:    X = 0
0

Magics like this are convenient when working interactively but unfortunately it makes reasoning about performance hard in cases like your code in the OP.

1 Like