I wrote the following code to test the various executors provided by FLoops.jl and FoldsThreads.jl for distributing work across multiple threads. The code creates work items (of various sizes) and distributes them among the threads. The time spent on each thread's work is measured and displayed.
Overall, this shows that the default ThreadedEx() is a good choice, as it is often the fastest or close to the fastest. However, some of the workloads consist of many slow tasks followed by many fast tasks. I would expect WorkStealingEx() to perform well here, but, surprisingly (to me), it doesn’t. If anyone has insights into why the results look like this, I would appreciate the input.
Results
Running test: Even work
Executor : max(time) : Thread 1 Thread 2 Thread 3 Thread 4
ThreadedEx() : 2.602936 : 2.500834 2.602936 2.402311 2.50231
DistributedEx() : 2.542045 : 2.500014 2.542045 2.542043 2.542007
WorkStealingEx() : 5.000031 : 5.000031 2.517802 0 2.5178
DepthFirstEx() : 2.600013 : 2.500021 2.500013 2.600013 2.400038
TaskPoolEx() : 2.508705 : 2.500013 2.508554 2.500014 2.508705
NondeterministicEx() : 3.500061 : 2.000013 3.500061 2.000013 2.500018
NonThreadedEx() : 10.00007 : 10.00007 0 0 0
Running test: Slow → Fast work
Executor : max(time) : Thread 1 Thread 2 Thread 3 Thread 4
ThreadedEx() : 2.500015 : 2.500015 2.407437 1.708907 1.69881
DistributedEx() : 2.510014 : 2.510014 2.010114 2.000098 2.500511
WorkStealingEx() : 5.001415 : 5.001415 1.500349 0 1.690588
DepthFirstEx() : 2.50023 : 2.50023 2.185916 2.085845 2.400012
TaskPoolEx() : 2.500939 : 0.2765167 0.2807491 2.500743 2.500939
NondeterministicEx() : 2.381628 : 1.255439 2.381628 0.8501847 1.075577
NonThreadedEx() : 5.501457 : 5.501457 0 0 0
Running test: Fast → Slow work
Executor : max(time) : Thread 1 Thread 2 Thread 3 Thread 4
ThreadedEx() : 2.510013 : 0.2610509 2.500014 0.3418379 2.510013
DistributedEx() : 2.500013 : 0.2665398 2.500012 0.3372509 2.500013
WorkStealingEx() : 2.535267 : 0.6302342 2.535267 2.535244 0
DepthFirstEx() : 2.615074 : 0.2563641 0.354758 2.615074 2.615057
TaskPoolEx() : 2.500023 : 2.500023 2.400472 2.400192 2.500013
NondeterministicEx() : 2.023689 : 1.305418 2.023689 1.111898 1.239262
NonThreadedEx() : 5.604027 : 5.604027 0 0 0
Running test: Random work
Executor : max(time) : Thread 1 Thread 2 Thread 3 Thread 4
ThreadedEx() : 1.526639 : 1.286924 1.510447 1.526639 1.345505
DistributedEx() : 1.562826 : 1.21845 1.449455 1.562826 1.431401
WorkStealingEx() : 2.572559 : 2.572559 1.619886 1.451544 0
DepthFirstEx() : 1.680727 : 1.220644 1.680727 1.503027 1.48612
TaskPoolEx() : 1.698574 : 1.198054 1.550712 1.698574 1.537739
NondeterministicEx() : 1.595088 : 1.595088 1.234911 0.98544 1.105307
NonThreadedEx() : 4.879672 : 4.879672 0 0 0
Code
module floop_test

using FoldsThreads
using FLoops # exports @floop macro
using Printf

