Can the threads generated by @tspawnat be synchronized by channels?

I want to test multi-threading with synchronization between threads using channels, as in the MWE below, in which nthreads()=3. Each thread executes a for-loop with a while-loop inside it.

In each iteration of the while-loop, the first thread, i.e., threadid()=1, calls put! on the channels and blocks until threads 2&3 call take!. It then moves to the next iteration. threadid()=1 also updates status in the second iteration of the while-loop to exit it.

To track the execution of thread 1, the command println("Iteration $t Thread $(threadid()) Update status") is assigned to it, while the command println("Iteration $t Thread $(threadid()) Execution") is assigned to threads 2&3.

using Base.Threads   # nthreads, threadid
using ThreadPools    # @tspawnat

function f()
  iter = 0   # artificial parameter that helps in updating `status`
  channels = [Channel{Nothing}(0) for i in 1:(nthreads()-1)]
  status = true
  thrs = [@tspawnat i begin
  for t in 1:2
    iter = 0
    status = true
    while status
      if threadid() == 1
       #This is to reverse the value of `status` in the second iteration of the `while loop`
        iter += 1;
        if iter == 2
          status = false
        end
        println("Iteration $t Thread $(threadid()) Update status")
        #
        put!.(channels, nothing)
      else
        take!(channels[threadid()-1])
        println("Iteration $t Thread $(threadid()) Execution")
      end # if threadid() == 1
    end # while status
  end  # for t in 1:2
 end for i = 1:nthreads()];
 fetch.(thrs);
end 
f()

I expect the results to be as below; however, it freezes randomly. I think it is due to improper usage of put! and take! on the channels. Could you please guide me to solve the problem?

Iteration 1 Thread 1 Update status
Iteration 1 Thread 2 Execution
Iteration 1 Thread 3 Execution
Iteration 1 Thread 1 Update status
Iteration 1 Thread 2 Execution
Iteration 1 Thread 3 Execution

Iteration 2 Thread 1 Update status
Iteration 2 Thread 2 Execution
Iteration 2 Thread 3 Execution
Iteration 2 Thread 1 Update status
Iteration 2 Thread 2 Execution
Iteration 2 Thread 3 Execution

At first glance, the iter variable could be causing a data race. Do you want one for all threads or one per thread?


Thanks for your feedback.
It could be only for thread 1, because it is the only thread changing it. So, I think there is no data race, right?

You are defining iter on the main thread and again later within the @tspawnat context. I’m not sure whether that is causing the issue, but if you want a global counter I’d try removing the second iter = 0.
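To make the "one for all threads or one per thread" question concrete: a variable assigned inside a task body refers to the enclosing function's variable unless it is declared local. Below is a minimal sketch, assuming plain @async instead of @tspawnat so that it runs without extra packages; the function names shared_counter and per_task_counter are made up for illustration.

```julia
# `count` belongs to the enclosing function, so every task updates the same variable.
function shared_counter()
    count = 0
    tasks = map(1:3) do _
        @async count += 1
    end
    foreach(wait, tasks)
    return count
end

# Declaring the variable `local` gives each task its own private copy.
function per_task_counter()
    count = 0
    tasks = map(1:3) do _
        @async begin
            local count = 0   # shadows the outer `count` inside this task only
            count += 1
        end
    end
    return count, fetch.(tasks)
end
```

With tasks running on several threads, the shared form is what allows one task to overwrite a value (such as status) that another task is about to read.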


I think this will work, with thread 1 controlling the schedule of putting items into the channels, and the other threads just taking from their channels as long as they are not empty. I put some sleeps since everything is now asynchronous.

using Base.Threads

function f()
    channels = [Channel{Nothing}() for _ in 1:nthreads()-1]
    @threads for _ in 1:nthreads()
        if threadid() == 1
            for t in 1:2
                status = true
                iter = 0
                while status
                    iter += 1
                    if iter == 2
                        status = false
                    end
                    println("Iteration $t Thread $(threadid()) Update status")
                    for channel in channels
                        @async put!(channel, nothing)
                    end
                    sleep(1)
                end
            end
        else
            @async begin
                for item in channels[threadid() - 1]
                    println("Thread $(threadid()) Execution")
                    sleep(0.1)
                end
            end
        end
    end
    return nothing
end

f()
julia> f()
Iteration 1 Thread 1 Update status
Thread 2 Execution
Thread 3 Execution
Iteration 1 Thread 1 Update status
Thread 2 Execution
Thread 3 Execution
Iteration 2 Thread 1 Update status
Thread 2 Execution
Thread 3 Execution
Iteration 2 Thread 1 Update status
Thread 2 Execution
Thread 3 Execution

If you’d like the iteration number to be communicated to the other threads, you could use channels = [Channel{Tuple{Nothing, Int}}() for _ in 1:nthreads()-1] and then put!(channel, (nothing, t)) and deconstruct the item:

for item in channels[threadid() - 1]
   _, t = item
   println("Iteration $t Thread $(threadid()) Execution")
   sleep(0.1)
end

Then you get:

julia> f()
Iteration 1 Thread 1 Update status
Iteration 1 Thread 2 Execution
Iteration 1 Thread 3 Execution
Iteration 1 Thread 1 Update status
Iteration 1 Thread 2 Execution
Iteration 1 Thread 3 Execution
Iteration 2 Thread 1 Update status
Iteration 2 Thread 3 Execution
Iteration 2 Thread 2 Execution
Iteration 2 Thread 1 Update status
Iteration 2 Thread 3 Execution
Iteration 2 Thread 2 Execution

Thank you very much for your great help. I have a few questions, please:
1- Why do you prefer to use @threads rather than @tspawnat? I tried both and they led to the same result.
2- Will an @async task created in a thread be pinned to it and executed only by it? For example, will @async put!(channel, nothing) always create tasks executed by threadid()=1?
3- As far as I know, each put! needs a take!, so why is there no take! in the second @async? Does for item in channels[threadid() - 1] work as take! by checking the channel indefinitely (i.e., polling)?
4- I am trying to avoid using sleep because I care about performance. Thanks for proposing your code; however, I need to find a faster way to execute. Is it possible to make a simple change in my MWE to make it work? Actually, I modified my MWE so that iter is local to threadid()=1 only, but it now goes into infinite iterations.

function f()
  channels = [Channel{Nothing}(0) for i in 1:(nthreads()-1)]
  status = true
  thrs = [@tspawnat i begin
  for t in 1:2
    if threadid() == 1
     iter=0
    end
    status = true
    while status
      if threadid() == 1
       #This is to reverse the value of `status` in the second iteration of the `while loop`
        iter += 1;
        if iter == 2
          status = false
        end
        println("Iteration $t Thread $(threadid()) Update status")
        #
        put!.(channels, nothing)
      else
        take!(channels[threadid()-1])
        println("Iteration $t Thread $(threadid()) Execution")
      end # if threadid() == 1
    end # while status
  end  # for t in 1:2
 end for i = 1:nthreads()];
 fetch.(thrs);
end 
f()
  1. I’ve looked at ThreadPools.@tspawnat but I’m not too familiar with it, so I tried to make it work with just Base.Threads. If it works for you using @tspawnat, there is no problem, I think.
  2. My understanding is that if you @async in a different thread, the task will stay there, as it is sticky by default.
  3. Iterating over the channel will be like an unbounded sequence of take!s, I believe.
  4. You can leave out the sleeps; they are not required. I only put them there because there is no actual workload, so the execution is near-instantaneous, and this means that sometimes the order of operations is different (e.g. the first thread will have put multiple nothings before the other threads take anything). So the behavior of my code is quite different, as all three threads operate more or less independently for a given channel state, while in your code there is a global schedule for all threads.
    As for your example, I find it quite hard to reason about, so I don’t know how to make it work. Maybe someone else will chime in.
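Point 2 can be checked with a small experiment. This is only a sketch: the helper name async_thread_ids is hypothetical, and it assumes it is called from the main task (for example at the REPL), which does not migrate between threads.

```julia
using Base.Threads: threadid

# Returns the thread ids seen by the parent task and by a child created with @async.
function async_thread_ids()
    parent = threadid()
    child = fetch(@async threadid())
    return parent, child
end
```

Under that assumption both ids should be equal, because a task created with @async is scheduled on the thread of the task that created it.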

I think your use case is a multi-threaded variant of the producer-consumer setting described in the manual so maybe you can find more info there. I think that your example should also work in a single-threaded environment, so maybe you can try to reduce it further to get more insight.
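For reference, the single-threaded producer-consumer pattern can be sketched as follows. The function name squares_via_channel is made up for illustration; the do-block form of the Channel constructor runs the producer in its own task and closes the channel when that task finishes.

```julia
# Producer: a task bound to an unbuffered channel that puts four values, then ends.
# Consumer: `collect` takes values until the channel is closed.
function squares_via_channel()
    producer = Channel{Int}(0) do ch
        for i in 1:4
            put!(ch, i^2)   # blocks until the consumer takes the value
        end
    end
    return collect(producer)
end
```

Calling squares_via_channel() returns [1, 4, 9, 16]; each put! on the unbuffered channel waits for the matching take, which is the same handshake the multi-threaded version relies on.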


Regarding the third point, this is a MWE:

julia> channel = Channel{Int}()
Channel{Int64}(0) (empty)

julia> @async for item in channel
           println(item)
       end
Task (runnable) @0x00007f7c5112d2c0

julia> put!(channel, 1);
1

julia> put!(channel, 10);
10

So the task that we created on the second line will basically just stay there forever and take items off the channel if there are any.


Thank you very much for pointing all this out.

As far as I know, when a thread calls put! on an unbuffered channel, it is blocked until take! is applied. So, in this case, the first thread in your code will have only one put! pending, not multiple, right?

In your code, when thread 1 executes the for-loop below, it won’t stop the other two threads and will move on to execute sleep(1), right?
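The blocking behaviour of put! on an unbuffered channel can be observed directly. The sketch below is illustrative only (blocking_demo and the events vector are hypothetical names): the producer task cannot get past put! until the main task calls take!, while wrapping put! in @async keeps the caller itself from blocking.

```julia
function blocking_demo()
    ch = Channel{Int}(0)              # unbuffered: put! waits for a matching take!
    events = String[]
    producer = @async begin
        put!(ch, 1)                   # blocks here until someone takes the value
        push!(events, "put! returned")
    end
    yield()                           # give the producer a chance to start and block
    push!(events, "before take!")
    take!(ch)                         # releases the producer
    wait(producer)
    return events
end
```

The returned vector is ["before take!", "put! returned"], since the producer's put! can only return after the take! has happened.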

...
for channel in channels
    @async put!(channel, nothing)
end
sleep(1)
...

Yes, however, my code is just a test example to set the synchronization between threads. The execution load in my real code is heavy.

  • So, the task will stay in the for-loop forever, but at each iteration it waits until a put! is made on the channel, right? Or does the task keep iterating, and whenever a put! happens it prints the value with println(item)?

Iterating over a channel is different from iterating over, say, a vector. With the latter, you don’t remove the elements, and when the last element is reached, the for loop returns. A for loop over a channel does not return as long as the channel stays open: it removes the element it is accessing, and after reaching the last element, it waits on the channel (until new items appear there). It only ends once the channel is closed.
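A small sketch of that behaviour, including the one way the loop does end, namely closing the channel. The function name drain is made up for illustration.

```julia
# Sends `values` through an unbuffered channel to a consumer task and
# returns what the consumer received.
function drain(values)
    ch = Channel{Int}(0)
    received = Int[]
    consumer = @async for item in ch   # each iteration takes one item or waits
        push!(received, item)
    end
    for v in values
        put!(ch, v)
    end
    close(ch)        # lets the consumer's for loop finish
    wait(consumer)
    return received
end
```

For example, drain([1, 10]) returns [1, 10]. Without the close(ch), the consumer task would stay blocked on the channel and wait(consumer) would never return.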


To recap, in your code, iterating over a channel looks like this:

for item in channels[threadid() - 1]
    println("Thread $(threadid()) Execution")
end

And iterating over a vector looks like the following, right?

for channel in channels
    @async put!(channel, nothing)
end

Here, the thread won’t wait at the end of this for loop, and you added a sleep so that the println statements appear in order, right? Is there another way to make the thread wait for the take! of the other threads, rather than sleep? I need the first thread to wait for the others to finish at this point.

I’m just talking about single vectors vs. single channels now, to explain what

for item in channel
    println(item)
end

does.
You could write that for loop equivalently as

while true
    item = take!(channel) # take or wait on the channel
    println(item)
end

From this it is clear that the while loop will never return. This is why we wrap the loop with @async.
Since that loop is nested in other loops, not returning would mean that the outer loops don’t iterate.

Thank you for your explanation.
Could you help me modify your code so that it produces the same sequence of results without using sleep, to make it faster?
Again, the sequence is:

Iteration 1 Thread 1 Update status
Iteration 1 Thread 2 Execution
Iteration 1 Thread 3 Execution
Iteration 1 Thread 1 Update status
Iteration 1 Thread 2 Execution
Iteration 1 Thread 3 Execution
Iteration 2 Thread 1 Update status
Iteration 2 Thread 3 Execution
Iteration 2 Thread 2 Execution
Iteration 2 Thread 1 Update status
Iteration 2 Thread 3 Execution
Iteration 2 Thread 2 Execution

You can just remove the sleeps, but there is no guarantee that threads 2&3 will finish executing the print statement (or whatever workload they need to do) before thread 1 puts items in channels that are already freed. But why would you care about having this strict order? It is an advantage of the multi-threaded async execution model that threads can start working immediately when work is available and not have to wait on other threads.


If you want to have thread 1 wait on the others, just define return channels to communicate when a thread is done:

using Base.Threads

function f()
    channels = [Channel{Int}() for _ in 1:nthreads()-1]
    return_channels = [Channel{Int}() for _ in 1:nthreads()-1]
    @async put!.(return_channels, 0) # put something there for thread 1 to take
    @threads for _ in 1:nthreads()
        if threadid() == 1
            for t in 1:2
                status = true
                iter = 0
                while status
                    iter += 1
                    if iter == 2
                        status = false
                    end
                    take!.(return_channels) # take before advancing to the next operation
                    println("Iteration $t Thread $(threadid()) Update status")
                    put!.(channels, t)
                end
            end
        else
            @async begin
                for t in channels[threadid() - 1]
                    println("Iteration $t Thread $(threadid()) Execution")
                    put!(return_channels[threadid() - 1], t) # put to say "I'm done"
                end
            end
        end
    end
    return nothing
end

f()
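The code above relies on each @threads iteration landing on a distinct thread so that threadid() can select a channel; as far as I know, the default @threads schedule in recent Julia versions does not guarantee that mapping. A variant that avoids the assumption is sketched below. It is not a drop-in replacement: lockstep, its parameters, and the events channel are hypothetical names, workers are identified by an explicit index w instead of threadid(), and the inner while loop is simplified to a fixed number of rounds.

```julia
using Base.Threads: @spawn

function lockstep(nworkers::Int = 2, nouter::Int = 2, ninner::Int = 2)
    work = [Channel{Int}(0) for _ in 1:nworkers]   # controller -> worker
    done = [Channel{Int}(0) for _ in 1:nworkers]   # worker -> controller
    events = Channel{String}(Inf)                  # thread-safe log of what happened
    workers = map(1:nworkers) do w
        @spawn for t in work[w]
            put!(events, "Iteration $t Worker $w Execution")
            put!(done[w], t)                       # signal "I'm done"
        end
    end
    for t in 1:nouter, _ in 1:ninner
        put!(events, "Iteration $t Controller Update status")
        foreach(ch -> put!(ch, t), work)           # hand out work
        foreach(take!, done)                       # wait for every worker
    end
    foreach(close, work)                           # lets the worker loops end
    foreach(wait, workers)
    close(events)
    return collect(events)
end
```

foreach(println, lockstep()) prints one controller line followed by the worker lines of that round, for every round; the order of the workers within a round is not fixed. No sleep is needed, and closing the work channels lets every task finish before the function returns.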