How to synchronize and share data correctly between processes in a loop?

I am practicing making my real code run in parallel. My challenge is the interaction between the worker threads and the main process. To make it clear, below is the serial code func_series(), which solves a linear system Ax=b in a time loop, updating A and b at each iteration. A is block-diagonal, so the solution can be partitioned. I wrote func_parallel() with two flags, sharedFlag_runTDIter and sharedFlag_finishTDIterThread, to synchronize the threads: at each time step the threads read the updated values of A and b (from the main thread/memory) and write the solution back (to the main thread/memory). However, I do not get the same results, which must come from the synchronization or from misuse of remotecall_fetch. Could you please guide me?

using LinearAlgebra, SparseArrays, Distributed
using Base.Threads
addprocs(3);
@everywhere using SharedArrays
Result = [];
function func_series()
  Nthreads = 3;
  tmin = 1;
  tmax = 2;
  timeSim = tmin:tmax;
  A = sparse([1.0 2 0 0 0 0; 3 1 0 0 0 0; 0 0 3 1 0 0; 0 0 2 4 0 0; 0 0 0 0 9 3; 0 0 0 0 2 4]);
  b = [1.0, 1, 1, 1, 1, 1];
  x = zeros(length(b)); 
  x_record = zeros(length(x),size(timeSim,1));
  indexRange = [1:2,  
                3:4,
                5:6];  
  workvec = [similar(x, 2),  
             similar(x, 2),
             similar(x, 2)];  
  for i in tmin:tmax
    for j = 1:Nthreads
      x[indexRange[j]] .= A[indexRange[j],indexRange[j]]\b[indexRange[j]];
    end
    A.nzval .+= i;
    b .+= i;
    x_record[:,i] .= x;
  end
  return x_record
end

function func_parallel()
  Nthreads = 3;
  tmin = 1;
  tmax = 2;
  timeSim = tmin:tmax;
  A = sparse([1.0 2 0 0 0 0; 3 1 0 0 0 0; 0 0 3 1 0 0; 0 0 2 4 0 0; 0 0 0 0 9 3; 0 0 0 0 2 4]);
  A_remotecall = remotecall(() -> A, 1);
  b = [1.0, 1, 1, 1, 1, 1];
  b_remotecall = remotecall(() -> b, 1);
  x = zeros(length(b)); 
  x_record = zeros(length(x),size(timeSim,1));
  sharedFlag_runTDIter = SharedArray{Bool}(1);
  sharedFlag_runTDIter[1] = false;
  sharedFlag_finishTDIterThread = SharedArray{Bool}(Nthreads);
  sharedFlag_finishTDIterThread .= false;
  indexRange = [1:2,
                3:4,
                5:6]; 
  workvec = [similar(x, 2), 
             similar(x, 2),
             similar(x, 2)];  
  thrs = [Threads.@spawn begin
            for _ in tmin:tmax
              while sharedFlag_runTDIter[j] == false
                nothing
              end
              remotecall_fetch(x[indexRange[j]] .= fetch(A_remotecall)[indexRange[j],indexRange[j]]\fetch(b_remotecall)[indexRange[j]]);
              sharedFlag_finishTDIterThread[j] = true;  # to inform the main thread that the result is ready
              sharedFlag_runTDIter[j] = false;          # to prevent the thread from solving the next time-step before receiving the update of A and b
            end
          end for j = 1:Nthreads];

  for i in tmin:tmax
    sharedFlag_runTDIter .= true;  # to allow threads start solving x=A\b
    if reduce(&, sharedFlag_finishTDIterThread) == true   # wait until all threads finish solving x=A\b
      A.nzval .+= i;
      b .+= i;
    end
    x_record[:,i] .= x;
  end 
  #fetch.(thrs);
  return x_record
end

Result = func_series()
6×2 Matrix{Float64}:
 0.2        0.25
 0.4        0.5
 0.3        0.428571
 0.1        0.142857
 0.0333333  0.0526316
 0.233333   0.368421
Result = func_parallel()
6×2 Matrix{Float64}:
 0.2  0.2
 0.4  0.4
 0.0  0.0
 0.0  0.0
 0.0  0.0
 0.0  0.0

Any comments here, please?

Hi, I didn’t really check if you’re getting any advantage out of it, but I think that you can skip the whole Distributed and SharedArrays complication:

function func_threads()
  tmin = 1;
  tmax = 2;
  timeSim = tmin:tmax;
  A = sparse([1.0 2 0 0 0 0; 3 1 0 0 0 0; 0 0 3 1 0 0; 0 0 2 4 0 0; 0 0 0 0 9 3; 0 0 0 0 2 4]);
  b = [1.0, 1, 1, 1, 1, 1];
  x = zeros(length(b));
  x_record = zeros(length(x),size(timeSim,1));
  indexRange = [1:2,
                3:4,
                5:6];
  for i in tmin:tmax
    Threads.@threads for j in eachindex(indexRange)
      At = A[indexRange[j],indexRange[j]];
      @views x[indexRange[j]] .= At\b[indexRange[j]];
    end
    A.nzval .+= i;
    b .+= i;
    x_record[:,i] .= x;
  end
  return x_record
end

I believe you mixed up distributed workers and Julia threads a little bit in your example. In most cases you want to choose one or the other. There are possibly already-implemented ways to do what you're doing.
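For reference, the two models are counted and controlled separately. A quick check, assuming a session started with e.g. julia -p 3 -t 3 (the flags here are just an example):

using Distributed, Base.Threads
nprocs()            # Distributed: OS processes with separate memory (remotecall, SharedArray, ...)
Threads.nthreads()  # threads inside this one process, sharing memory (Threads.@threads, Threads.@spawn)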

Thanks for your reply. I think I will get an advantage in my real code.

Yes, in this MWE it can be done using Threads.@threads. However, there is overhead at every time step from spawning the threads, and I did not get a gain in my real code. So I was thinking of using @spawn: launch the tasks once and not close them with fetch until the end of the time loop, to avoid the repeated overhead. (I am not sure whether my idea is correct. Please guide me.) Could you please point out the error in my code above?

Threads in Julia can share memory, so you don't have to do anything special (you do, of course, need to avoid race conditions).

And if you're just using threads, you don't need all the Distributed, @everywhere, and SharedArray machinery: those are for multi-processing, where you have different OS processes and memory is not shared.
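To get the long-lived tasks you described without any Distributed machinery, one pattern is to spawn the workers once and synchronize through Channels instead of spin-waiting on shared flags. A minimal sketch of that idea (func_channels, go, and done are names I made up for illustration, not a drop-in fix):

using SparseArrays, Base.Threads
function func_channels()
  tmin = 1; tmax = 2;
  A = sparse([1.0 2 0 0 0 0; 3 1 0 0 0 0; 0 0 3 1 0 0; 0 0 2 4 0 0; 0 0 0 0 9 3; 0 0 0 0 2 4]);
  b = [1.0, 1, 1, 1, 1, 1];
  x = zeros(length(b));
  x_record = zeros(length(x), length(tmin:tmax));
  indexRange = [1:2, 3:4, 5:6];
  nblocks = length(indexRange);
  go   = [Channel{Bool}(1) for _ in 1:nblocks];   # main -> worker: start this time-step
  done = [Channel{Bool}(1) for _ in 1:nblocks];   # worker -> main: block solved
  tasks = [Threads.@spawn begin
             r = indexRange[j];
             for _ in tmin:tmax
               take!(go[j]);               # blocks until the main loop says go
               x[r] .= A[r,r] \ b[r];      # threads share memory, so x is updated in place
               put!(done[j], true);        # tell the main loop this block is ready
             end
           end for j = 1:nblocks];
  for i in tmin:tmax
    foreach(c -> put!(c, true), go);       # release all workers for this time-step
    foreach(take!, done);                  # wait until every block is solved
    x_record[:,i] .= x;
    A.nzval .+= i;                         # safe: no worker reads A or b until the next go
    b .+= i;
  end
  foreach(wait, tasks);                    # surfaces any error thrown inside a task
  return x_record
end

The Channels replace both of your flag arrays: take! parks the task cheaply instead of burning a core in a busy-wait loop, and put!/take! pairs give you the ordering guarantees you were trying to build by hand.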

Thanks for your reply.

  • Yes, in this MWE it can be done with Threads.@threads. However, as I mentioned above, spawning the threads adds overhead at every time step and I did not get a gain in my real code, so I was thinking of using @spawn to launch the tasks once and only fetch them at the end of the time loop. (I am not sure whether this idea is correct. Please guide me.)
  • Different OS processes mean different tasks (functions), right? If so, sorry that I did not mention it: in the future I may assign different functions to different processes. That is why I was practicing with @spawn.

why not use threads?

That's not how Julia threads work: a task can run any function, and any task can be scheduled on any available thread.

If I have two different functions named `f1` and `f2`, how can I call them with `Threads.@threads`?

That is why I am trying to use @spawn instead, as far as I understand it.

help?> Threads.@spawn
  Threads.@spawn [:default|:interactive] expr


  Create a Task and schedule it to run on any available thread in the specified threadpool (:default if unspecified). The task
  is allocated to a thread once one becomes available. To wait for the task to finish, call wait on the result of this macro, or
  call fetch to wait and then obtain its return value.
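So, unlike Threads.@threads, which parallelizes a single loop, Threads.@spawn can run arbitrary, different functions concurrently. A small sketch (f1 and f2 are placeholders standing in for your real work):

using Base.Threads
f1(v) = sum(abs2, v);             # placeholder functions
f2(v) = maximum(v);
v = rand(10^6);
t1 = Threads.@spawn f1(v);        # each call becomes its own task
t2 = Threads.@spawn f2(v);        # scheduled on any available thread
r1, r2 = fetch(t1), fetch(t2);    # fetch waits for each task and returns its result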

Well, then that is why I chose Threads.@spawn in my MWE. Could you please help me find the error in my code above?
Thank you in advance.

Any tips on correcting the parallel code so it produces the same results as the sequential one?