# @distributed computing

I was trying to use @distributed in a for loop, where the end result is two arrays. Here is the code without @distributed

``````function check_par(M)
take_vec1 =  Array{Float32}(undef,0)
take_vec2 =  Array{Float32}(undef,0)
for i in 1:M
vec = Array{Float32}(rand(Normal(0,1), 5))
push!(take_vec1, mean(vec))
push!(take_vec2, mean(vec.^2))
end
take_vec1, take_vec2
end
``````

Then I tried this using distributed,

``````
using Distributed, BenchmarkTools
@everywhere using Statistics, Random, Distributions, Plots, StatsBase

function check_par(M)
take_vec1 =  Array{Float32}(undef,0)
take_vec2 =  Array{Float32}(undef,0)
@distributed for i in 1:M
vec = Array{Float32}(rand(Normal(0,1), 5))
push!(take_vec1, mean(vec))
push!(take_vec2, mean(vec.^2))
end
take_vec1, take_vec2
end
``````

which makes clear that I have no idea how to use @distributed. Can anyone suggest me how to use @distributed in this case, and also maybe would you please suggest something to read so that I can get a better idea. Should I just start reading Julia 1.1 Documentation?

you could try this (with reduction).
I actually expected that your code would run correctly (but it does not).

It is possible that this could be done more efficiently.
You may want to check out https://github.com/joshday/OnlineStats.jl

``````function check_par(M)
take_vec= @distributed (vcat) for i in 1:M
vec=Array{Float32}(rand(Normal(0,1), 5))
mean(vec),mean(vec.^2)
end
take_vec
end

v=check_par(2)
typeof(v)
``````

Thanks a lot, the output is slightly different, its giving me an array of tuples, I think which could be converted to two arrays. So this is ok maybe for now. But any idea how can I save the result in two arrays?

Indeed. Maybe there is nicer way which directly results in two vectors.

You can get two vectors like this.
Alternatively consider the version below (which gives you a 2 dim array)

``````arr_tuple=check_par(100)
v1=map(x->x,arr_tuple)
v2=map(x->x,arr_tuple)

function check_par3(M)
Random.seed!(22)
take_vec= @distributed (vcat) for i in 1:M
vec=Array{Float32}(rand(Normal(0,1), 5))
reshape([mean(vec),mean(abs2.(vec))],1,2)
end
take_vec
end

``````

If you want to have two arrays in the end, then you could used SharedArrays, as explained in the documentation: Parallel Map and Loops

I would write it like

``````using Distributed, SharedArrays
@everywhere using Statistics, StatsBase, Distributions, Random

function check_par(M)
take_vec1 =  SharedArray{Float32}(M)
take_vec2 =  SharedArray{Float32}(M)
@sync @distributed for i in 1:M
vec = Array{Float32}(rand(Normal(0,1), 5))
take_vec1[i] = mean(vec)
take_vec2[i] =  mean(vec.^2)
end
take_vec1, take_vec2
end
``````

Notice that you also need to use `@sync` so that the loop waits for the results to complete and it doesn’t return a vector of uninitialized 0’s.

I think its better I write the full code here, so that its clear where I need the parallel computing. So I have been trying to write couple functions to do Bootstrap, here are the functions

Function to create the data

``````function create_data(N1::Int64, N0::Int64)
# I set tau = 5, there was nothing in the paper
Y_1 = Array{Int8}(fill(5, N1));
Y_0 = Array{Float32}(rand(Normal(0,1), N0))

# datasets
# treated data sets
X_t = Array{Float32}(rand(Uniform(0,1), N1))
treated = hcat(X_t, Y_1);

# control data sets
X_c = Array{Float32}(rand(Uniform(0,1), N0))
control = hcat(X_c, Y_0);

treated, control
end
``````

The output of the above function could be placed as arguments in the following which would create a matched data set

