I want to use and test multi-threading with synchronization between threads using channels, as in the MWE below, in which nthreads() = 3. Each thread executes a for loop with a while loop nested inside it.
In each iteration of the while loop, the first thread (threadid() == 1) calls put! on the channels and blocks until threads 2 and 3 take!. It then moves on to the next iteration. Thread 1 also sets status to false in the second iteration of the while loop so that the loop exits.
To track the execution of thread 1, it runs println("Iteration $t Thread $(threadid()) Update status"), while threads 2 and 3 run println("Iteration $t Thread $(threadid()) Execution").
```julia
using Base.Threads
using ThreadPools   # provides @tspawnat

function f()
    iter = 0 # artificial parameter that helps in updating `status`
    channels = [Channel{Nothing}(0) for i in 1:(nthreads()-1)]
    status = true
    # note: `iter` and `status` are defined here, in f's scope, so every
    # spawned task below reads and writes the same two variables
    thrs = [@tspawnat i begin
        for t in 1:2
            iter = 0
            status = true
            while status
                if threadid() == 1
                    # This is to reverse the value of `status` in the second iteration of the `while` loop
                    iter += 1
                    if iter == 2
                        status = false
                    end
                    println("Iteration $t Thread $(threadid()) Update status")
                    put!.(channels, nothing)
                else
                    take!(channels[threadid()-1])
                    println("Iteration $t Thread $(threadid()) Execution")
                end # if threadid() == 1
            end # while status
        end # for t in 1:2
    end for i = 1:nthreads()]
    fetch.(thrs)
end

f()
```
I expect the results to be as below; however, it freezes randomly. I think this is due to improper use of put! and take! on the channels. Could you please guide me on how to solve the problem?
You are defining iter on the main thread and then again inside the @tspawnat block. I'm not sure whether that is causing the issue, but if you want a global counter, I'd try removing the second iter = 0.
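To illustrate the shared-variable point: in the MWE, `iter` and `status` are assigned in `f` before the tasks are spawned, so the `status = true` inside every task writes to the same variable that thread 1 sets to `false`. A slower task can reset it, and the tasks then disagree about how many `take!`s to expect. Below is a minimal sketch of one way around this, not a drop-in fix for the MWE: it swaps in a different technique, sending the producer's current `status` through each channel (`Channel{Bool}`) so that every task keeps its own local copy. It assumes `Threads.@spawn` with a task index `i` instead of `@tspawnat`/`threadid()`, and it logs messages into a channel instead of printing; `f_local_status` and `events` are hypothetical names.

```julia
using Base.Threads

# Hypothetical sketch: same structure as the MWE (2 outer rounds, 2 while-iterations
# per round), but `iter` and `status` are local to each task, and the producer
# communicates `status` through the channel instead of a shared variable.
function f_local_status(nconsumers::Int = 2)
    channels = [Channel{Bool}(0) for _ in 1:nconsumers]  # unbuffered, as in the MWE
    events = Channel{String}(Inf)                        # thread-safe message log

    producer = Threads.@spawn begin
        for t in 1:2
            iter = 0            # local to this task; no other task can reset it
            status = true
            while status
                iter += 1
                if iter == 2
                    status = false
                end
                put!(events, "Iteration $t Producer Update status")
                put!.(channels, status)  # blocks until each consumer has taken
            end
        end
    end

    consumers = map(1:nconsumers) do i
        Threads.@spawn begin
            for t in 1:2
                status = true   # a separate local in each consumer task
                while status
                    status = take!(channels[i])  # receive the producer's status
                    put!(events, "Iteration $t Consumer $i Execution")
                end
            end
        end
    end

    wait(producer)
    foreach(wait, consumers)
    close(events)
    return collect(events)
end
```

Calling `f_local_status()` terminates and returns the logged messages. A consumer's line can still appear after the producer's next line, because the only guarantee is that each `put!` completes when the matching `take!` happens.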
I think this will work, with thread 1 controlling the schedule of putting items into the channels, and the other threads just taking from their channels whenever items are available. I added some sleeps since everything is now asynchronous.
```julia
using Base.Threads

function f()
    channels = [Channel{Nothing}() for _ in 1:nthreads()-1]
    # :static keeps one loop iteration per thread, which the threadid() checks below rely on
    @threads :static for _ in 1:nthreads()
        if threadid() == 1
            for t in 1:2
                status = true
                iter = 0
                while status
                    iter += 1
                    if iter == 2
                        status = false
                    end
                    println("Iteration $t Thread $(threadid()) Update status")
                    for channel in channels
                        @async put!(channel, nothing)
                    end
                    sleep(1)
                end
            end
        else
            @async begin
                for item in channels[threadid() - 1]
                    println("Thread $(threadid()) Execution")
                    sleep(0.1)
                end
            end
        end
    end
    return nothing
end

f()
```
If you'd like the iteration number to be communicated to the other threads, you could use channels = [Channel{Tuple{Nothing, Int}}() for _ in 1:nthreads()-1], then put!(channel, (nothing, t)), and destructure the item:
```julia
for item in channels[threadid() - 1]
    _, t = item
    println("Iteration $t Thread $(threadid()) Execution")
    sleep(0.1)
end
```
Thank you very much for your great help. I have a few questions:
1- Why do you prefer @threads over @tspawnat? I tried both and they led to the same result.
2- Will a task created with @async on a thread be pinned to that thread and executed only by it? For example, will @async put!(channel, nothing) always create tasks executed by threadid() == 1?
3- As far as I know, each put! needs a take!, so why is there no take! in the second @async? Does for item in channels[threadid() - 1] act as a take! by checking the channel indefinitely (i.e., polling)?
4- I am trying to avoid sleep because I care about performance. Thanks for proposing your code; however, I need a faster way to execute. Is it possible to make a simple change to my MWE to get it working? I modified my MWE so that iter is local to threadid() == 1 only, but now it goes into infinite iterations.
```julia
using Base.Threads
using ThreadPools   # provides @tspawnat

function f()
    channels = [Channel{Nothing}(0) for i in 1:(nthreads()-1)]
    status = true   # still shared by all spawned tasks below
    thrs = [@tspawnat i begin
        for t in 1:2
            if threadid() == 1
                iter = 0
            end
            status = true
            while status
                if threadid() == 1
                    # This is to reverse the value of `status` in the second iteration of the `while` loop
                    iter += 1
                    if iter == 2
                        status = false
                    end
                    println("Iteration $t Thread $(threadid()) Update status")
                    put!.(channels, nothing)
                else
                    take!(channels[threadid()-1])
                    println("Iteration $t Thread $(threadid()) Execution")
                end # if threadid() == 1
            end # while status
        end # for t in 1:2
    end for i = 1:nthreads()]
    fetch.(thrs)
end

f()
```
I've looked at ThreadPools.@tspawnat, but I'm not too familiar with it, so I tried to make it work with just Base.Threads. If it works for you with @tspawnat, I think there is no problem.
My understanding is that if you call @async on a different thread, the new task will stay on that thread, as @async tasks are sticky by default.
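A small sketch to check the stickiness claim, assuming Julia 1.3 or later (for `Threads.@spawn`): a task is spawned on some worker thread, creates a child with `@async`, and both report `threadid()`. `async_child_thread` is a hypothetical name.

```julia
using Base.Threads

# Hypothetical check: an @async task is scheduled on the thread of the task that
# created it. The parent reads its threadid() right before @async, with no yield
# point in between, so both values refer to the same moment.
function async_child_thread()
    outer = Threads.@spawn begin
        parent_tid = threadid()
        child = @async threadid()   # sticky child task
        (parent_tid, fetch(child))
    end
    return fetch(outer)
end
```

With several threads, the parent can land on any of them, but the two numbers returned by `async_child_thread()` should match.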
Iterating over the channel will be like an unbounded sequence of take!s, I believe.
You can leave out the sleeps; they are not required. I only added them because there is no actual workload, so execution is near-instantaneous, which means the order of operations sometimes differs (e.g. the first thread may have put several nothings before the other threads take anything). So my code behaves quite differently from yours: for a given channel state, all three threads operate more or less independently, while your code imposes a global schedule on all threads.
As for your example, I find it quite hard to reason about, so I don't know how to make it work. Maybe someone else will chime in.
I think your use case is a multi-threaded variant of the producer-consumer setting described in the manual, so maybe you can find more information there. I think your example should also work in a single-threaded environment, so maybe you can try to reduce it further to gain more insight.
As far as I know, when a thread calls put! on an unbuffered channel, it blocks until a take! is applied. So, in this case, the first thread in your code will have done only one put!, not multiple, right?
In your code, when thread 1 executes the for loop below, it won't wait for the other two threads and will move on to sleep(1), right?
```julia
...
for channel in channels
    @async put!(channel, nothing)
end
sleep(1)
...
```
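On whether thread 1 waits here: wrapping put! in `@async` moves the blocking call into a new task, so the loop itself returns immediately, and several puts can be pending on the same unbuffered channel at once. A sketch on a single channel (`async_puts_dont_block` is a hypothetical name):

```julia
# Hypothetical sketch: each @async task performs one blocking put!, so the
# comprehension below returns at once even though nobody is taking yet.
function async_puts_dont_block(n::Int = 3)
    ch = Channel{Int}(0)                         # unbuffered: put! waits for take!
    putters = [@async put!(ch, k) for k in 1:n]  # does not wait for any take!
    # Reaching this line shows the caller was not blocked by the puts above.
    taken = [take!(ch) for _ in 1:n]
    foreach(wait, putters)                       # every put! has now completed
    return sort(taken)
end
```

If the `@async` were removed, the first `put!` would block indefinitely, because nothing is taking from the channel yet.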
Yes. However, my code is just a test example for setting up synchronization between threads; the workload in my real code is heavy.
So the task will loop forever, but in each iteration it waits until something is put! into the channel, right? Or does the task run through the iterations and, whenever a put! happens, println(item) its value?
Iterating over a channel is different from iterating over, say, a vector. With a vector, you don't remove the elements, and the for loop returns after the last element. A for loop over a channel removes each element it takes, and after taking the last available element it waits on the channel until new items appear; it only returns once the channel has been closed and emptied.
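A minimal sketch of the closing behaviour (`drain_demo` is a hypothetical name): the consumer's for loop takes each item as it arrives and only ends because the producer calls `close` after its last `put!`.

```julia
# Hypothetical sketch: a for loop over a channel is a sequence of blocking
# take!s that ends only when the channel is closed and has no items left.
function drain_demo()
    ch = Channel{Int}(0)
    producer = @async begin
        for x in 1:3
            put!(ch, x)
        end
        close(ch)   # without this, the loop below would wait forever
    end
    seen = Int[]
    for x in ch     # each iteration takes (removes) one item
        push!(seen, x)
    end
    wait(producer)
    return seen
end
```

`drain_demo()` returns `[1, 2, 3]`; deleting the `close(ch)` line makes it hang after the third item.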
To recap, in your code, iterating over a channel looks like this:
```julia
for item in channels[threadid() - 1]
    println("Thread $(threadid()) Execution")
end
```
And iterating over a vector looks like the loop below, right?
```julia
for channel in channels
    @async put!(channel, nothing)
end
```
Here, the thread won't wait at the end of this for loop, and you added a sleep so that the println statements show up in order, right? Is there another way, other than sleep, to make the thread wait for the other threads' take!? At this point I need the first thread to wait for the others to finish executing.
I'm only talking about a single vector vs. a single channel here, to explain what the following loop does:
```julia
for item in channel
    println(item)
end
```
You could write that for loop equivalently (as long as the channel is never closed) as
```julia
while true
    item = take!(channel) # take from the channel, or wait until an item is available
    println(item)
end
```
From this it is clear that the while loop will never return. This is why we wrap the loop with @async.
Since that loop is nested in other loops, not returning would mean that the outer loops don’t iterate.
Thank you for your explanation.
Could you help me modify your code to produce the same sequence of results without using sleep, so that it performs better?
Again, the sequence is:
You can just remove the sleeps, but then there is no guarantee that threads 2 and 3 will have finished executing the print statement (or whatever workload they need to do) before thread 1 puts new items into channels that have already been emptied. But why do you care about this strict order? It is an advantage of the multi-threaded async execution model that threads can start working as soon as work is available, without having to wait on other threads.
If you want to have thread 1 wait on the others, just define return channels to communicate when a thread is done:
```julia
using Base.Threads

function f()
    channels = [Channel{Int}() for _ in 1:nthreads()-1]
    return_channels = [Channel{Int}() for _ in 1:nthreads()-1]
    @async put!.(return_channels, 0) # put something there for thread 1 to take
    # :static keeps one loop iteration per thread, which the threadid() checks below rely on
    @threads :static for _ in 1:nthreads()
        if threadid() == 1
            for t in 1:2
                status = true
                iter = 0
                while status
                    iter += 1
                    if iter == 2
                        status = false
                    end
                    take!.(return_channels) # take before advancing to the next operation
                    println("Iteration $t Thread $(threadid()) Update status")
                    put!.(channels, t)
                end
            end
        else
            @async begin
                for t in channels[threadid() - 1]
                    println("Iteration $t Thread $(threadid()) Execution")
                    # put to say "I'm done"; the reply to the very last round is
                    # never taken, so this task stays blocked here afterwards
                    put!(return_channels[threadid() - 1], t)
                end
            end
        end
    end
    return nothing
end

f()
```
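If thread 1 only needs to know that every consumer has *taken* its item, rather than finished its work, a lighter alternative to return channels is to wrap the `@async` puts in `@sync`: `@sync` returns once all `@async` tasks inside it are done, and an unbuffered `put!` completes exactly when the matching `take!` happens. A sketch under those assumptions; `sync_rounds` and the `events` log are hypothetical, and the consumers are `Threads.@spawn` tasks indexed by `i` instead of `threadid()`.

```julia
using Base.Threads

# Hypothetical sketch: the producer waits, without sleep, until every consumer
# has taken this round's item before starting the next round.
function sync_rounds(nconsumers::Int = 2, nrounds::Int = 2)
    channels = [Channel{Int}(0) for _ in 1:nconsumers]  # unbuffered
    events = Channel{String}(Inf)                       # thread-safe message log

    consumers = map(1:nconsumers) do i
        Threads.@spawn for t in channels[i]   # ends once the channel is closed
            put!(events, "Iteration $t Consumer $i Execution")
        end
    end

    for t in 1:nrounds
        put!(events, "Iteration $t Producer Update status")
        @sync for ch in channels
            @async put!(ch, t)   # @sync waits until every put! has been taken
        end
    end

    foreach(close, channels)     # let the consumer loops finish
    foreach(wait, consumers)
    close(events)
    return collect(events)
end
```

Unlike the return-channel version, this does not guarantee that a consumer's message for round `t` appears before the producer's message for round `t + 1`; it only guarantees that every item has been taken before the next round starts.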