# How to synchronize and share data correctly between processes in a loop?

I am practicing making my real code run in parallel. My challenge is the relationship between the threads and the main process. To make it clear, I am including the serial code `func_series()`, which solves a linear equation `Ax=b` in a time loop, updating `A` and `b` at each iteration. `A` is block-diagonal, so the solution can be partitioned. I wrote `func_parallel()` with two flags, `sharedFlag_runTDIter` and `sharedFlag_finishTDIterThread`, to synchronize the threads: at each time step the threads read the updated values of `A` and `b` (from the main thread/memory) and write back their part of the solution (to the main thread/memory). However, I am not getting the same results, which may come from the synchronization or from a misuse of `remotecall_fetch`. Could you please guide me?

```julia
using LinearAlgebra, SparseArrays, .Threads, Distributed
@everywhere using SharedArrays

Result = [];

function func_series()
    tmin = 1;
    tmax = 2;
    timeSim = tmin:tmax;
    A = sparse([1.0 2 0 0 0 0; 3 1 0 0 0 0; 0 0 3 1 0 0; 0 0 2 4 0 0; 0 0 0 0 9 3; 0 0 0 0 2 4]);
    b = [1.0, 1, 1, 1, 1, 1];
    x = zeros(length(b));
    x_record = zeros(length(x), size(timeSim, 1));
    indexRange = [1:2,
                  3:4,
                  5:6];
    workvec = [similar(x, 2),
               similar(x, 2),
               similar(x, 2)];
    for i in tmin:tmax
        for j in 1:3
            x[indexRange[j]] .= A[indexRange[j], indexRange[j]] \ b[indexRange[j]];
        end
        A.nzval .+= i;
        b .+= i;
        x_record[:, i] .= x;
    end
    return x_record
end

function func_parallel()
    tmin = 1;
    tmax = 2;
    timeSim = tmin:tmax;
    A = sparse([1.0 2 0 0 0 0; 3 1 0 0 0 0; 0 0 3 1 0 0; 0 0 2 1 0 0; 0 0 0 0 1 3; 0 0 0 0 2 1]);
    A_remotecall = remotecall(() -> A, 1);
    b = [1.0, 1, 1, 1, 1, 1];
    b_remotecall = remotecall(() -> b, 1);
    x = zeros(length(b));
    x_record = zeros(length(x), size(timeSim, 1));
    sharedFlag_runTDIter = SharedArray{Bool}(1);
    sharedFlag_runTDIter[1] = false;
    sharedFlag_finishTDIterThread = SharedArray{Bool}(3);
    sharedFlag_finishTDIterThread .= false;
    indexRange = [1:2,
                  3:4,
                  5:6];
    workvec = [similar(x, 2),
               similar(x, 2),
               similar(x, 2)];
    thrs = [Threads.@spawn begin
                for _ in tmin:tmax
                    while sharedFlag_runTDIter[j] == false
                        nothing
                    end
                    remotecall_fetch(x[indexRange[j]] .= fetch(A_remotecall)[indexRange[j], indexRange[j]] \ fetch(b_remotecall)[indexRange[j]]);
                    sharedFlag_runTDIter[j] = false;  # to prevent the thread from solving the next time step before receiving the update of A and b
                    sharedFlag_finishTDIterThread[j] = true;
                end
            end for j in 1:3];

    for i in tmin:tmax
        sharedFlag_runTDIter .= true;  # to allow the threads to start solving x = A\b
        if reduce(&, sharedFlag_finishTDIterThread) == true  # wait until all threads finish solving x = A\b
            A.nzval .+= i;
            b .+= i;
        end
        x_record[:, i] .= x;
    end
    #fetch.(thrs);
    return x_record
end
```

```
julia> Result = func_series()
6×2 Matrix{Float64}:
 0.2        0.25
 0.4        0.5
 0.3        0.428571
 0.1        0.142857
 0.0333333  0.0526316
 0.233333   0.368421

julia> Result = func_parallel()
6×2 Matrix{Float64}:
 0.2  0.2
 0.4  0.4
 0.0  0.0
 0.0  0.0
 0.0  0.0
 0.0  0.0
```

Hi, I didn't really check whether you're getting any advantage out of it, but I think you can skip the whole Distributed and SharedArrays complication:

```julia
function func_threads()
    tmin = 1;
    tmax = 2;
    timeSim = tmin:tmax;
    A = sparse([1.0 2 0 0 0 0; 3 1 0 0 0 0; 0 0 3 1 0 0; 0 0 2 4 0 0; 0 0 0 0 9 3; 0 0 0 0 2 4]);
    b = [1.0, 1, 1, 1, 1, 1];
    x = zeros(length(b));
    x_record = zeros(length(x), size(timeSim, 1));
    indexRange = [1:2,
                  3:4,
                  5:6];
    for i in tmin:tmax
        Threads.@threads for j in 1:3
            At = A[indexRange[j], indexRange[j]]
            @views x[indexRange[j]] .= At \ b[indexRange[j]];
        end
        A.nzval .+= i;
        b .+= i;
        x_record[:, i] .= x;
    end
    return x_record
end
```

I believe you mixed up distributed workers and Julia threads a little bit in your example. In most cases you want to choose one or the other. There may already be implemented ways to do what you're doing.


Yes, in this MWE it can be done using `Threads.@threads`. However, there is an overhead at each time iteration to start the threads, and I did not see a gain in my real code. So I was thinking of using `@spawn` to launch the tasks once and not close them with `fetch` until the end of the time loop, to avoid the repeated overhead. (I am not sure if my thought is correct. Please guide me.) Could you please guide me to find the error in my code above?
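To show what I mean, here is a sketch of the pattern I have in mind: the tasks are spawned once and then fed each time step through `Channel`s instead of busy-wait flags (the names `jobs`/`done` and the trivial "solve" are just placeholders, not my real code):

```julia
using Base.Threads

x = zeros(6)
ranges = [1:2, 3:4, 5:6]
jobs = [Channel{Int}(1) for _ in 1:3]  # main -> task j: "solve time step i"
done = Channel{Int}(3)                 # task j -> main: "block j is finished"

tasks = map(1:3) do j
    Threads.@spawn for i in jobs[j]    # iterating a Channel blocks until a step arrives
        x[ranges[j]] .= i              # placeholder for the block solve A[r,r]\b[r]
        put!(done, j)
    end
end

for i in 1:2                           # the time loop
    foreach(c -> put!(c, i), jobs)     # release every task for step i
    foreach(_ -> take!(done), 1:3)     # wait for all blocks before updating A and b
end
foreach(close, jobs)                   # closing the channels lets the task loops end
wait.(tasks)
```

This way the tasks live for the whole time loop, and the `take!(done)` calls replace my `reduce(&, sharedFlag_finishTDIterThread)` wait.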

Threads in Julia can share memory, so you don't have to do anything special (you do, of course, need to avoid race conditions).

And if you're just using threads, you don't need all the `Distributed`, `@everywhere`, and `SharedArray` machinery; those are for multi-processing (where you have different OS processes and memory is not shared).
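For example, since your index ranges are disjoint, the threads can write straight into a plain shared array (a minimal sketch, with a trivial stand-in for the block solve):

```julia
using Base.Threads

# Threads share the process memory: each thread writes its own disjoint
# slice of x, so no SharedArray (and no copying) is needed.
x = zeros(6)
ranges = [1:2, 3:4, 5:6]
@threads for j in 1:3
    x[ranges[j]] .= j   # the ranges are disjoint, so there is no data race
end
```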


• Yes, in this MWE it can be done using `Threads.@threads`. However, there is an overhead at each time iteration to start the threads, and I did not see a gain in my real code. So I was thinking of using `@spawn` to launch the tasks once and not close them with `fetch` until the end of the time loop, to avoid the repeated overhead. (I am not sure if my thought is correct. Please guide me.)
• Different OS processes means different tasks (functions), right? If so, I am sorry if I did not mention it; in the future I may assign different functions to be processed on different processes. That is why I was practicing with `@spawn`.
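For the record, I understand now that the call shape should be `remotecall_fetch(f, pid, args...)`, i.e. pass a function and a worker id rather than an expression. A sketch of what I mean by different functions on different processes (`f1`/`f2` are placeholders):

```julia
using Distributed
addprocs(2)             # two worker processes with separate memory
@everywhere f1(x) = 2x  # placeholder functions, defined on all processes
@everywhere f2(x) = x + 1

# remotecall_fetch(f, pid, args...) runs f(args...) on worker pid
# and blocks until the result is sent back.
r1 = remotecall_fetch(f1, workers()[1], 10)
r2 = remotecall_fetch(f2, workers()[2], 10)
```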


That's not how Julia threads work.


If I have two different functions named `f1` and `f2`, how can I call them with `Threads.@threads`?

That is why I am trying to use `@spawn` instead, as far as I know.
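What I have in mind with `@spawn` is something like this (the bodies of `f1` and `f2` are made up for illustration):

```julia
using Base.Threads

f1() = sum(1:100)                # placeholder work
f2() = mapreduce(abs2, +, 1:10)  # placeholder work

t1 = Threads.@spawn f1()  # each function runs as its own task
t2 = Threads.@spawn f2()
r1, r2 = fetch(t1), fetch(t2)  # fetch blocks until the task finishes
```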

Well, then that is why I chose `Threads.@spawn` in my MWE. Could you please guide me to find the error in my code above?