``````function match_tc(treated_d, control_d)
matched_Y0 = Array{Float32}(undef, 0)
for i = 1:size(treated_d)
x = [abs(treated_d[i, 1]-control_d[j, 1]) for j = 1:size(control_d) ];
x_min = findmin(x) #find the 1st minimum, assuming there is only one, 2 is for index
ith_control = control_d[x_min, 2]
push!(matched_Y0, ith_control)
end
matched = hcat(treated_d[:, 2], matched_Y0) # matched dataset
end
``````

Then from the matched data we can do bootstrap using following function

``````function boot_ATET(B::Int64, treated_d, control_d, N1_b, N0_b)
ATET_b = Float32[] # storage for bootstrap estimates
for b in 1:B
i_t = sample(collect(1: size(treated_d)), N1_b, replace = true)
i_c = sample(collect(1: size(control_d)), N0_b, replace = true)
treated_d_b = treated_d[i_t, :]
control_d_b = control_d[i_c, :]
matched = match_tc(treated_d_b, control_d_b)
ATET = mean(matched[:, 1] - matched[:, 2])
push!(ATET_b, ATET)
end
ATET_b
end
``````

And finally we can do Monte Carlo Simulation using `replic` function, and this is where I wanted to use some parallel or distributed computing

``````function replic(R, N1, N0, B, N1_b, N0_b, seedn)
Random.seed!(seedn)
R_means =  Array{Float32}(undef, 0)
R_bootmeans = Array{Float32}(undef, 0)
R_bootvars = Array{Float32}(undef, 0)

for i in 1:R
treated, control = create_data(N1, N0) # create the data
matched = match_tc(treated, control) # match and create matched data
ATET = mean(matched[:, 1] - matched[:, 2]) # estimate
push!(R_means, ATET)

# Bootstrap replication is 100
ATET_b = boot_ATET(B, treated, control, N1_b, N0_b)
boot_mean = mean(ATET_b)
push!(R_bootmeans, boot_mean)
boot_var = sum((ATET_b .- ATET).^2)/B
push!(R_bootvars, boot_var)
end
R_means, R_bootmeans, R_bootvars
end
``````

so after defining you can just call this,

``````replic(10, 100, 100, 100, 100, 100, 3434);

``````

to get the result for 10 replications, here some parallel would be really useful. This is probably one of the first complete Julia functions I have written, so I could also learn if someone has some suggestions. Thanks in advance

So I replaced the `replic` function with following, but didn’t work,

``````
# with parallel

function replic_par(R, N1, N0, B, N1_b, N0_b, seedn)
Random.seed!(seedn)
R_means =  SharedArray{Float64}(R)
R_bootmeans = SharedArray{Float64}(R)
R_bootvars = SharedArray{Float64}(R)

@sync @distributed for i in 1:R
treated, control = create_data(N1, N0) # create the data
matched = match_tc(treated, control) # match and create matched data
ATET = mean(matched[:, 1] - matched[:, 2]) # estimate
R_means[i] =  ATET

# Bootstrap replication is 100
ATET_b = boot_ATET(B, treated, control, N1_b, N0_b)
boot_mean = mean(ATET_b)
R_bootmeans[i] = boot_mean
boot_var = sum((ATET_b .- ATET).^2)/B
R_bootvars[i] = boot_var
end
R_means, R_bootmeans, R_bootvars
end
``````

thanks for all the help anyways,

Yes, I was going to say to put `@everywhere`. Great!

Awesome, thank you so much, any comment regarding the functions in general? May be boring to check, no worries!, thanks a lot man, thank you so much.

you can also call replic with different seeds.
Also, I think your simulation has potential for speed ups if you avoid allocations.

``````seed0=3434
res=@distributed (vcat) for i=1:10
seed=seed0+i
hcat(replic(10, 100, 100, 100, 100, 100, seed)...)
end
``````

hmm! Interesting suggestion, thanks a lot, yes you are right!, thank you so much

