This is going to be quite long, I’m going to try to include everything I can.
I’m trying to implement a parallel quicksort algorithm, as a part of a project to compare multiple programming languages. The way I’m doing it involves some thread micro-management and data exchange between threads.
Now the only way to spawn threads in Julia that I know of is the Threads.@threads
macro, while maintaining thread safety is achieved using Threads.Mutex
- es.
Sadly this is quite limiting. Every program using this mechanism I came across simply splits an array between threads, making them each process a number of elements.
To come around this limitation I came up with a contrived way, or better, system, that lets me make a “queue” for work distribution between threads:
- there are 2 arrays for thread arguments, each holds
Threads.nthreads()
Channel
s - at the beginning only the first thread gets arguments to process
- during execution it dispatches work for other threads by
put!
ting arguments into other threads’ argument channels- everything is wrapped in mutexes and other checks to minimize race conditions and such
- at some point each thread gets to a point where this argument throwing stops and it returns
The whole thing is wrapped in a function threadWork
that gets called in a loop with @threads
macro.
Code (variable names in English, comments mostly in Slovene, dive into at your own risk):
function generatePivot(src::AbstractArray{T,1} where T<:Number)
if size(src)[1] > 15
range = 1:size(src)[1]
sum = Float64(0)
sum += src[rand(range)]
sum += src[rand(range)]
sum += src[rand(range)]
sum /= 3
return Int(floor(sum))
else
return src[rand(1:size(src)[1])]
end
end
function onePassSwaps!(src::AbstractArray{T,1} where T<:Number,
pivot::Number)
# inicializacija
srcSize = size(src)[1]
i = 0
j = srcSize + 1
# Hoarova particija - https://en.wikipedia.org/wiki/Quicksort
while i < j
i += 1
while i <= srcSize && src[i] < pivot
i += 1
end
j -= 1
while j > 0 && src[j] > pivot
j -= 1
end
if i >= j
return (j, srcSize - j) # dolžina levega dela tabele
end
src[i], src[j] = src[j], src[i]
end
end
function copyArray!(src::AbstractArray{T,1} where T<:Number,
dest::AbstractArray{T,1} where T<:Number)
dest[:] = src[:]
end
function inPlaceQuickSort!(src::AbstractArray{T,1} where T<:Number)
if (size(src)[1] > 1)
pivot = generatePivot(src)
sizes = onePassSwaps!(src, pivot)
inPlaceQuickSort!(view(src, 1:sizes[1]))
inPlaceQuickSort!(view(src, (sizes[1]+1):(size(src)[1])))
end
end
function outwardQuickSort!(src::AbstractArray{T,1} where T<:Number,
dest::AbstractArray{T,1} where T<:Number)
if (size(src)[1] > 1)
copyArray!(src, dest)
pivot = generatePivot(dest)
sizes = onePassSwaps!(dest, pivot)
inPlaceQuickSort!(view(dest, 1:sizes[1]))
inPlaceQuickSort!(view(dest, sizes[1]+1:size(src)[1]))
else
dest[1] = src[1]
end
end
function threadSafePut!(c::Channel, m::Threads.Mutex, e)
Base.lock(m)
put!(c, e)
Base.unlock(m)
end
function threadSafeTake!(c::Channel, m::Threads.Mutex)
Base.lock(m)
while !isready(c)
Base.unlock(m) # will this hackery work?
Base.sleep(0.001)
Base.lock(m) # maybe!
end
ret = take!(c)
Base.unlock(m)
return ret
end
function threadWork(channels, subSizes, channels_ready, sizes_ready, debugChannel)
descending = true # spuščamo se v globino
myID = Threads.threadid()
while descending
args = threadSafeTake!(channels[myID], channels_ready[myID])
if args[1] == "prep"
tabela, tempTabela, size, forward, procesi = args[2]
if procesi == 1
descending = false
if forward
inPlaceQuickSort!(tabela)
else
outwardQuickSort!(tempTabela, tabela)
end
else
# generiranje pivota, indeksov podtabel
pivot = generatePivot(tabela)
put!(debugChannel, pivot)
step = size / procesi
starts = [Int(floor((i - 1) * step)) + 1 for i in 1:procesi]
ends = [Int(floor(i * step)) for i in 1:procesi]
ends[procesi] = size # za vsak primer
if forward
src = tabela
tgt = tempTabela
else
src = tempTabela
tgt = tabela
end
# klicanje asinhronih metod
for i = 1:procesi
threadSafePut!(channels[i + myID - 1], channels_ready[i + myID - 1], ("swaps", (view(src, starts[i]:ends[i]), pivot)))
end
# priprava za organiziranje
threadSafePut!(channels[myID], channels_ready[myID], ("organize", (src, tgt, size, procesi, forward)))
end
elseif args[1] == "organize"
src, tgt, size, procesi, forward = args[2]
sizes = [threadSafeTake!(subSizes[i + myID - 1], sizes_ready[i + myID - 1]) for i in 1:procesi]
# računanje velikosti delov in odmikov
lowers = Array{Int, 1}(undef, procesi)
highers = Array{Int, 1}(undef, procesi)
starts = Array{Int, 1}(undef, procesi)
lowers[1] = 1
highers[1] = 1
starts[1] = 1
for (i, lens) in enumerate(sizes[1:procesi-1])
lowers[i+1] = lowers[i] + lens[1]
highers[i+1] = highers[i] + lens[2]
starts[i+1] = starts[i] + lens[1] + lens[2]
end
highers .+= lowers[procesi] + sizes[procesi][1] - 1 #luškan mali trik
# klicanje asinhronih metod
for i = 1:procesi
threadSafePut!(channels[i + myID - 1], channels_ready[i + myID - 1],
("copy", (view(src, starts[i]:starts[i]+sizes[i][1]-1),
view(tgt, lowers[i]:lowers[i]+sizes[i][1]-1))))
end
# priprava za drugo kopiranje
threadSafePut!(channels[myID], channels_ready[myID], ("organize2", (src, tgt, size, procesi, lowers, highers, starts, sizes, forward)))
elseif args[1] == "organize2"
src, tgt, size, procesi, lowers, highers, starts, sizes, forward = args[2]
# čakanje ostalih niti
[threadSafeTake!(subSizes[i + myID - 1], sizes_ready[i + myID - 1]) for i in 1:procesi]
# klicanje asinhronih metod
for i = 1:procesi
threadSafePut!(channels[i + myID - 1], channels_ready[i + myID - 1],
("copy", (view(src, starts[i]+sizes[i][1]:starts[i]+sizes[i][1]+sizes[i][2]-1),
view(tgt, highers[i]:highers[i]+sizes[i][2]-1))))
end
# priprava za deljenje
threadSafePut!(channels[myID], channels_ready[myID], ("divide", (src, tgt, size, procesi, lowers, sizes, forward)))
elseif args[1] == "divide"
src, tgt, size, procesi, lowers, sizes, forward = args[2]
# razpoložljive procese razdelimo čim bolj "pravično" na dva dela
prviDel = Int(floor((lowers[procesi] + sizes[procesi][1] - 1) * procesi / size)) + 1
if prviDel <= 0
prviDel = 1
elseif prviDel >= procesi
prviDel = procesi-1
end
# povrnemo začetno orientacijo
if !forward
src, tgt = tgt, src
end
# računanje velikosti tabel
size1 = lowers[procesi] + sizes[procesi][1] - 1
size2 = size - size1
# asinhrona rekurzivna klica
threadSafePut!(channels[myID], channels_ready[myID],
("prep", (view(src, 1:lowers[procesi]+sizes[procesi][1]-1),
view(tgt, 1:lowers[procesi]+sizes[procesi][1]-1),
size1, !forward, prviDel)))
threadSafePut!(channels[myID + prviDel], channels_ready[myID + prviDel],
("prep", (view(src, lowers[procesi]+sizes[procesi][1]:size),
view(tgt, lowers[procesi]+sizes[procesi][1]:size),
size2, !forward, procesi - prviDel)))
elseif args[1] == "swaps"
src, pivot = args[2]
size = onePassSwaps!(src, pivot)
threadSafePut!(subSizes[myID], sizes_ready[myID], size)
elseif args[1] == "copy"
src, dest = args[2]
copyArray!(src, dest)
threadSafePut!(subSizes[myID], sizes_ready[myID], true)
else
error("Neznan korak: " * args[1])
end
end
return nothing
end
channels = [Channel(2) for i = 1:Threads.nthreads()] # element = (tip::String, args::Tuple)
subSizes = [Channel(1) for i = 1:Threads.nthreads()]
channels_ready = [Threads.Mutex() for i = 1:Threads.nthreads()]
sizes_ready = [Threads.Mutex() for i = 1:Threads.nthreads()]
src = rand(Int, 1000000)
temp = similar(src)
display(src)
debugChannel = Channel(100)
threadSafePut!(channels[1], channels_ready[1], ("prep", (src, temp, size(src)[1], true, Threads.nthreads())))
Threads.@threads for i = 1:Threads.nthreads()
threadWork(channels, subSizes, channels_ready, sizes_ready, debugChannel)
end
display(src)
issorted(src)
Problems start when I try to run this at more than 2 threads.
First I set the environment variable as such:
$env:JULIA_NUM_THREADS=4
Then I run Julia REPL, inputting:
include("multithreading_test.jl")
Problems I came across:
- Julia just crashes (literally at one point the Powershell prompt just appears, no error message or whatever)
- Everything stalls (left the window for a few minutes, nothing happened)
- Different error messages:
- and my personal favourite:
Again, this only happens with 3 threads or more, 1 and 2 threads work perfectly.
Anyway, I know this all might be a bit too hacky and unpredictable, but if anyone has any idea what could be going wrong, please do share. If nothing good comes out of this, I’ll try finding another solution, be it using multiprocessing, MPI or something similar.
Thank you!