@distributed computing

I was trying to use @distributed in a for loop, where the end result is two arrays. Here is the code without @distributed:

function check_par(M)
    take_vec1 = Array{Float32}(undef, 0)
    take_vec2 = Array{Float32}(undef, 0)
    for i in 1:M
        vec = Array{Float32}(rand(Normal(0,1), 5))
        push!(take_vec1, mean(vec))
        push!(take_vec2, mean(vec.^2))
    end
    take_vec1, take_vec2
end

Then I tried this using @distributed:

using Distributed, BenchmarkTools
rmprocs(workers()); addprocs(4); nworkers()
@everywhere using Statistics, Random, Distributions, Plots, StatsBase

function check_par(M)
    take_vec1 = Array{Float32}(undef, 0)
    take_vec2 = Array{Float32}(undef, 0)
    @distributed for i in 1:M
        vec = Array{Float32}(rand(Normal(0,1), 5))
        push!(take_vec1, mean(vec))
        push!(take_vec2, mean(vec.^2))
    end
    take_vec1, take_vec2
end

which makes clear that I have no idea how to use @distributed. Can anyone suggest how to use @distributed in this case, and perhaps recommend something to read so that I can get a better idea? Should I just start reading the Julia 1.1 documentation?

You could try this, with a reduction. I actually expected your code to run correctly, but it does not.

It is possible that this could be done more efficiently; you may want to check out https://github.com/joshday/OnlineStats.jl.

function check_par(M)
    take_vec = @distributed (vcat) for i in 1:M
        vec = Array{Float32}(rand(Normal(0,1), 5))
        mean(vec), mean(vec.^2)
    end
    take_vec
end

v = check_par(2)
typeof(v)
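typeof(v) should be Array{Tuple{Float32,Float32},1}: each iteration returns a tuple, and the (vcat) reduction concatenates the tuples into a vector.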

Thanks a lot. The output is slightly different: it's giving me an array of tuples, which I think could be converted to two arrays. So this is OK for now, but any idea how I can save the result in two arrays?

Indeed. Maybe there is a nicer way which directly results in two vectors.

You can get two vectors like this. Alternatively, consider the version below, which gives you a two-dimensional array.

arr_tuple = check_par(100)
v1 = map(x -> x[1], arr_tuple)
v2 = map(x -> x[2], arr_tuple)
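
Equivalently, broadcasting first and last gives the same two vectors:

v1 = first.(arr_tuple)
v2 = last.(arr_tuple)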

function check_par3(M)
    Random.seed!(22)
    take_vec = @distributed (vcat) for i in 1:M
        vec = Array{Float32}(rand(Normal(0,1), 5))
        reshape([mean(vec), mean(abs2.(vec))], 1, 2)
    end
    take_vec
end
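
Since each iteration contributes a 1×2 row, check_par3 returns an M×2 matrix, and the two vectors are just its columns:

m = check_par3(100)
v1 = m[:, 1]
v2 = m[:, 2]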

If you want to have two arrays in the end, then you could use SharedArrays, as explained in the documentation section Parallel Map and Loops.

I would write it like this:

using Distributed, SharedArrays
addprocs(4)
@everywhere using Statistics, StatsBase, Distributions, Random 

function check_par(M)
    take_vec1 = SharedArray{Float32}(M)
    take_vec2 = SharedArray{Float32}(M)
    @sync @distributed for i in 1:M
        vec = Array{Float32}(rand(Normal(0,1), 5))
        take_vec1[i] = mean(vec)
        take_vec2[i] = mean(vec.^2)
    end
    take_vec1, take_vec2
end

Notice that you also need @sync so that the loop waits for all the work to complete; otherwise it returns vectors of uninitialized zeros.
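
To see why: without a reducer, @distributed only spawns the work on the workers and hands back a Task, so the loop body may not have finished when you read the arrays. A minimal sketch:

t = @distributed for i in 1:10
    # the body runs asynchronously on the workers
end
wait(t)  # blocks until all iterations are done; @sync @distributed does the same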

I think it's better if I write the full code here, so that it's clear where I need the parallel computing. I have been trying to write a couple of functions to do a bootstrap. Here are the functions.

First, a function to create the data:

function create_data(N1::Int64, N0::Int64)
    # I set tau = 5; there was nothing in the paper
    Y_1 = Array{Int8}(fill(5, N1))
    Y_0 = Array{Float32}(rand(Normal(0,1), N0))

    # treated data set
    X_t = Array{Float32}(rand(Uniform(0,1), N1))
    treated = hcat(X_t, Y_1)

    # control data set
    X_c = Array{Float32}(rand(Uniform(0,1), N0))
    control = hcat(X_c, Y_0)

    treated, control
end
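
For example (the sizes here are just for illustration):

treated, control = create_data(100, 50)
size(treated), size(control)  # ((100, 2), (50, 2)); column 1 is X, column 2 is Y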

The output of the above function can be passed as arguments to the following function, which creates a matched data set:

function match_tc(treated_d, control_d)
    matched_Y0 = Array{Float32}(undef, 0)
    for i = 1:size(treated_d)[1]
        x = [abs(treated_d[i, 1] - control_d[j, 1]) for j = 1:size(control_d)[1]]
        x_min = findmin(x)[2]  # index of the first minimum, assuming there is only one
        ith_control = control_d[x_min, 2]
        push!(matched_Y0, ith_control)
    end
    matched = hcat(treated_d[:, 2], matched_Y0)  # matched dataset
end
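
In other words, for each treated observation it picks the control with the nearest X (one-nearest-neighbour matching on the covariate):

matched = match_tc(treated, control)
size(matched)  # (100, 2): treated outcomes next to their matched control outcomes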

Then, from the matched data, we can do the bootstrap using the following function:

function boot_ATET(B::Int64, treated_d, control_d, N1_b, N0_b)
    ATET_b = Float32[]  # storage for bootstrap estimates
    for b in 1:B
        i_t = sample(collect(1:size(treated_d)[1]), N1_b, replace = true)
        i_c = sample(collect(1:size(control_d)[1]), N0_b, replace = true)
        treated_d_b = treated_d[i_t, :]
        control_d_b = control_d[i_c, :]
        matched = match_tc(treated_d_b, control_d_b)
        ATET = mean(matched[:, 1] - matched[:, 2])
        push!(ATET_b, ATET)
    end
    ATET_b
end
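
For example, 100 bootstrap draws with resamples of the original sizes:

ATET_b = boot_ATET(100, treated, control, 100, 50)
length(ATET_b)  # 100 bootstrap estimates of the ATET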

And finally we can run the Monte Carlo simulation using the replic function; this is where I wanted to use parallel or distributed computing:

function replic(R, N1, N0, B, N1_b, N0_b, seedn)
    Random.seed!(seedn)
    R_means = Array{Float32}(undef, 0)
    R_bootmeans = Array{Float32}(undef, 0)
    R_bootvars = Array{Float32}(undef, 0)

    for i in 1:R
        treated, control = create_data(N1, N0)      # create the data
        matched = match_tc(treated, control)        # match and create the matched data
        ATET = mean(matched[:, 1] - matched[:, 2])  # estimate
        push!(R_means, ATET)

        # B bootstrap replications
        ATET_b = boot_ATET(B, treated, control, N1_b, N0_b)
        boot_mean = mean(ATET_b)
        push!(R_bootmeans, boot_mean)
        boot_var = sum((ATET_b .- ATET).^2) / B
        push!(R_bootvars, boot_var)
    end
    R_means, R_bootmeans, R_bootvars
end

After defining these, you can just call this:

replic(10, 100, 100, 100, 100, 100, 3434);

to get the result for 10 replications; here some parallelism would be really useful. This is probably one of the first complete Julia programs I have written, so I would also like to learn if someone has suggestions. Thanks in advance.

So I replaced the replic function with the following, but it didn't work:

# with parallel

function replic_par(R, N1, N0, B, N1_b, N0_b, seedn)
    Random.seed!(seedn)
    R_means = SharedArray{Float64}(R)
    R_bootmeans = SharedArray{Float64}(R)
    R_bootvars = SharedArray{Float64}(R)

    @sync @distributed for i in 1:R
        treated, control = create_data(N1, N0)      # create the data
        matched = match_tc(treated, control)        # match and create the matched data
        ATET = mean(matched[:, 1] - matched[:, 2])  # estimate
        R_means[i] = ATET

        # B bootstrap replications
        ATET_b = boot_ATET(B, treated, control, N1_b, N0_b)
        boot_mean = mean(ATET_b)
        R_bootmeans[i] = boot_mean
        boot_var = sum((ATET_b .- ATET).^2) / B
        R_bootvars[i] = boot_var
    end
    R_means, R_bootmeans, R_bootvars
end

Thanks for all the help anyway.

It worked! I just had to add @everywhere to all the function definitions.

Yes, I was going to say to put @everywhere. Great!
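
The pattern is just to prefix every function definition (and the using lines they depend on) so each worker has them:

@everywhere using Statistics, Distributions, StatsBase
@everywhere function create_data(N1::Int64, N0::Int64)
    # ... same body as before ...
end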

Awesome, thank you so much! Any comments regarding the functions in general? It may be boring to check, so no worries if not. Thanks a lot!

You can also call replic with different seeds. Also, I think your simulation has potential for speed-ups if you avoid allocations.

seed0 = 3434
res = @distributed (vcat) for i = 1:10
    seed = seed0 + i
    hcat(replic(10, 100, 100, 100, 100, 100, seed)...)
end
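
Each replic call returns three length-10 vectors, so every hcat produces a 10×3 block and res ends up as a 100×3 matrix:

size(res)  # (100, 3): columns are R_means, R_bootmeans, R_bootvars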

Hmm, interesting suggestion! Thanks a lot, yes, you are right.

Below I tried to speed it up a bit (about 30%). I am not sure if I maintained the same functionality though.
Unfortunately, readability may have suffered. Also, I probably made a lot of changes with little impact on performance.

@everywhere function create_data(N1::Int64, N0::Int64)
    # I set tau = 5; there was nothing in the paper
    Y_1 = Array{Int8}(fill(5, N1))
    Y_0 = Array{Float32}(rand(Normal(0,1), N0))

    # treated data set
    X_t = Array{Float32}(rand(Uniform(0,1), N1))
    treated = hcat(X_t, Y_1)

    # control data set
    X_c = Array{Float32}(rand(Uniform(0,1), N0))
    control = hcat(X_c, Y_0)

    treated, control
end

@everywhere function create_data!(treated, control, N1::Int64, N0::Int64)
    # in-place version of create_data: refill the arrays instead of allocating
    # (tau = 5, as before)
    uni = Uniform(0, 1)
    nm = Normal(0, 1)
    # separate loops so that N1 != N0 is handled correctly
    @inbounds @simd for i = 1:N1
        treated[i, 1] = rand(uni)
        treated[i, 2] = 5
    end
    @inbounds @simd for i = 1:N0
        control[i, 1] = rand(uni)
        control[i, 2] = rand(nm)
    end
    nothing
end

@everywhere function match_tc!(x, treated_d, control_d)
    # in-place version: reuses the distance buffer x and accumulates the ATET
    # sum directly instead of building the matched matrix
    sz = size(control_d, 1)
    szt = size(treated_d, 1)
    ATET_sum = zero(eltype(treated_d))
    @inbounds for i = 1:szt
        for j = 1:sz
            x[j] = abs(treated_d[i, 1] - control_d[j, 1])
        end
        # only the first sz entries of x are valid for this call
        x_min = findmin(view(x, 1:sz))[2]  # index of the first minimum
        ith_control = control_d[x_min, 2]
        ATET_sum += treated_d[i, 2] - ith_control
    end
    return ATET_sum / szt
end

@everywhere function boot_ATET!(x, i_t, i_c, B::Int64, treated_d, control_d, N1_b, N0_b)
    ATET_b = Float32[]  # storage for bootstrap estimates
    for b in 1:B
        # sample! accepts a range directly, so no collect/allocation is needed
        sample!(1:size(treated_d, 1), i_t, replace = true)
        sample!(1:size(control_d, 1), i_c, replace = true)
        treated_d_b = view(treated_d, i_t, :)
        control_d_b = view(control_d, i_c, :)
        ATET = match_tc!(x, treated_d_b, control_d_b)  # match and compute the estimate
        push!(ATET_b, ATET)
    end
    ATET_b
end


@everywhere function replic(R, N1, N0, B, N1_b, N0_b, seedn)
    Random.seed!(seedn)
    R_means = Array{Float32}(undef, 0)
    R_bootmeans = Array{Float32}(undef, 0)
    R_bootvars = Array{Float32}(undef, 0)
    treated, control = create_data(N1, N0)  # allocate the data arrays once
    # distance buffer for match_tc!; must hold the largest control sample
    x = zeros(Float32, max(N0, N0_b))
    i_t = Vector{Int}(undef, N1_b)
    i_c = Vector{Int}(undef, N0_b)
    for i in 1:R
        create_data!(treated, control, N1, N0)  # refill the data in place
        ATET = match_tc!(x, treated, control)   # match and compute the estimate
        push!(R_means, ATET)

        # B bootstrap replications
        ATET_b = boot_ATET!(x, i_t, i_c, B, treated, control, N1_b, N0_b)
        boot_mean = mean(ATET_b)
        push!(R_bootmeans, boot_mean)
        boot_var = sum((ATET_b .- ATET).^2) / B
        push!(R_bootvars, boot_var)
    end
    R_means, R_bootmeans, R_bootvars
end
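
To check the speed-up you can time both versions with BenchmarkTools (it is already loaded at the top of the thread):

using BenchmarkTools
@btime replic(10, 100, 100, 100, 100, 100, 3434);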


Hey @bernhard, thank you so much for all the suggestions and help, I really appreciate it!