How to re-use threads?

I have the simple MWE below, written for learning purposes, on how to synchronize between `Threads.@spawn` tasks accessing a `SharedArray`. I expect the results shown below, but the program gets stuck before moving on to the next iteration. Could you please guide me to where the problem is?

using LinearAlgebra, SparseArrays, .Threads, Distributed, SharedArrays
function func()
  Nthreads = 3;
  tmin = 1;
  tmax = 2;
  sharedFlag = SharedArray{Bool}(Nthreads);
  sharedFlag .= false;
  l = ReentrantLock();
  thrs = [Threads.@spawn begin
          for d in tmin:tmax
            while !sharedFlag[j]
              nothing
            end
            lock(l)
            println("Hi from process $j at iteration $d")
            sharedFlag[j] = false
            unlock(l)
            
          end
  end for j = 1:Nthreads];
 for i in tmin:tmax
    lock(l)
    sharedFlag .= true;
    unlock(l)

    while reduce(|, sharedFlag) == true 
      nothing
    end

  end
  fetch.(thrs)
end
func()
Hi from process 2 at iteration 1
Hi from process 3 at iteration 1
Hi from process 1 at iteration 1
Hi from process 2 at iteration 2
Hi from process 3 at iteration 2
Hi from process 1 at iteration 2

I don’t have much experience with SharedArrays, but I believe they are for multi-processing, not for multi-threading.

1 Like

Thank you for your reply.
1- This code is `multi-threading`, right? Actually, I don't really know the difference between `multi-processing` and `multi-threading`.

2- Could you recommend a way to use a flag to synchronize between the threads in the code rather than using `sharedFlag = SharedArray{Bool}(Nthreads);`?

That’s correct - SharedArrays are for sharing memory between different Julia processes invoked with the Distributed standard library, and have nothing to do with Threads which already share memory (which is the whole point of using them over processes).
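To illustrate (a minimal sketch, not from the original code): with threads, an ordinary `Vector` is already visible to every spawned task, so nothing like a `SharedArray` is needed:

```julia
# Threads already share memory: every spawned task sees the same plain array.
function demo_shared_memory()
    flags = fill(false, 3)                       # ordinary Vector{Bool}
    tasks = [Threads.@spawn (flags[j] = true) for j in 1:3]
    foreach(wait, tasks)
    return all(flags)
end

demo_shared_memory()  # true: every task wrote into the same array
```

Each task writes to a distinct element here, so no lock is needed in this particular case.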

1 Like

Thank you for your reply.
1- This code is `multi-threading`, right?

2- Could you recommend a way to use a flag to synchronize between the threads in the code rather than using `sharedFlag = SharedArray{Bool}(Nthreads);`?

Do NOT use the package SharedArrays for a multithreaded app. Just use a normal array with blocking access if needed. Read Multi-Threading · The Julia Language for how to do this.
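For instance (an illustrative sketch with made-up names, following the lock pattern from the manual's Multi-Threading section): a plain array guarded by a `ReentrantLock`:

```julia
# A plain array instead of a SharedArray; a lock serializes the writes.
function locked_updates(n)
    results = zeros(Int, n)
    lk = ReentrantLock()
    Threads.@threads for i in 1:n
        lock(lk) do
            results[i] = i^2    # only one task is inside this block at a time
        end
    end
    return results
end

locked_updates(4)  # [1, 4, 9, 16]
```

(Strictly, writing to distinct indices of a `Vector{Int}` would be safe even without the lock; the lock matters once tasks touch overlapping state.)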

1 Like

Thanks for your reply.

  • Using `Threads.@spawn` or `Threads.@threads` creates several threads that run on a single CPU core, right?
  • Could you recommend a way to use a flag to synchronize between the threads in the above code, rather than using `sharedFlag = SharedArray{Bool}(Nthreads);`?

I tried to run your code, but I do not understand what it is supposed to do, therefore I cannot fix it.

First of all, your indentation is wrong. For example, the line `for i in tmin:tmax` should probably start in the same column as the `end` on the line before.

And I have no idea what you are doing in this block:

  thrs = [Threads.@spawn begin
          for d in tmin:tmax
            while !sharedFlag[j]
              nothing
            end
            lock(l)
            println("Hi from process $j at iteration $d")
            sharedFlag[j] = false
            unlock(l)
            
          end
  end for j = 1:Nthreads];

It is not well indented and pretty much unreadable.
I guess it is meant to be an array comprehension. From my point of view, an array comprehension should not be longer than one line.

If you could explain in words what this program is supposed to do (what the input is, what the output is, what the processing steps are, and which parts you want to run in parallel), I might be able to work on it.

2 Likes

I think I fixed your indentation, now it is a little bit more readable:

using LinearAlgebra, SparseArrays, .Threads, Distributed, SharedArrays

function func()
    Nthreads = 3;
    tmin = 1;
    tmax = 2;
    sharedFlag = SharedArray{Bool}(Nthreads);
    sharedFlag .= false;
    l = ReentrantLock();
    thrs = [Threads.@spawn begin
                for d in tmin:tmax
                    while !sharedFlag[j]
                        nothing
                    end
                    lock(l)
                    println("Hi from process $j at iteration $d")
                    sharedFlag[j] = false
                    unlock(l)
                end
            end for j = 1:Nthreads];
    for i in tmin:tmax
        lock(l)
        sharedFlag .= true;
        unlock(l)
        while reduce(|, sharedFlag) == true 
            nothing
        end
    end
    fetch.(thrs)
end

func()
1 Like

Simple example for multi-threading without dependencies in the data:

# Simple example of using Threads
# 1. create vector of vectors
# 2. write function that processes one vector
# 3. function that processes all vectors in serial
# 4. create threads and run function on each vector in parallel
# 5. compare results of serial and parallel processing

m = 4 # number of elements in each vector

n = Threads.nthreads() 
println("Number of threads: $n")

# function that processes vector elements in place
function hardwork(vec)
    for i in eachindex(vec)
        vec[i] = exp(vec[i])
        sleep(0.1)
    end
    nothing
end

vecvec = [rand(m) for i in 1:n]

function process_serial(vecvec)
    for vec in vecvec
        hardwork(vec)
    end
end

function process_parallel(vecvec)
    Threads.@threads for vec in vecvec
        hardwork(vec)
    end
end

work = deepcopy(vecvec)
process_serial(work)
work = deepcopy(vecvec)
@time process_serial(work)
result1=deepcopy(work)

work = deepcopy(vecvec)
process_parallel(work)
work = deepcopy(vecvec)
@time process_parallel(work)
result2=deepcopy(work)

# assert that results are the same
@assert all(result1 .== result2)

nothing

Output:

julia> include("simple.jl")
Number of threads: 8
  3.242664 seconds (161 allocations: 4.797 KiB)
  0.406468 seconds (211 allocations: 8.938 KiB)

Example for parallel calculations, that write to a container that is not thread-safe using a lock:

# use a sparse matrix
# calculate new columns and write them in, slice by slice, in parallel

using SparseArrays

n = Threads.nthreads() 
println("Number of threads: $n")

# heavy calculation; multithreading does not help if the calculation is too short
function calc_triple(m, n, slice)
    i= rand(1:m)
    j= rand(1+n*(slice-1):n*slice)
    v = rand()
    sleep(0.002)
    return (i,j,v)
end

# fill a slice of the array
# arraysize: m x m
# total number of slices: n
# slice: index of the slice to fill (1..n)
function fill_slice!(A, m, n, slice)
    for k = 1:10m/n
        i,j,v = calc_triple(m, n, slice)
        A[i,j] = v
    end
    return A
end

# fill a slice of the array using the provided lock when writing to the array
function fill_slice_lock!(A, m, n, slice, lk)
    for k = 1:10m/n
        i,j,v = calc_triple(m, n, slice)
        lock(lk) do 
            A[i,j] = v
        end
    end
    return A
end

# create and fill an array slice by slice in serial
function calc_serial(m, n)
    A = spzeros(m,m)
    for slice=1:n
        fill_slice!(A, m, n, slice)
    end
    return A
end

# create and fill an array slice by slice in parallel
function calc_parallel(m, n)
    A = spzeros(m,m)
    lk = ReentrantLock()
    Threads.@threads for slice=1:n
        fill_slice_lock!(A, m, n, slice, lk)
    end
    return A
end

# main program
m = 96
# call once to compile the code
calc_serial(m, n)
calc_parallel(m, n)
# measure the serial and parallel execution time
@time A = calc_serial(m, n)
@time A = calc_parallel(m, n)
nothing

You asked:

Using `Threads.@spawn` or `Threads.@threads` creates several threads to run on a single CPU core, right?

No, threads are executed in parallel on all CPU cores that Julia is allowed to use. The differences between threads and processes are:

  1. all threads share the same memory, which can make them faster;
  2. all threads share the same garbage collector, which can make them slower;
  3. communication between threads is faster than between processes, but also easier to get wrong.
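The memory point can be demonstrated directly (a small sketch, assuming one worker process can be added): a thread mutates the caller's array, while a worker process only ever sees a copy:

```julia
using Distributed
addprocs(1)    # one extra worker process

shared = zeros(Int, 4)

# Threads see and mutate the very same array:
Threads.@threads for i in 1:4
    shared[i] = i
end

# A process receives a serialized copy; mutating it remotely
# does not change the local array:
remote = remotecall_fetch(x -> (x .+= 10; x), workers()[1], shared)

shared  # [1, 2, 3, 4]   (unchanged by the remote call)
remote  # [11, 12, 13, 14]
```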
3 Likes

In my real code I have a time loop that includes some computation at each iteration. I am using `Threads.@threads` or `Threads.@spawn` inside each iteration to handle these calculations.
However, there is overhead at each iteration from starting the threads, so I did not get any speedup. Therefore, I was thinking of using `Threads.@spawn` to create the threads once and leave them running over the whole time loop, with some sort of synchronization (`sharedFlag` and `l = ReentrantLock()` in the code) between the slave threads and the master one. That is what I want to correct in my MWE so that it gives me the results that I want.
Thanks to your advice, I changed to using a normal array rather than a `SharedArray` while working with threads, as below. I added two variables, `status` and `iter`, which are updated by the master thread and whose values are printed by the slave ones (again to test the synchronization between them). However, I see that their values are not correct during the second iteration of the time loop (they should be as in the comments). I don't know the reason for that.

function func_parallel()
  Nthreads = 3;
  tmin = 1;
  tmax = 2;
  timeSim = tmin:tmax;
  status = false
  iter = 0
  l = ReentrantLock()
  flag_start  = [false, false, false]
  flag_finish = [false, false, false]
  # slave threads 
  thrs = [Threads.@spawn begin
          for d in tmin:tmax
            while !flag_start[j]
              sleep(0.001)
            end
            lock(l)
              println("Hi from process $j at iteration $d and $status and $iter")
              flag_finish[j] = true
              flag_start[j]  = false
            unlock(l)
          end # for _ in tmin:tmax
  end for j = 1:Nthreads];
  # master threads
  for i in tmin:tmax
    lock(l)
      flag_start  .= true
      unlock(l)
    while reduce(&, flag_finish) == false
      sleep(0.001)
    end
      status = !status
      iter += 1
  end 
  fetch.(thrs)
end
func_parallel()
Hi from process 1 at iteration 1 and false and 0
Hi from process 2 at iteration 1 and false and 0
Hi from process 3 at iteration 1 and false and 0
Hi from process 1 at iteration 2 and false and 2  # Hi from process 1 at iteration 2 and true and 1
Hi from process 2 at iteration 2 and false and 2  # Hi from process 2 at iteration 2 and true and 1
Hi from process 3 at iteration 2 and false and 2  # Hi from process 3 at iteration 2 and true and 1

My impression is that what you are trying to do is pretty advanced. Did you have a look at Polyester.jl (GitHub - JuliaSIMD/Polyester.jl: The cheapest threads you can find!) for low-overhead threading, and at ThreadPools.jl (GitHub - tro3/ThreadPools.jl: Improved thread management for background and nonuniform tasks in Julia. Docs at https://tro3.github.io/ThreadPools.jl)?

3 Likes

Thanks, I will check them!
By the way, I have just revised my last post, and I believe the main problem should be clear now, if you want to have another look at it.
Thanks a lot for your help.

Can you perhaps rename the title of this thread? For example to
“How to implement a thread pool?” or “How to re-use threads?”.

This would make it more likely that someone replies who did this already…

1 Like

Thanks for your advice. Unfortunately, I am not able to rename it.

Well, I am able to edit the title of my own posts by clicking on the crayon next to the title. Does that work for you?

1 Like

Thanks for your feedback. I can edit the title within one day of posting; after that, I cannot.

I’ve changed it as per Uwe’s suggestion.

1 Like

Thanks, nils!

You can use Atomic Operations if you want to do that manually, or Channels if you don’t.
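A sketch of the `Channel` route (one possible way to re-use long-lived workers; the names are illustrative): each worker blocks on its own channel instead of spinning on a flag, and a `done` channel replaces the busy-wait in the master loop:

```julia
# Re-usable worker tasks: each blocks on its own Channel instead of
# busy-waiting on a flag; a shared `done` channel signals completion.
function run_with_workers(nworkers, niters)
    jobs = [Channel{Int}(1) for _ in 1:nworkers]   # one mailbox per worker
    done = Channel{Int}(nworkers)
    lk   = ReentrantLock()
    log  = String[]
    workers = [Threads.@spawn begin
                   for d in jobs[j]                # take! blocks; no spinning
                       lock(lk) do
                           push!(log, "worker $j iteration $d")
                       end
                       put!(done, j)
                   end
               end for j in 1:nworkers]
    for d in 1:niters
        foreach(ch -> put!(ch, d), jobs)           # start iteration d everywhere
        foreach(_ -> take!(done), 1:nworkers)      # wait for all to finish it
    end
    foreach(close, jobs)                           # lets the worker loops end
    foreach(wait, workers)
    return log
end

log = run_with_workers(3, 2)
length(log)  # 6: three workers, two iterations each
```

Because `take!` suspends the task, this also behaves correctly when Julia is started with a single thread, whereas a `while !flag end` spin loop never yields and can hang the scheduler.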

1 Like