Below I tried to speed it up a bit (about 30%). I am not sure if I maintained the same functionality though.
Unfortunately, readability may have suffered. Also, I probably made a lot of changes with little impact on performance.

``````@everywhere function create_data(N1::Int64, N0::Int64)
# I set tau = 5, there was nothing in the paper
Y_1 = Array{Int8}(fill(5, N1));
Y_0 = Array{Float32}(rand(Normal(0,1), N0))

# datasets
# treated data sets
X_t = Array{Float32}(rand(Uniform(0,1), N1))
treated = hcat(X_t, Y_1);

# control data sets
X_c = Array{Float32}(rand(Uniform(0,1), N0))
control = hcat(X_c, Y_0);

treated, control
end

@everywhere function create_data!(treated,control,N1::Int64, N0::Int64)
# I set tau = 5, there was nothing in the paper
uni=Uniform(0,1)
nm=Normal(0,1)
@inbounds @simd for i=1:N1
treated[i,1]=rand(uni)
control[i,1]=rand(uni)
treated[i,2]=5
control[i,2]=rand(nm)
end
nothing
end

@everywhere function match_tc!(x,treated_d, control_d)
matched_Y0 = Array{Float32}(undef, 0)
sz=size(control_d)
ATET_sum=zero(eltype(treated_d))
szt=size(treated_d)
@inbounds for i = 1:szt
for j=1:sz
@inbounds x[j]=abs(treated_d[i, 1]-control_d[j, 1])
end
x_min = findmin(x) #find the 1st minimum, assuming there is only one, 2 is for index
ith_control = control_d[x_min, 2]
ATET_sum+=(treated_d[szt-i+1,2]-ith_control)
#push!(matched_Y0, ith_control)
end
#matched = hcat(treated_d[:, 2], matched_Y0) # matched dataset
#matched = match_tc(treated, control) # match and create matched data
#ATET = mean(view(treated_d,:, 2) .- matched_Y0) # estimate
return    ATET_sum/szt
#return ATET
end

@everywhere function boot_ATET!(x,i_t,i_c,B::Int64, treated_d, control_d, N1_b, N0_b)
ATET_b = Float32[] # storage for bootstrap estimates
for b in 1:B
#i_t = sample(collect(1: size(treated_d)), N1_b, replace = true)
#i_c = sample(collect(1: size(control_d)), N0_b, replace = true)
sample!(collect(1: size(treated_d)),  i_t,replace = true)
sample!(collect(1: size(control_d)), i_c, replace = true)
treated_d_b = view(treated_d,i_t, :)
control_d_b = view(control_d,i_c, :)
#matched = match_tc(treated_d_b, control_d_b)
#ATET = mean(matched[:, 1] - matched[:, 2])
ATET=match_tc!(x,treated_d_b, control_d_b) # match and create matched data)
push!(ATET_b, ATET)
end
ATET_b
end

@everywhere  function replic(R, N1, N0, B, N1_b, N0_b, seedn)
Random.seed!(seedn)
R_means =  Array{Float32}(undef, 0)
R_bootmeans = Array{Float32}(undef, 0)
R_bootvars = Array{Float32}(undef, 0)
treated, control = create_data(N1, N0) # create the data
x=zero(Float32.(1:size(treated,1)))
i_t=Vector{Int}(undef,N1_b)
i_c=Vector{Int}(undef,N0_b)
for i in 1:R
create_data!(treated,control,N1, N0) # create the data
ATET=match_tc!(x,treated, control) # match and create matched data)
push!(R_means, ATET)

# Bootstrap replication is 100
ATET_b = boot_ATET!(x,i_t,i_c,B, treated, control, N1_b, N0_b)
boot_mean = mean(ATET_b)
push!(R_bootmeans, boot_mean)
boot_var = sum((ATET_b .- ATET).^2)/B
push!(R_bootvars, boot_var)
end
R_means, R_bootmeans, R_bootvars
end

``````

Hey @bernhard thank you so much for the all the suggestions and help, really appreciate it