Strange memory allocation with DistributedArrays

Hello!

I noticed a strange thing while I was playing with distributed arrays. Here is an example of the code:

using Distributed; addprocs(2)
@everywhere using DistributedArrays

function create_big_darray(n)
    darr = dzeros(
        Int,
        (n, nworkers()),
        workers(),
        [1, nworkers()]
    )
end

function do_something_with_darray!(darr)
    @sync for worker in workers()
        @spawnat worker fill!(darr.localpart, 1)
    end
end

function fetch_result!(result, index, darr)
    result .= darr[:, index]
end
    
n = 2^27
darr = create_big_darray(n)
do_something_with_darray!(darr)

result = zeros(Int, n)
@allocated fetch_result!(result, 1, darr)  #2.4G allocations

I find it strange that fetch_result! allocates memory. I expected it to just copy the values into the preallocated array.

Is there a way to copy the result from a worker to an existing array without any allocations?

Can you be more specific about what you find strange?

I find it strange that fetch_result! allocates memory. I expected it to just copy the values into the preallocated array.

I added this to the original question.

Have you run with julia --track-allocation=user? I just ran it, and it reports basically no allocation for the parts of your code where you create the arrays and attributes basically all allocations to fetch_result!, so it may be misattributing the source of the allocations. However, I have little experience with distributed computing in Julia, and this may be known behavior of distributed code. I think fetch_result! will allocate a little anyway the first time it runs, since it needs to compile, but that would not amount to 2.4 GiB.

Thank you for pointing that out to me, I should have tried it myself. I don’t know how --track-allocation should behave in this situation.

However, when I run htop and observe the occupied memory, I see a different picture. The program does allocate when I create the arrays, and it allocates even more memory when I call fetch_result!.

2^27 64-bit integers need 1 GiB of RAM. So I think the whole program should use approximately 3 GiB: 2 GiB for the n-by-2 distributed array plus 1 GiB for result. However, on my machine, htop shows more than 5 GiB. Maybe I should use something else to track the allocations?
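The estimate above can be checked with a few lines of arithmetic. This is only a sketch, assuming 64-bit integers and the two workers from the original example; the variable names are made up for illustration.

```julia
# Expected memory for the example, assuming Int is Int64 (8 bytes).
n = 2^27
bytes_per_column = n * sizeof(Int64)      # one column of the DArray, or `result`
gib = 2^30

darr_gib = 2 * bytes_per_column / gib     # n-by-2 DArray, one column per worker
result_gib = bytes_per_column / gib       # preallocated `result` on the master
total_gib = darr_gib + result_gib

println("DArray: ", darr_gib, " GiB, result: ", result_gib,
        " GiB, total: ", total_gib, " GiB")
```

Anything htop shows beyond this total comes from temporaries, the Julia runtime itself, or memory that has not yet been returned to the OS.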

Hmm, it just occurred to me that maybe darr[:, index] itself is allocating. Have you tried a view (i.e. putting @view in front of darr[:, index])? It is possible that slicing the array this way returns a copy. I am not sure, because this is not a vanilla Array, but with a vanilla Array slicing allocates a copy of the slice.
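For a plain Array (not a DArray), the difference between a slice and a view can be measured like this. It is a minimal sketch; copy_slice! and copy_view! are hypothetical helper names, and the array is kept small so it runs quickly.

```julia
# Slicing a plain Array copies the column; a view only wraps the parent.
copy_slice!(result, a, j) = (result .= a[:, j]; result)
copy_view!(result, a, j) = (result .= @view(a[:, j]); result)

n = 2^20
a = ones(Int64, n, 2)
result = zeros(Int64, n)

# Warm up so compilation is not counted in the measurements.
copy_slice!(result, a, 1)
copy_view!(result, a, 1)

slice_bytes = @allocated copy_slice!(result, a, 1)
view_bytes = @allocated copy_view!(result, a, 1)

println("slice: ", slice_bytes, " bytes, view: ", view_bytes, " bytes")
```

The slice version should report at least n * sizeof(Int64) bytes, while the view version should report far less (typically nothing). This says nothing about DArray indexing, which goes through a different code path.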

Yes, I tried view(darr, :, index) instead of darr[:, index], and it gives the same result. The DistributedArrays documentation says that

Indexing a DArray (square brackets) with ranges of indices always creates a SubArray, not copying any data.

https://juliaparallel.github.io/DistributedArrays.jl/stable/

fetch_result! moves data between processes, which requires temporary buffers at the very least. Julia is also a garbage-collected language, so the fact that you no longer hold a reference to an object doesn’t mean its memory has been freed back to the system. You could run GC.gc() to force a garbage collection, but even then Julia might “free” the memory only so that it can reuse it, without releasing it back to the OS.

I’m not sure exactly what triggers a garbage collection. Some languages, Go for example, trigger a collection when the heap has grown by 100% since the last one: if 1 GB was in use after the last collection, the garbage collector won’t run again until 2 GB is allocated. If Julia uses similar criteria and you just allocated 1 GB, memory use could easily exceed 2 GB before another collection is triggered.

--track-allocation=user might tell you what is allocating memory, or, if those temporary buffers are in the guts of the Julia code, it might not. Generally speaking, this is why you don’t find many garbage-collected languages used on embedded (low-memory) devices: you can’t accurately predict memory usage.

On the other hand, garbage collection is great for multi-threaded programs. Ensuring that memory is not freed in thread 1 while it is still being used in thread 2 requires careful protection. With garbage collection you only need to “solve” the problem once, in the garbage collector, and your program doesn’t have to worry about it.
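The difference between bytes allocated and memory held by the process can be seen with a small experiment. This is a sketch only; make_garbage is a hypothetical helper, and the exact numbers depend on the Julia version and the OS.

```julia
# Allocate a temporary array that becomes garbage as soon as the call returns.
function make_garbage(n)
    tmp = ones(Int64, n)
    return sum(tmp)
end

make_garbage(8)                      # warm up so compilation is not counted

n = 2^22
stats = @timed make_garbage(n)       # stats.bytes counts bytes allocated by the call
println("bytes allocated by the call: ", stats.bytes)

GC.gc()                              # force a collection of the temporary
println("peak resident set size:      ", Sys.maxrss(), " bytes")
```

Tools such as @allocated and @timed count every byte ever allocated, including short-lived temporaries, while htop shows what the process currently holds. The two can disagree in either direction, and a forced GC.gc() does not necessarily shrink what htop reports.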


I think there may be a bug in DistributedArrays. Compare with a function written by hand that does what the package should be doing here:

julia> function fetch_result!(result, index, darr)
           result .= @fetchfrom index+1 view(darr[:L],:,1)
       end
fetch_result! (generic function with 1 method)

julia> @time fetch_result!(result, 2, darr);
  1.192381 seconds (99.22 k allocations: 1.005 GiB, 4.84% gc time)

You can see that it allocates 1 GiB, which is the minimum: 1 GiB is sent from worker 3 to the master process, and it has to land in a temporary array that is later garbage collected.

The bug is that roughly 2 GiB is allocated, which is not justified given the function above. I think I may have an idea where this is happening. I will try to file an issue with the package.

However, from what I have seen, the package needs some attention and updates that it has not had in a long time.

Filed here https://github.com/JuliaParallel/DistributedArrays.jl/issues/215.
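The minimum cost described in this post (one temporary the size of the transferred data) can be reproduced with the Distributed standard library alone, without DistributedArrays. This is a rough sketch under assumptions: fetch_into! is a hypothetical helper, a single local worker is started just for the demonstration, and the array is much smaller than in the original example.

```julia
using Distributed

addprocs(1)                           # one local worker, just for this sketch
w = first(workers())

# Hypothetical helper: build an array on worker `w`, fetch it, copy into `result`.
function fetch_into!(result, w, n)
    tmp = remotecall_fetch(ones, w, Int64, n)  # arrives as a new array on the caller
    copyto!(result, tmp)
    return result
end

n = 2^20
result = zeros(Int64, n)
fetch_into!(result, w, n)             # warm up so compilation is not counted

bytes = @allocated fetch_into!(result, w, n)
println("allocated on the caller: ", bytes, " bytes (payload is ",
        n * sizeof(Int64), " bytes)")

rmprocs(w)
```

Even though result is preallocated, the fetched data first arrives as a freshly allocated array on the calling process, so the measurement should come out at no less than the payload size.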


@raminammour, @pixel27 Thank you for your answers! :)

I still have a question (sorry if it is common knowledge). Am I right that there is a fundamental reason that prevents copying from one worker directly into a given place in another worker’s memory without creating a temporary buffer? Is it connected to thread-safety issues?

I don’t think it’s a thread-safety issue. I believe the data is passed to the serialize function to be encoded and then sent to the network card, so it isn’t just a matter of pointing the network card at the memory location and saying “send that”. That said, I don’t believe the whole object is serialized before transmission; it’s probably serialized into a buffer, and when the buffer is full it’s sent. I don’t know whether the buffer is reused, a new one is allocated each time, or there is a pool of N buffers that are reused.
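The encode and decode steps can be illustrated with the Serialization standard library and an in-memory buffer. This is only a sketch of the general idea; an IOBuffer stands in for the network connection, and it is an assumption that the transfer between workers behaves similarly.

```julia
using Serialization

a = collect(Int64, 1:1000)

io = IOBuffer()                 # stands in for the socket between two processes
serialize(io, a)                # encode the array into bytes
nbytes = position(io)           # number of bytes written to the buffer

seekstart(io)
b = deserialize(io)             # decoding builds a brand-new array

println("payload: ", sizeof(a), " bytes, serialized: ", nbytes, " bytes")
println("same contents: ", b == a, ", same object: ", b === a)
```

The receiving side gets an equal but distinct array, which is why some allocation on the destination is hard to avoid with this mechanism.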

With a lot of Julia’s library functions you can step into them so you might be able to trace down and see what the code is actually doing.


I got it, thank you!