Strange behavior of @distributed

Hi, I'm not used to posting questions, but I've searched Google for a long time and can't find an answer to my problem. I'm trying to implement the algorithm of Sugimoto et al. for the Compressive Bilateral Filter (DOI: 10.1109/TIP.2015.2442916) using their O(1) Gaussian filter (DOI: 10.1109/ICIP.2013.6738106). I managed to implement it and I'm now trying to optimize it using distributed loops, but when I run the @distributed version I only get an array of zeros as the result.
Since that alone is probably not very clear, here is the code I use for the parallel and serial loops.
The O(1) Gaussian filter (for the non-distributed version, just remove the @distributed):

function Fastgauss(img::Array{Float64}, σ::Union{Float64,Array{Float64}}, K::Int64=3)
        # Derivation of the Gaussian spatial kernel error
        function Dkernelerror(R::Int64, σ::Float64)
            ϕ = (2*R+1)/(2*σ)
            ψ = π*σ*(2*K+1)/(2*R+1)
            # derivative of Es(R)+Ef(K,R) = erfc(ϕ)+(erfc(ψ)-erfc(πσ)) wrt R
            return (sqrt(π)*σ*(2*K+1)*exp(-ψ*ψ))/(R*R+R+0.25)-(2*exp(-ϕ*ϕ))/(sqrt(π)*σ)
        end
        # Binary search
        low = Int64(floor(2*σ))
        up = Int64(ceil(4*σ))
        while up > low+1
            mid = Int64(round((low+up)/2))
            if Dkernelerror(mid, σ) < 0
                low = mid
            else
                up = mid
            end
        end
        if abs(Dkernelerror(low, σ)) <= abs(Dkernelerror(up, σ))
            R = low
        else
            R = up
        end
        # kernel min size = 3
        if R < 3
            R = 3
        end
        # Filter for one dimension
        function filter1d(f::Array{Float64,1}, σ::Float64, R::Int64, K::Int64)
            # output
            fo = zeros(length(f))
            # reflecting padding of f
            f = [f[(R+1):-1:2];f;f[(end-1):-1:(end-R-1)]]
            # precompute γkcos(ϕku) and G0
            ϕ = 2*π/(2*R+1)
            G0 = 1/(2*R+1)
            coeff = fill(Float64[],2)
            coeff[1] = 2 .*cos.(ϕ.*(1:K))
            coeff[2] = 2 .*exp.((-0.5*σ*σ*ϕ*ϕ).*(1:K).*(1:K)).*cos.((ϕ*R).*(1:K))
            F0 = sum(f[1:(2*R+1)])
            Zprev = zeros(K)
            Z = zeros(K)
            for k in 1:K
                γcos = (2*exp(-0.5*σ*σ*ϕ*ϕ*k*k)).*cos.((ϕ*k).*(-R:R))
                Zprev[k] = sum(γcos.*f[1:(2*R+1)])
                Z[k] = sum(γcos.*f[2:(2*R+2)])
            end
            fo[1] = G0*(F0+sum(Zprev))
            F0 += f[2*R+2]-f[1]
            for x in 1:(length(fo)-1)
                fo[x+1] = G0*(F0+sum(Z))
                # F0(x+1) = F0(x) + f(x+R+1) - f(x-R)
                F0 += f[2*R+2+x]-f[x+1]
                # Δ = f(x+R+1) - f(x+R) - f(x-R) + f(x-R-1)
                Δ = f[2*R+2+x]-f[2*R+1+x]-f[x+1]+f[x]
                for k in 1:K
                    ξ = coeff[1][k]*Z[k]-Zprev[k]+coeff[2][k]*Δ
                    Zprev[k] = Z[k]
                    Z[k] = ξ
                end
            end
            return fo
        end
        # apply filter1d in each dimension
        sz = size(img)
        # output image
        imgo = zeros(sz)
        # filter x
        @distributed for y in 1:sz[1]
            imgo[y,:] = filter1d(img[y,:],σ,R,K)
        end
        # filter y
        @distributed for x in 1:sz[2]
            imgo[:,x] = filter1d(imgo[:,x],σ,R,K)
        end
        return imgo
end

And the Compressive Bilateral Filter:

function CBLF(img::Array{Float64}, guide::Array{Float64}, σs::Union{Float64,Array{Float64}}, σr::Float64, τ::Float64=0.005)
        @eval using SpecialFunctions
        # normalize tone of img and guide to [0,1]
        img = (img.-minimum(img[:]))./(maximum(img[:])-minimum(img[:]))
        guide = (guide.-minimum(guide[:]))./(maximum(guide[:])-minimum(guide[:]))
        # optimize for K
        ξ = erfcinv(τ*τ)
        K = Int64(ceil(ξ*ξ/(2*π)+ξ/(2*π*σr)-0.5))
        function Dkernelerror(T::Float64)
            ϕ = (π*σr*(2*K+1))/T
            ψ = (T-1.0)/σr
            # derivative of E(K,T) = erfc(ϕ)+erfc(ψ)
            return ((2*sqrt(π)*σr*(2*K+1))/(T*T))*exp(-ϕ*ϕ)-(2/(sqrt(π)*σr))*exp(-ψ*ψ)
        end
        # Binary search
        # extend search domain by 0.03
        low = σr*ξ+1.0-0.03
        up = (π*σr*(2*K+1))/ξ+0.03
        # loop 10times to find T
        for i in 1:10
            mid = (low+up)/2
            if Dkernelerror(mid) < 0
                low = mid
            else
                up = mid
            end
        end
        T = (low+up)/2
        b0 = ones(size(img))
        b = Fastgauss(img,σs)
        for k in 1:K
            ω = 2*π*k/T
            ak = 2*exp(-0.5*ω*ω*σr*σr)
            c = cos.(ω.*guide)
            s = sin.(ω.*guide)
            Ψc = Fastgauss(c,σs)
            Ψs = Fastgauss(s,σs)
            b0 += ak.*(c.*Ψc.+s.*Ψs)
            Ψc = Fastgauss(c.*img,σs)
            Ψs = Fastgauss(s.*img,σs)
            b += ak.*(c.*Ψc.+s.*Ψs)
        end
        return b./b0
end

For the sake of example I use the “lighthouse” image from TestImages like this:

using Images, TestImages   # Gray and testimage come from these packages
img = convert(Array{Float64},Gray.(testimage("lighthouse")));

When I run Fastgauss on this image, I get a major improvement in computation time using the @distributed loops (timings with BenchmarkTools):

# distributed loops
@btime simg = Fastgauss(img,3.0);
634.200 μs (18 allocations: 3.00 MiB)
# serial loops
@btime simg = Fastgauss(img,3.0);
20.775 ms (34562 allocations: 27.54 MiB)


The result of Fastgauss is the same for the @distributed and serial versions. But even though it improves the running time, using the @distributed version of Fastgauss inside CBLF produces a bad output (an array of zeros).

# distributed loops
@btime fimg = CBLF(img,img,10.0,0.2);
83.776 ms (1318 allocations: 177.07 MiB)
# serial loops
@btime fimg = CBLF(img,img,10.0,0.2);
523.033 ms (588378 allocations: 738.37 MiB)

(I'm not able to attach more than one picture, but I can upload the two different outputs of CBLF on request.) Moreover, I also noticed that when I use @btime on the @distributed version it takes a long time to report the execution time (around 2 to 3 minutes) and the memory consumption explodes (around 15 GB of RAM).
However, the serial version of Fastgauss used by CBLF produces the desired output, and I don't understand why the parallel one doesn't.
Thanks in advance for your help,
Geoffrey

Welcome @Geoffrey !

Your functions work well when not using Distributed, so my guess is that the problem is not in your functions.
First, try putting the @sync macro just in front of @distributed, e.g.:

        # filter x
        @sync @distributed for y in 1:sz[1]
            imgo[y,:] = filter1d(img[y,:],σ,R,K)
        end

I didn't check whether this solves your problem; it's just a guess. I don't have the time right now to create a MWE (please read: make it easier to help you - #93) from your code. For example, we do not know whether you use the @everywhere macro correctly.

If this does not solve your issue, you should start with a small and simple code example that is similar to your usage of @distributed, e.g. filling some array (imgo) with the results of a function with similar parameters, but keep it small. The point is that, in building it, you will quite likely spot the error on your own, and it would also make it much easier for us to help you. Without trying it myself I think it is just not possible to find out what's wrong with your code, because important parts are missing.
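
To see what @sync changes (just a minimal sketch, nothing taken from your code): without a reducer, @distributed only launches the loop on the workers and returns immediately, so whatever comes after it can run before the loop has finished:

using Distributed
addprocs(2)                            # start two worker processes

@distributed for i in 1:4
    println("iteration $i on worker $(myid())")
end
println("usually printed before the loop output above")

@sync @distributed for i in 1:4        # @sync blocks until every worker is done
    println("iteration $i on worker $(myid())")
end
println("printed only after all four iterations have run")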


Hi @oheil, thank you for your answer, and sorry that my example is not easy to run. I tested adding @sync and indeed it works: CBLF then gives the right output, but I no longer get any improvement in running time.
I think my problem is more due to a misunderstanding of how distributed loops work in Julia. To be more precise, the scheme of what I want to do is the following:
1/ I have a function (Fastgauss) in which I apply a small set of operations (filter1d) to each row and then to each column, where the order doesn't matter, so it can be distributed. This function is useful on its own, but it can also be called from other functions.
2/ Then I need to use the previous function inside another one (here CBLF).

I tried to make a MWE in which I replace filter1d with a function that just adds 1 to each element of the row/column, and then I nested this MWE in a function that applies it 3 times, like this:

# function 1/
function mwe(img::Array{Float64})
    function add_one(f::Array{Float64,1})
        fo = zeros(length(f))
        fo = f.+1
        return fo
    end
    sz = size(img)
    # output image
    imgo = zeros(sz)
    @distributed for y in 1:sz[1]
        imgo[y,:] = add_one(img[y,:])
    end
    @distributed for x in 1:sz[2]
        imgo[:,x] = add_one(imgo[:,x])
    end
    return imgo
end
#function 2/ (nested)
function mwenested(img::Array{Float64})
    imgo = img
    for _ in 1:3
        imgo = mwe(imgo)
    end
    return imgo
end

I expected this MWE to produce the same error as my original code, but that is not the case. The expected output of mwe is that 2 is added to each element (and that's what we get), and for mwenested that 6 is added to each element (which we also get), as can be seen by simply trying:

test = mwe(zeros((100,100))); test2 = mwenested(zeros((100,100)));

Moreover, the distributed version of this MWE also improves the running time:

# distributed loop in mwe
 @btime test = mwe(zeros((100,100)));
40.901 μs (18 allocations: 158.14 KiB)
# normal for loop in mwe
@btime test = mwe(zeros((100,100)));
72.600 μs (604 allocations: 681.41 KiB)

But here again, when running @btime on the distributed loops it takes a while to get the answer, and Julia's memory usage rises a lot. I have to restart a new Julia session to get back to normal memory usage. I think the memory consumption is hinting at whatever causes the problem, but I can't figure it out.
Moreover, you mention @everywhere, but I didn't use this macro, and the mwe function seems to work without it if we ignore the memory usage. I don't know where @everywhere should be placed, because if I add it before the Fastgauss function definition nothing changes, and if I add it before the Fastgauss call in CBLF I get an UndefVarError for the parameters used by Fastgauss.

Many thanks again for your help, and I hope this clarifies my problem a bit,
Geoffrey

PS: I forgot to mention that I'm using Julia 1.3.1.

Good work with the MWE!

With your new code I was able to reproduce that the results in test and test2 are good when not running distributed and are wrong (all zeros or something else) when run on more cores. I think the main problem is that your result array is not a SharedArray. When the loop body is shipped to a worker process, a plain Array like imgo is serialized along with it, so each worker writes into its own copy and the array on the master process is never updated. Shared memory is needed for multiple cores (worker processes, not threads!) to write to the same array.

So the MWE which works is the following:

using Distributed

using SharedArrays

addprocs(4)

function mwe(img::AbstractArray{Float64})
    function add_one(f::AbstractArray{Float64,1})
        fo = zeros(length(f))
        fo = f.+1
        return fo
    end
    sz = size(img)
    # output image
    imgo = SharedArray(zeros(sz))
    @sync @distributed for y in 1:sz[1]
        imgo[y,:] = add_one(img[y,:])
    end
    @sync @distributed for x in 1:sz[2]
        imgo[:,x] = add_one(imgo[:,x])
    end
    return imgo
end
test = mwe(zeros((100,100))); 

function mwenested(img::Array{Float64})
    imgo = img
    for _ in 1:3
        imgo = mwe(imgo)
    end
    return imgo
end
test2 = mwenested(zeros((100,100)));

The @everywhere macro wasn't needed; maybe my habit of always adding it is just old knowledge that is outdated by now.
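
If I had to guess why it isn't needed here: add_one (and your filter1d) are inner functions, so they are serialized to the workers together with the loop body. @everywhere is for the opposite situation, where the loop body relies on packages or top-level functions that only exist on the master process. A generic sketch (the helper double_row is made up, not from your code):

using Distributed
addprocs(4)

@everywhere using SharedArrays           # load SharedArrays on the master and every worker
@everywhere double_row(f) = 2 .* f       # hypothetical helper, now defined on every worker

A = SharedArray{Float64}(8, 8)
@sync @distributed for i in 1:8
    A[i, :] = double_row(ones(8))        # works because double_row exists on the workers
end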

Benchmarking the MWE doesn't make much sense, because the overhead of distributing the work is large compared to what has to be done on each worker. If you do not get good parallel benchmarks with your real code, you have to think about your algorithm.

Let us keep the threads clean and discuss here only issues about “how to use Distributed so that the results are fine”, not “how to make code faster in general”.

The documentation you need is:
https://docs.julialang.org/en/v1/manual/parallel-computing/index.html
https://docs.julialang.org/en/v1/manual/parallel-computing/index.html#Multi-Core-or-Distributed-Processing-1
https://docs.julialang.org/en/v1/manual/parallel-computing/index.html#man-shared-arrays-1

EDIT:
I changed the Array types to AbstractArray, so that passing either an Array (img) or a SharedArray (imgo) is possible. In general, it is not necessary to specify the types of the parameters at all.

Hi @oheil, thank you, it indeed works with your correction, although it leads to a slowdown of the algorithm (it's almost two times slower in distributed mode). This may be due to the need to allocate memory for each worker, and it means I should probably look for other ways to make my code faster, maybe using the GPU.
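
Or perhaps threads: if the copying to every worker really is the bottleneck, shared-memory threads might suit these two loops better than worker processes. A rough, untested sketch of what I have in mind (it assumes Julia is started with several threads, e.g. JULIA_NUM_THREADS=4, and that filter1d is passed in as an argument; fastgauss_threaded is just a made-up name):

function fastgauss_threaded(img::Array{Float64}, σ::Float64, R::Int64, K::Int64, filter1d)
    sz = size(img)
    # threads share memory, so a plain Array can be written to directly and nothing is copied per worker
    imgo = zeros(sz)
    # filter x
    Threads.@threads for y in 1:sz[1]
        imgo[y, :] = filter1d(img[y, :], σ, R, K)
    end
    # filter y
    Threads.@threads for x in 1:sz[2]
        imgo[:, x] = filter1d(imgo[:, x], σ, R, K)
    end
    return imgo
end
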
Anyway, thank you very much for your help with this!