"""
    doWork(x, worktime)

Busy-wait for at least `worktime(x)` seconds on the current thread, after
printing a single `.` as a progress marker.

# Returns
A tuple `(id, s, xtime)`:
- `id`: `Threads.threadid()` sampled immediately before the busy loop,
- `s`: the dummy accumulator updated by the busy loop,
- `xtime`: wall-clock seconds spent in the busy loop (always `>= worktime(x)`).
"""
function doWork(x, worktime)
    # Print the progress marker *before* sampling the thread id and starting
    # the timer: printing to stdout may yield, and a task that yields can
    # resume on a different thread. The busy loop below only calls `time()`,
    # `rand()` and `worktime(x)` (none of which yield for the closures used in
    # this file), so the id sampled right before it is the thread doing the work.
    @printf(".")
    id = Threads.threadid()
    tstart = time()
    # Do work for at least worktime(x) seconds
    s = 0.0
    while time() - tstart < worktime(x)
        s += rand() - 1.0
    end
    return id, s, time() - tstart
end

"""
    flooptest(N, worktime, executor)

Run `doWork(x, worktime)` for every index `x` in `1:N` inside an `@floop` loop
scheduled by `executor`, collecting the per-item results.

# Returns
`(s, id, xtime)`: length-`N` vectors holding, per item, the dummy accumulator,
the thread id that did the work, and the time spent working.
"""
function flooptest(N, worktime, executor)
    s = zeros(N)
    id = zeros(Int, N)
    xtime = zeros(N)
    # Each iteration writes only its own index, so no synchronization is needed.
    @floop executor for x in eachindex(s)
        id[x], s[x], xtime[x] = doWork(x, worktime)
    end
    return s, id, xtime
end

"""
    _thread_times(id, xtime, nt)

Sum `xtime[n]` into slot `id[n]` of a length-`nt` vector. Ids outside `1:nt`
are skipped (NOTE(review): e.g. threads of another thread pool — confirm
whether that can occur with these executors).
"""
function _thread_times(id, xtime, nt)
    time_thread = zeros(nt)
    for (i, t) in zip(id, xtime)
        if 1 <= i <= nt
            time_thread[i] += t
        end
    end
    return time_thread
end

"""
    runtest(testtype, N, worktime, executors)

Run `flooptest(N, worktime, executor)` for each executor and print one table
row per executor.

Each row lists the total busy time accumulated on every thread. The
`max(time)` column is the largest of those per-thread totals, i.e. the
busiest thread's work time. It is not the measured wall-clock time of the loop,
so scheduling overhead is not included.
"""
function runtest(testtype, N, worktime, executors)
    nt = Threads.nthreads()
    println("------------------------")
    println("Running test: $testtype ")
    println("------------------------")
    @printf("%35s : max(time) : ", "Executor")
    foreach(i -> @printf(" Thread %2i", i), 1:nt)
    print("\n")
    for executor in executors
        # Run test
        _, id, xtime = flooptest(N, worktime, executor)
        # Analyze & print results
        print("\n\u1b[1F\u1b[0K") # Clear dots
        @printf("%35s : ", executor)
        time_thread = _thread_times(id, xtime, nt)
        @printf(" %10.7g : ", maximum(time_thread))
        foreach(t -> @printf(" %10.7g", t), time_thread)
        @printf("\n")
    end
end

# Test parameters. `const` so the work-time closures below read them
# type-stably inside the busy loop (non-const globals are untyped).
const Nx = 100
const executors = [
    ThreadedEx(),
    DistributedEx(),
    WorkStealingEx(),
    DepthFirstEx(),
    TaskPoolEx(), # recommended not to use in package
    NondeterministicEx(),
    NonThreadedEx(), # all work done on 1 thread (slow!)
]
const random = rand(Nx)

# Run various tests
runtest("Even work", Nx, (x) -> 0.1, executors)
runtest("Slow -> Fast work", Nx, (x) -> x > Nx / 2 ? 0.01 : 0.1, executors)
runtest("Fast -> Slow work", Nx, (x) -> x < Nx / 2 ? 0.01 : 0.1, executors)
runtest("Random work", Nx, (x) -> 0.1 * random[x], executors)

end