queue = Vector{Node}()
push!(queue, rootNode)
bestKnownSolution = 100
while !isempty(queue)
    nextNode = pop!(queue)
    newNode1, newNode2, bestFoundSolution = processNode!(nextNode)
    if !isnothing(newNode1)
        addToSortedQueue!(newNode1)
    end
    if !isnothing(newNode2)
        addToSortedQueue!(newNode2)
    end
    if isBetter(bestFoundSolution, bestKnownSolution)
        bestKnownSolution = bestFoundSolution
    end
end
return bestKnownSolution
I have been testing an algorithm for about a year now for my thesis. It's cool on its own, but before I submit the paper I wanted to try to add some results for how it scales with parallel processing (which it is designed to do well with on paper).
None of the stuff that happens in processNode! ever reads or writes to anything in the queue.
In my head this was going to be really easy, the block of code inside the while loop gets assigned to a different processor, somehow only one processor is allowed to write to the queue and bestKnownSolution at a time, and the return statement doesn’t trigger until all the processors are done running.
All the examples I can find seem more complicated than this and with a different use in mind. Is what I’m trying to do hard for some reason? Does someone have a link to an example of handling a queue like this, or does someone who uses Distributed know how I can edit the above code to accomplish this?
I’m pretty sure this doesn’t matter at all, but if it does: for testing I have access to as many processors as I want, so I’m going to run it with 2, 4, 8, 16, … 256 in the hopes of showing that the speed scales logarithmically with the number of processors like it does on paper. So I think that ideally each node gets assigned to a thread until a processor is available to grab it, but I’m not super sure what the gap between the theory and the practice here is, and I’m just looking for input from someone who does this sort of thing.
Thanks in advance
Not an expert, but I can take a stab at this. I think if you can switch from using a while loop to using a for loop, it will make this simple. Because each parallel process needs access to queue and pop! is deleting things, there’s no certainty that the threads won’t clash with each other. So it’s important that each parallel process is either completely independent or that you control access carefully with locks and the like (which I don’t know how to do and have managed to avoid by trying to use only what looks the simplest).
Would something like this work?
for q in queue
    newNode1, newNode2, bestFoundSolution = processNode!(q)
    ...
end
If this structure works, then the simple thing to do is to multithread it with Threads.@threads in front of the for:
Threads.@threads for q in queue
    newNode1 = ...
    ...
end
I’m sure you’ve seen it, but here’s the multithreading docs anyway. Multi-Threading
And then if you want some cheap and easy compute power, I’d check out JuliaHub and the VSCode extension to run it on the cloud right from your IDE.
The order the queue is processed in is important. Lots of nodes can be processed simultaneously by n processors, but they should be the n most important nodes. The queue is sorted so that pop! always takes the most important node. I think I am probably going to have to lock the queue when adding to and removing from it, but I wasn’t sure what the best way to do that was. Also, the queue doesn’t have any nodes in it besides the root at the beginning, so wouldn’t the for loop just terminate after the first node?
I also think I should be using Distributed instead of multi-threading, since I have access to a distributed cluster for my research with an obscene amount of memory, and it’s what I have done all my other experiments on. Now I just want to parallelize it.
Edit: Maybe I don’t need Distributed. It’s a tool I have access to, but I’m looking at the documentation for the cluster I use, and I don’t think I need it. I just want to parallelize the processing; the memory can all be in one place. So maybe it is as simple as locking the queue and just threading the loop, but I’m still unclear on how to prevent early termination of the loop if the queue is empty but might not be empty in the future.
Edit2: Now I think I do need to use Distributed, but I am not sure of anything, and I still can’t get a basic example of this working in either case.
Edit3: Back to multi-threading, pretty sure this time. Still don’t know how to make it work though.
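On the question of how to lock the queue when adding and removing: a minimal sketch, assuming a hypothetical DemoNode type with a priority field (neither is from the original code). It keeps the vector sorted in ascending priority so that pop! returns the most important node, and wraps every access in the same ReentrantLock. The helper tryPop! is also a made-up name for illustration.

```julia
# Hypothetical stand-in for the thread's Node type; `priority` is an assumption.
struct DemoNode
    priority::Int
end

# Insert while holding the lock so the vector stays sorted (ascending),
# which leaves the most important node at the end for pop!.
function addToSortedQueue!(queue::Vector{DemoNode}, lk::ReentrantLock, node::DemoNode)
    lock(lk) do
        idx = searchsortedfirst(queue, node; by = n -> n.priority)
        insert!(queue, idx, node)
    end
    return nothing
end

# Remove and return the most important node, or `nothing` if the queue is empty.
# Checking isempty and calling pop! under one lock avoids a race between the two.
function tryPop!(queue::Vector{DemoNode}, lk::ReentrantLock)
    lock(lk) do
        isempty(queue) ? nothing : pop!(queue)
    end
end

queue = DemoNode[]
queueLock = ReentrantLock()
for p in (3, 1, 2)
    addToSortedQueue!(queue, queueLock, DemoNode(p))
end
first_out = tryPop!(queue, queueLock)   # the node with priority 3
```

The do-block form of lock releases the lock even if the body throws, so it behaves like the explicit try/finally pattern with less nesting.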
I have a bad but technically working solution to this question:
queue = Vector{Node}()
push!(queue, rootNode)
bestKnownSolution = 100
queueLock = ReentrantLock()
while !isempty(queue)
    Threads.@threads for it in 1:Threads.nthreads()
        nextNode = pop!(queue) #also locked like below
        newNode1, newNode2, bestFoundSolution = processNode!(nextNode)
        if !isnothing(newNode1)
            begin
                lock(queueLock)
                try
                    if !isempty(queue)
                        addToSortedQueue!(newNode1)
                    end
                finally
                    unlock(queueLock)
                end
            end
        end
        if !isnothing(newNode2)
            #same deal with the lock
        end
        if isBetter(bestFoundSolution, bestKnownSolution)
            #same deal with the lock
        end
    end
end
return bestKnownSolution
But this is pretty suboptimal because every thread has to finish before the while loop runs again. So it works, but time is wasted while a finished thread just sits there waiting for the other threads to finish.
Depending on whether you go with threads or Distributed it might look a little different, but I think the main idea for both could be to have the main loop just spawn jobs as long as there are jobs in the queue, and in case there are jobs running but the queue is empty, you wait for a condition. The block with job processing would notify the same condition when adding new nodes to the queue. The queue would then have to be either thread safe, using locks as you did in the last post, or, if distributed, use something like a RemoteChannel.
I would probably go with threads if you don’t need the distributed.
I haven’t put proper locks on any of the lines I added here, so you will have to think through that. Just a quick sketch of how I think it could be done.
queue = Vector{Node}()
push!(queue, rootNode)
bestKnownSolution = 100
queueLock = ReentrantLock()
running_jobs = 0
new_job = Condition()
while true
    if length(queue) + running_jobs == 0
        break
    end
    if isempty(queue)
        wait(new_job)
    end
    @async begin
        nextNode = pop!(queue) #also locked like below
        running_jobs += 1
        newNode1, newNode2, bestFoundSolution = processNode!(nextNode)
        running_jobs -= 1
        if !isnothing(newNode1)
            begin
                lock(queueLock)
                try
                    if !isempty(queue)
                        addToSortedQueue!(newNode1)
                        notify(new_job)
                    end
                finally
                    unlock(queueLock)
                end
            end
        end
        if !isnothing(newNode2)
            #same deal with the lock
        end
        if isBetter(bestFoundSolution, bestKnownSolution)
            #same deal with the lock
        end
    end
end
return bestKnownSolution
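For anyone wanting something runnable, here is one way the wait/notify idea could be fleshed out with threads. It is a sketch under assumptions, not the code from the posts above: it swaps the @async loop for a fixed pool of Threads.@spawn workers (tasks started with @async stay on the thread that created them, so they would not run processNode! in parallel), and it uses a single Threads.Condition as both the lock and the condition. ToyNode, the toy processNode!, the isBetter rule (smaller is better), and solve are all hypothetical stand-ins.

```julia
using Base.Threads: @spawn, nthreads

# Hypothetical stand-in for Node: a remaining depth and a candidate value.
struct ToyNode
    depth::Int
    value::Int
end

# Toy processNode!: branches until depth 0 and reports its own value.
function processNode!(n::ToyNode)
    n.depth == 0 && return nothing, nothing, n.value
    return ToyNode(n.depth - 1, n.value - 1), ToyNode(n.depth - 1, n.value - 2), n.value
end

isBetter(a, b) = a < b  # assumption: smaller is better

# Keep the vector sorted so pop! takes the node with the largest value.
# Callers must already hold the lock.
addToSortedQueue!(queue, n) =
    insert!(queue, searchsortedfirst(queue, n; by = x -> x.value), n)

function solve(rootNode::ToyNode; nworkers = nthreads())
    queue = [rootNode]
    bestKnownSolution = Ref(100)
    running = Ref(0)             # nodes currently being processed
    cond = Threads.Condition()   # lock and condition variable in one

    function worker()
        while true
            local node
            lock(cond)
            try
                # Queue empty but work in flight: new nodes may still arrive.
                while isempty(queue) && running[] > 0
                    wait(cond)
                end
                isempty(queue) && return   # nothing queued, nothing running
                node = pop!(queue)
                running[] += 1
            finally
                unlock(cond)
            end

            n1, n2, found = processNode!(node)   # runs outside the lock

            lock(cond)
            try
                for n in (n1, n2)
                    isnothing(n) || addToSortedQueue!(queue, n)
                end
                if isBetter(found, bestKnownSolution[])
                    bestKnownSolution[] = found
                end
                running[] -= 1
                notify(cond)   # wake waiters: new work, or time to exit
            finally
                unlock(cond)
            end
        end
    end

    tasks = [@spawn worker() for _ in 1:nworkers]
    foreach(wait, tasks)   # return only after every worker has finished
    return bestKnownSolution[]
end

result = solve(ToyNode(3, 50))
```

Counting in-flight nodes is what prevents early termination: a worker only exits when the queue is empty and no other worker can still add to it. Note that with several workers the nodes being processed are the most important ones available at the moment each worker pops, which may differ slightly from a strict sequential order. Start Julia with multiple threads (for example `julia -t 8`) for the workers to run in parallel.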
Thank you, this is extremely helpful, and the right answer.