Pmap slow compared to map

I am trying to write a parallel image cross-correlation algorithm, and I am not sure why pmap does not give the speedup I expect.

So basically, I have two images (im1 and im2). I take smaller slices (21x21) of both at the same location and compare them by cross-correlation. For each pair of slices, I am getting a cross-correlation matrix.
This is my code:

# taking both images and a tuple with coordinates where the slices are taken
@everywhere function crossCorr_par(im1::SharedArray{Float64}, 
                                   im2::SharedArray{Float64}, 
                                   t::Tuple{Int64, Int64})
    a, b = t[1], t[2] # the tuple distributed by map, coordinates of the slices in the large images
    s = 20 # the size of the image slices
    image = Array(im1[a:a+s, b:b+s]) # create local (per worker) arrays "template" and "image" (of Array type) as slices from the bigger shared arrays
    template = Array(im2[a:a+s, b:b+s])
    
    # preparing "image" for cross-correlation and creating a local "out" array to write the results of the cross-cor function - cross-cor matrix
    image_pad = padarray(image, Fill(0.0::Float64, (size(template)[1]-1, size(template)[2]-1)))
    image_pad = image_pad[indices(image_pad)[1], indices(image_pad)[2]]::Array{Float64, 2}
    
    outsize=tuple([size(image)...] + [size(template)...] - 1...)
    out = zeros(Float64, outsize)
   
    # the actual calculation happens here
    # all arguments are local to the respective worker
    # slide the template over the image and calculate the cross-cor coefficient for each displacement
    e = CrossCorrelation(out, image_pad, template, outsize);
    return e
end

@everywhere function CrossCorrelation(out::Array{Float64, 2}, image::Array{Float64, 2},
        template::Array{Float64, 2}, outsize::Tuple{Int64, Int64})
    
    @inbounds begin
        for i2=1:outsize[2], i1=1:outsize[1] # looping through the rows of a column first!
            s = 0.0::Float64
            for y=1:size(template)[2], x=1:size(template)[1] # a lot of looping and then accessing individual items from the two arrays
                @fastmath s += template[x, y] * image[i1+x-1, i2+y-1] # actual calculations
            end
            out[i1, i2] += s # writes to local array on the respective worker
        end
    end

    return out
end

n = 100 # the image size
im1 = rand(1.0:2.0, n, n) # this is supposed to be the original image
im2 = copy(im1); # This is the template
im1_shared = SharedArray(im1); # these are for the parallel version
im2_shared = SharedArray(im2);

# A collection of tuples which contain the coordinates of the small slices of the images im1 and im2
iterations = [ (1, 1)   (1, 22)   (1, 43)   (1, 64)
               (22, 1)  (22, 22)  (22, 43)  (22, 64)
               (43, 1)  (43, 22)  (43, 43)  (43, 64)
               (64, 1)  (64, 22)  (64, 43)  (64, 64)] # this is for the n = 100 image size
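
For larger n the coordinate tuples can be built in the same pattern; a comprehension like the one below (my shorthand, not necessarily the exact code I used) reproduces the hard-coded 4x4 case above:

s = 21 # slice size and stride; for n = 100 this gives the start coordinates 1, 22, 43, 64
iterations = [(a, b) for a in 1:s:(n-s), b in 1:s:(n-s)]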

@time c = map(x -> crossCorr(im1, im2, x), iterations);
@time c_par = pmap(x -> crossCorr_par(im1_shared, im2_shared, x), iterations);

This is the output for image size n = 100:
map:  0.092365 seconds (13.11 k allocations: 5.595 MiB, 4.79% gc time)
pmap: 0.046134 seconds (19.15 k allocations: 1.699 MiB)

pmap is almost twice as fast as map. Now, if I take a larger image/array (n = 200):
map:  0.374411 seconds (32.14 k allocations: 53.488 MiB, 2.05% gc time)
pmap: 1.361117 seconds (101.23 k allocations: 12.442 MiB, 0.23% gc time)

or n = 500:
map:  1.030963 seconds (71.29 k allocations: 152.065 MiB, 13.02% gc time)
pmap: 9.571784 seconds (269.64 k allocations: 34.470 MiB, 0.07% gc time)

Note: The function crossCorr() for the serial version is written for Array types instead of SharedArrays. Everything else is identical.

So, the larger the images, the more time the 4 cores need to process the slices compared to a single core dealing with all chunks sequentially. Some thoughts on this:

  1. I read about the overhead problem where, if I understand it correctly, the task on each worker has to be fairly large, otherwise the management of the tasks adds a lot to the time needed for completion. This might be the problem here, but how “large” or computationally expensive does a task have to be before pmap shows a speedup? The actual calculations in my code are just some sums and multiplications (see the quick timing check after this list)…
  2. The images im1 and im2 are of SharedArray type in the parallel version, so available to all workers and there is no transfer of data across workers, at least not obvious to me. Also, the functions are available everywhere - thanks to @everywhere.
  3. I hope that I have type-stable code. I do convert the SharedArray slices to Array, but I hope this is not the bottleneck; the CrossCorrelation function works with plain Arrays.
  4. Since Julia stores arrays in column-major order, I wrote the loops to walk down the rows of one column first and then proceed to the next column, and so on.
  5. Each worker is taking slices from the shared arrays and processing them independently. This should be a perfect example of an algorithm that could be executed in parallel, right?
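
To get a feel for how expensive a single task is, the quick timing check mentioned in point 1 would be something like this (one slice pair with the serial version, nothing rigorous):

using BenchmarkTools
@btime crossCorr(im1, im2, (1, 1)); # if a single 21x21 slice pair takes only a few ms, the per-task overhead of pmap may dominate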

Could you please help me find the error in my understanding of writing parallel code, if there is any. Thanks…

Hello,

Your issue is similar to mine from last month: closures passed to pmap are slow when they capture large objects (like your 100x100 im1 and im2 arrays). There is a GitHub issue with a bit more detail.

The simplest way to fix the issue seems to be using the caching pool:

pool = CachingPool(workers())
pmap(pool, x -> crossCorr_par(im1_shared, im2_shared, x), iterations)
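
The pool can be created once and reused across later pmap calls, so the closure (and the arrays it captures) is only shipped to each worker the first time it runs there. My understanding (untested here) is that the cache can be dropped when the captured data changes:

clear!(pool) # removes the cached closures from all participating workers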

It looks like the CachingPool approach was intended to become the default in pmap, based on this pull request from last year, but the proposed change has remained under review for about a year.


Using the caching pool with n = 500:

map:  0.289320 seconds (29.08 k allocations: 42.033 MiB, 2.79% gc time)
pmap: 0.495277 seconds (140.55 k allocations: 12.615 MiB)

both map and pmap are faster compared to the initial performance of 1 vs. 9 seconds in my post.

This is the output of n = 1000 with the caching pool:

map: 1.185926 seconds (80.23 k allocations: 173.439 MiB, 9.57% gc time)
pmap: 1.331276 seconds (385.24 k allocations: 41.469 MiB, 9.29% gc time)

With n = 3500 and using @btime:

map:  12.535 s (854240 allocations: 2.11 GiB)
pmap: 12.996 s (4127005 allocations: 476.04 MiB)

The processes take almost the same amount of time, although the memory usage is lower with pmap.
Still, there is no reason to use pmap; it is slower than map. Any thoughts on this?

A few comments:

(1) Is there a reason you don’t use shared-memory parallelism (e.g. Threads.@threads)? I haven’t looked carefully, but it looks like you never try to write to the same piece of memory on two different workers.

(2) Is there a reason you don’t work with views or SubArrays of your SharedArray? I could be wrong, but I think parts of your code, such as Array(im1[a:a+s, b:b+s]), are adding unnecessary allocations.

(3) I don’t really know what you’re trying to do with this block:

for i2=1:outsize[2], i1=1:outsize[1] # looping through the rows of a column first!
    s = 0.0::Float64
    for y=1:size(template)[2], x=1:size(template)[1] # a lot of looping and then accessing individual items from the two arrays
        @fastmath s += template[x, y] * image[i1+x-1, i2+y-1] # actual calculations
    end
    out[i1, i2] += s # writes to local array on the respective worker
end

but it looks like you could probably do something like:

out[i1, i2] = sum(template .* view(image, i1:(i1+size(template,1)-1), i2:(i2+size(template,2)-1)))

to simplify your inner loop.
Or, since you’re allocating out every time anyway, it would probably look cleanest with something like:

out = [sum(template .* view(image, i1:(i1+size(template,1)-1), i2:(i2+size(template,2)-1))) for i1=1:outsize[1], i2=1:outsize[2]]

(4) It would make it easier to provide advice if you posted the 3-input crossCorr used for map.

Thanks for your reply!

(1) Is there a reason you don’t use shared-memory parallelism (e.g. Threads.@threads)? I haven’t looked carefully, but it looks like you never try to write to the same piece of memory on two different workers.

I read that, in Julia, threads are still in an experimental phase and not as mature as in Python. That is why I went directly for multi-process parallelism with pmap and the like.

(2) Is there a reason you don’t work with views or SubArrays of your SharedArray ? I could be wrong, but I think parts of your code, such as Array(im1[a:a+s, b:b+s]) , are adding unnecessary allocations.

It does decrease allocations and time:

n = 500
map: 273.259 ms (16402 allocations: 41.39 MiB) 
map with views: 191.830 ms (14286 allocations: 37.64 MiB) 
pmap (because it's terrible): 3.119 s (70724 allocations: 9.28 MiB)

but it looks like you could probably do something like:

out[i1, i2] = sum(template .* view(image, i1:(i1+size(template,1)-1), i2:(i2+size(template,2)-1)))

to simplify your inner loop.

I tried it. This is the output:

map with loops: 255.730 ms (16402 allocations: 41.39 MiB)
map with simplified loop: 1.376 s (2682033 allocations: 3.18 GiB)

Coming from Python, I used to write it like this, avoiding nested loops and vectorizing. In Julia, writing out the loops is really much faster; at least that is something you read in the documentation and see in practice.

(4) It would make it easier to provide advice if you posted the 3-input crossCorr used for map .

This is the code for crossCorr (Notice: I added the views):

function crossCorr(im1::Array{Float64}, im2::Array{Float64}, t::Tuple{Int64, Int64})
    a, b = t[1], t[2]
    s = 20
    template = view(im2, a:a+s, b:b+s)
    image = view(im1, a:a+s, b:b+s)
    
    image_pad = padarray(image, Fill(0.0::Float64, (size(template)[1]-1, size(template)[2]-1)))
    image_pad = image_pad[indices(image_pad)[1], indices(image_pad)[2]]::Array{Float64, 2}
    outsize=tuple([size(image)...] + [size(template)...] - 1...)
    out = zeros(Float64, outsize)
   
    e = CrossCorrelation_view(out, image_pad, template, outsize);
    return e
end

The only difference to crossCorr_par is that it works with Arrays instead of SharedArrays.

n = 100

@btime c = map(x -> crossCorr(im1, im2, x), iterations);
@btime c_view = map(x -> crossCorr_view(im1, im2, x), iterations);
@btime c_par = pmap(x -> crossCorr_par(im1_shared, im2_shared, x), iterations);
@btime c_par_view = pmap(x -> crossCorr_par_view(im1_shared, im2_shared, x), iterations);

7.144 ms (499 allocations: 1.25 MiB)
7.472 ms (435 allocations: 1.14 MiB)
10.244 ms (2262 allocations: 326.09 KiB)
17.122 ms (2294 allocations: 326.45 KiB)

Views, SharedArrays, pmap, and all three combined (with a CachingPool) are not faster than the sequential processing with map!

Hmm. I don’t know enough about how Julia does shared memory parallelism to be able to speak to where the issue is. Maybe ProfileView or @profiler would help identify where the time is spent, though these may not help if the slowdown is in sending to and collecting from workers.

At least in this case, I would suggest using KissThreading’s tmap! or a simple Threads.@threads loop.
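
A minimal sketch of what I mean (my rough, untested sketch; it assumes Julia was started with JULIA_NUM_THREADS set to the number of cores):

# Each task writes to its own slot of `results`, so no two threads touch the same memory.
results = Vector{Matrix{Float64}}(undef, length(iterations))
Threads.@threads for i in 1:length(iterations)
    results[i] = crossCorr(im1, im2, iterations[i])
end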


Your tasks seem to be quite speedy. I believe that even with a CachingPool, there is still substantial IO overhead associated with pmap (memory transfer to workers) when compared to map. I think if your function took longer to run, you’d see more of a performance difference.

I could be wrong, but I believe that’s what I’ve experienced in the past and also has been echoed elsewhere in discussions like this.

I’d second this.


Just use it, it’s fine. It’s experimental and might change a little bit to be task-based soon but if you want it working now it works just fine.

(And it’s more mature than Python because it actually exists…)


I reproduced your code and tried a couple of things. (NB: try to include your using statements to help us do so, especially for those of us unfamiliar with the Images library, and also specify the Julia version.) The typeassert in crossCorr() wouldn’t work, so I wrapped the line in a convert(Array{Float64,2}) instead.

image_pad = convert(Array{Float64,2}, image_pad[indices(image_pad)[1], indices(image_pad)[2]])

Aside from that, I didn’t make important changes. I ran the code in Julia 0.7 stable (Windows), and measured timings with BenchmarkTools. I used up to 4 cores (Skylake).

Long story short, I am not able to reproduce the issue. I ran parallel maps over both crossCorr and crossCorr_par, but I will focus on comparing apples to apples with crossCorr (making im1 and im2 available to all procs); not that it made much of a difference.

@everywhere n = 100 # the image size
@everywhere im1 = rand(1.0:2.0, n, n) # this is supposed to be the original image
@everywhere im2 = copy(im1); # This is the template
im1_shared = SharedArray(im1); # these are for the parallel version
im2_shared = SharedArray(im2);


@btime c = map(x -> crossCorr(im1, im2, x), iterations);
30.616 ms (5439 allocations: 2.38 MiB)
@btime c_par1 = pmap(x -> crossCorr_par(im1_shared, im2_shared, x), iterations); #4 cores
12.168 ms (1616 allocations: 312.69 KiB)
@btime c_par2 = pmap(x -> crossCorr(im1, im2, x), iterations); #4 cores
10.813 ms (1612 allocations: 312.50 KiB)

Reducing the number of workers caused a meaningful drop in performance:

@btime pmap(WorkerPool([1]), x -> crossCorr(im1, im2, x), iterations);
31.881 ms (4245 allocations: 2.34 MiB)
@btime pmap(WorkerPool(workers()[1:2]), x -> crossCorr(im1, im2, x), iterations);
21.848 ms (1783 allocations: 308.09 KiB)
@btime pmap(WorkerPool(workers()[1:3]), x -> crossCorr(im1, im2, x), iterations);
15.995 ms (1782 allocations: 318.03 KiB)
@btime pmap(WorkerPool(workers()[1:4]), x -> crossCorr(im1, im2, x), iterations);
12.340 ms (1785 allocations: 328.05 KiB)

Using CachingPool doesn’t seem to make a big difference, since im1 and im2 are not terribly large.

@btime pmap(CachingPool([1]), x -> crossCorr(im1, im2, x), iterations);
33.224 ms (4225 allocations: 2.34 MiB)
@btime pmap(CachingPool(workers()[1:2]), x -> crossCorr(im1, im2, x), iterations);
18.813 ms (2000 allocations: 312.92 KiB)
@btime pmap(CachingPool(workers()[1:3]), x -> crossCorr(im1, im2, x), iterations);
14.215 ms (2005 allocations: 323.63 KiB)
@btime pmap(CachingPool(workers()[1:4]), x -> crossCorr(im1, im2, x), iterations);
11.228 ms (2019 allocations: 334.78 KiB)

Finally, for the sake of optimization, I tried to reduce data movement by sending iterations in chunks, rather than one at a time:

chunks=4;
n=length(iterations);
new_iterations = [iterations[i:chunks:n] for i in 1:chunks]
4-element Array{Array{Tuple{Int64,Int64},1},1}:
 [(1, 1), (1, 22), (1, 43), (1, 64)]
 [(22, 1), (22, 22), (22, 43), (22, 64)]
 [(43, 1), (43, 22), (43, 43), (43, 64)]
 [(64, 1), (64, 22), (64, 43), (64, 64)]

@everywhere function crossCorr(
  im1::Array{Float64}, 
  im2::Array{Float64}, 
  t::Array{T,1}
  ) where T <: Tuple
  
  [crossCorr(im1,im2,i) for i in t]
end 

@btime pmap(WorkerPool([1]), x -> crossCorr(im1, im2, x), new_iterations);
31.557 ms (4205 allocations: 2.34 MiB)
@btime pmap(WorkerPool(workers()[1:2]), x -> crossCorr(im1, im2, x), new_iterations);
18.070 ms (845 allocations: 274.38 KiB)
@btime pmap(WorkerPool(workers()[1:3]), x -> crossCorr(im1, im2, x), new_iterations);
17.572 ms (834 allocations: 283.97 KiB) #there are 4 chunks...
@btime pmap(WorkerPool(workers()[1:4]), x -> crossCorr(im1, im2, x), new_iterations);
10.310 ms (837 allocations: 293.50 KiB)

@btime pmap(x -> crossCorr(im1, im2, x), new_iterations); #same but no call to WorkerPool()
8.960 ms (666 allocations: 277.95 KiB)
@btime c_par2 = pmap(x -> crossCorr(im1, im2, x), iterations); #compare to regular call
10.948 ms (1615 allocations: 312.44 KiB)

This helps somewhat when using 4 cores, and I imagine it would scale favorably as iterations gets larger.

In conclusion, the other posters and I are pointing out some potentially helpful strategies for optimizing pmap (which does have its quirks), but I am not sure that is what is happening here. Did you verify that all workers were properly added? Or perhaps you are distributing your computations across networked machines?
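
(By “verify” I mean something along the lines of the snippet below, run before defining the functions; on 0.7/1.0 the Distributed stdlib has to be loaded explicitly:)

using Distributed
addprocs(4)            # start 4 local worker processes
nprocs(), nworkers()   # should report (5, 4) once the workers are up
@everywhere using Images, SharedArrays # assuming padarray/Fill come from Images; SharedArrays is a stdlib on 0.7+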


Thanks a lot for the extended benchmarking. I can’t find the cause yet. My initial code was in Julia 0.6.

Using Julia 1.0, I get for n = 100 and @everywhere im1, im2:

@btime c = map(x -> crossCorr(im1, im2, x), iterations);
9.461 ms (226 allocations: 1.81 MiB)
@btime c_par1 = pmap(x -> crossCorr(im1, im2, x), iterations);
11.088 ms (1688 allocations: 298.72 KiB)
@btime c_par2 = pmap(x -> crossCorr_par(im1_shared, im2_shared, x), iterations);
13.949 ms (1661 allocations: 298.56 KiB)

You benchmarked with n = 100. In that case, my setup is also faster with pmap. What about larger matrices? Do you get a noticeable speedup?

Or perhaps you are distributing your computations across networked machines?

No, it is all on one machine.

So, setting batch_size seems to improve performance and makes pmap look like it is doing what it should:

for n = 200:

@btime c = map(x -> crossCorr(im1, im2, x), iterations);
48.657 ms (1136 allocations: 9.14 MiB)
@btime c_par1 = pmap(x -> crossCorr(im1, im2, x), iterations);
108.986 ms (8193 allocations: 1.37 MiB)
@btime c_par1 = pmap(x -> crossCorr(im1, im2, x), iterations, batch_size=10000);
26.171 ms (2394 allocations: 1.17 MiB)

Can anyone tell me why this is not the default, and what batch_size one should use? 10,000 was just a random number I picked.
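
For what it is worth, the only heuristic I can think of is one batch per worker, so each worker receives a single chunk of roughly equal size (just my guess, not something from the docs):

bs = cld(length(iterations), nworkers()) # split the collection into roughly nworkers() batches
@btime c_par = pmap(x -> crossCorr(im1, im2, x), iterations, batch_size=bs);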