[ANN] ThreadsX.jl: Parallelized Base functions

ThreadsX.jl is aiming at providing API compatible with Base functions optimized for multicore machines.

tl;dr

Quoting tl;dr section of README:

Add prefix ThreadsX. to functions from Base to get some speedup, if supported. Example:

using ThreadsX
ThreadsX.sum(sin, 1:10_000)

To find out functions supported by ThreadsX.jl, just type ThreadsX. + TAB in the REPL:

julia> using ThreadsX

julia> ThreadsX.
MergeSort       any             findlast        mapreduce       sort
QuickSort       count           foreach         maximum         sort!
Set             extrema         issorted        minimum         sum
StableQuickSort findall         map             prod            unique
all             findfirst       map!            reduce

See more in the documentation.

Benchmarks

I ran some benchmarks in a machine with two 12 core CPUs (Intel® Xeon® CPU E5-2650 v4 @ 2.20GHz) with julia nightly build 1.5.0-DEV.464.

`lscpu` output
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                48
On-line CPU(s) list:   0-47
Thread(s) per core:    2
Core(s) per socket:    12
Socket(s):             2
NUMA node(s):          2
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 79
Model name:            Intel(R) Xeon(R) CPU E5-2650 v4 @ 2.20GHz
Stepping:              1
CPU MHz:               1201.664
CPU max MHz:           2900.0000
CPU min MHz:           1200.0000
BogoMIPS:              4401.71
Virtualization:        VT-x
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              30720K
NUMA node0 CPU(s):     0-11,24-35
NUMA node1 CPU(s):     12-23,36-47
Flags:                 fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid dca sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm 3dnowprefetch epb invpcid_single intel_pt retpoline kaiser tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm cqm rdseed adx smap xsaveopt cqm_llc cqm_occup_llc cqm_mbm_total cqm_mbm_local dtherm ida arat pln pts

You can find the benchmark script at https://github.com/tkf/ThreadsXBenchmarks.jl

Sorting

sort_agg

  • Y axis is the speed measured with respect to QuickSort defined in Base
  • X axis is JULIA_NUM_THREADS
  • Left column is sorting for a random vector with “narrow” distribution: rand(0:0.01:1, 1_000_000).
  • Right column is sorting for a random vector with “wide” distribution: rand(Float64, 1_000_000).

This is the first time I implemented sorting, let alone a parallel one. So I don’t have a good intuition for how reasonable it is. I hope it’s not awful :slight_smile:

Above benchmarks are for floats. ThreadsX.sort[!] entry point also tries to use parallel counting sort if the input is integer (as done in Base).

Reduction

fold_agg

  • Y axis is the speed measured with respect to the Base function or manual implementation.
  • X axis is JULIA_NUM_THREADS
  • findfirst: findfirst(==(-1), xs) with xs = rand(2^26); xs[2*end÷3] = -1.
  • foreach_symmetrize: A .= B .+ B' implemented using foreach compared against the baseline implemented using manual for loop. B = randn(6000, 6000).
  • sum_sin: sum(sin, 1:10_000_000)
  • unique: unique(rand(1:10, 10_000_000))

The nice scaling of sum_sin is somewhat expected since it’s not memory bound. So, this can be treated as the “upper bound” of scaling that is possible with ThreadsX/Transducers.

It’s interesting that findfirst starts perform poorly at nthreads = 13. But maybe this is not surprising as at this point it starts using two NUMA nodes. Unlike other benchmarks, findfirst is all about terminating tasks as much as possible once the first “needle” is found. This requires communications between tasks which presumably hurts the performance when crossing the NUMA nodes.

Implementation

Most of reduce-based functions are implemented as thin wrappers of Transducers.jl.

Custom collections can support ThreadsX.jl API by implementing SplittablesBase.jl interface.

69 Likes

Hey @tfk, could you provide a short script that runs your benchmarks and saves the results? I could run this on an 64-core Epyc-2, if you’re interested.

Is there a reason for this to be a package and not just in base?

FWIW, not having it in Base has a lot of advantages. it can freely get new releases at any time, it is not tied to the backwards compatibility guarantees of Base, it can use other packages as dependencies that might themselves not be suitable for Base, etc etc.

8 Likes

This looks great. Any comments on the difference between this threaded mapreduce and the one in Strided.jl?

ThreadsXBenchmarks.jl

] add https://github.com/tkf/ThreadsXBenchmarks.jl JSON ArgCheck AtBackslash BangBang BenchmarkTools InteractiveUtils JSON Logging ProgressLogging  Referenceables Setfield Tables ThreadsX Transducers

You need to add the entire list of dependencies to your default environment, because the script starts a new Julia process (with that default environment) and runs the benchmarks there.
Once you’ve done that, it is simply

outdir = "dir/you/want/to/save/results/in"
ThreadsXBenchmarks.run_all(outdir)

The benchmarks hung for me at 8 threads (showing 0% CPU use), with the last thing to print being

[Info: Paramter: (datasize = 1000000, distribution = "narrow", alg = "ThreadsX.QuickSort", basesize = nothing, smallsize = nothing)

I’ve now killed it, and the stacktrace was:

^CERROR: ERROR: InterruptException:
Stacktrace:LoadError:
 [1] poptaskref(::Base.InvasiveLinkedListSynchronized{Task}) at ./task.jl:702
 [2] wait at ./task.jl:709 [inlined]
 [3] wait(::Base.GenericCondition{Base.Threads.SpinLock}) at ./condition.jl:106
 [4] wait(::Base.Process) at ./process.jl:622
 [5] success at ./process.jl:483 [inlined]
 [6] run(::Cmd; wait::Bool) at ./process.jl:440
 [7] run at ./process.jl:438 [inlined]
 [8] runscript(::String, ::Array{String,1}; env::Array{Pair{String,String},1}) at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/3cFLt/src/ThreadsXBenchmarks.jl:52
 [9] run_nthreads(::String; nthreads_range::UnitRange{Int64}) at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/3cFLt/src/ThreadsXBenchmarks.jl:82
 [10] #run_all#10 at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/3cFLt/src/ThreadsXBenchmarks.jl:112 [inlined]
 [11] run_all(::String) at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/3cFLt/src/ThreadsXBenchmarks.jl:112

I am on:

julia> versioninfo()
Julia Version 1.5.0-DEV.521
Commit b637cb786a* (2020-03-28 19:29 UTC)
Platform Info:
  OS: Linux (x86_64-generic-linux)
  CPU: Intel(R) Core(TM) i9-10980XE CPU @ 3.00GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-9.0.1 (ORCJIT, skylake)
Environment:
  JULIA_NUM_THREADS = 18

Let me know if there’s anywhere I should file an issue. Threads hanging are a known issue.

OTOH, we do want Base implementations to be faster, so I think it would make sense to migrate some of these parallel implementations into Base over time. That does also force more code to be threadsafe in the ecosystem, however, so probably best not to do it all at once.

16 Likes

Sure, when things have been tried out in the wild for a while and they feel really solid, then it makes sense.

4 Likes

@Elrod Thanks a lot for trying this out (even though there was absolutely no information for how to use this)!

I added some simple instructions in https://github.com/tkf/ThreadsXBenchmarks.jl

] dev https://github.com/tkf/ThreadsXBenchmarks.jl
; cd ~/.julia/dev/ThreadsXBenchmarks
] activate .
using ThreadsXBenchmarks
ThreadsXBenchmarks.run_all("PATH/TO/OUTPUT/DIRECTORY")

Yeah, the way I used it was to activate the root project (as in the above instructions or with julia --project). In retrospect maybe I should’ve activated it automatically.

Oops. This is embarrassing… I don’t know where to start digging, though. The stacktrace looks like from the parent process and not the sub-process using 8 threads. Was there nothing from the sub-process?

There are no complex task syncing in quicksort so this could be just my programming mistake in bound check/indexing or something causing segfaults in subtasks. This might broke the state of the task scheduler and maybe explain why you didn’t see the stacktrace of subprocess. I guess I should be doing more fuzz testing.

Just in case you want to run the benchmarks again, I created only-sort branch that only runs quicksort benchmark.

If you find something, I highly appreciate it if you let me know in the issue tracker https://github.com/tkf/ThreadsX.jl/issues or here.

@oschulz Thanks a lot for your interest! I think the steps I noted in the comment just above is enough to run the benchmarks.

@Oscar_Smith I agree with @kristoffer.carlsson that it is not ideal to put something like this in Base now. I think we need more exploration to figure out what the best API is in Julia.

For example, there is a keyword argument basesize for tweaking granularity of tasking. It’s a simple API receiving integer at the moment. But it’s possible to add more flexible/dynamic way to specify this number: https://github.com/tkf/Transducers.jl/issues/201. Experimenting something like this is very difficult when the API in Base.

Having said that, I think it’d be nice to add minimal implementation of (say) Threads.map and maybe Threads.mapreduce sometime soon. It seems this is what people try to implement immediately. But there are some foot-guns when implementing Threads.map with raw @spawn and also doing it “correctly” (i.e., not relying on return_type) is hard.

@mcabbott I think tl;dr is that, if you work on array of numbers, use Strided.jl (at least for now). Some more notes on this:

  • Strided.jl’s mapreduce supports dims. ThreadsX does not at this point.

  • It looks like Strided.jl uses @threads instead of @spawn. So I imagine using its mapreduce inside mapreduce does not load-balance well and maybe is not cache-friendly. Though this will not be a problem once @threads is re-written using @spawn. There are at least two open PRs that does this: #35003 / #32477.

  • IIUC Strided.jl’s main focus is arrays. The design of ThreadsX/Transducers is that it works with generic collections (e.g., dicts, sets, strings, etc.). Also, it should work well with iterator transformations like ThreadsX.reduce(+, (x for x in xs if x !== missing)) (though I just noticed that it doesn’t work now. It should be easy to fix).

One API in ThreadsX that is very array-focused is ThreadsX.foreach. This may be useful when you have multiple output arrays. Doing this with @strided presumably needs multiple broadcast expressions. You may be able to get some performance benefits or code clarity by fusing them.

I guess saying something more useful needs actual benchmarks.

3 Likes

I fully agree that base is not ready for new api level stuff, but if there was a way to implement the multi-threaded behavior as an average case speedup transparently, that would be great.

I imagine doing this transparently is very difficult. OK, speeding up something like sum(::Vector{Float64}) automatically using threads is pretty easy. But sum(user_defined_function, ::Vector{Float64}) or sum(::Vector{UserDefinedNumber}) needs that the user-defined things to be “thread-friendly” in the sense that, e.g., every functions touching elements do not mutate something global. This changes the API of sum as it requires more constraints in the input.

2 Likes

Sorry, there was. Here is a more complete copy/paste:

[ Info: Running: `scaling_nthreads_target.jl` with 8 thread(s)                                                                                                                                                                                                                                                                                                                       [160/1900]
[ Info: Benchmark: sum_sin
[ Info: Parameter: (datasize = 1000000,)
[ Info: Parameter: (datasize = 10000000,)
[ Info: Benchmark: foreach_symmetrize
[ Info: Parameter: (datasize = 2000,)
[ Info: Parameter: (datasize = 6000,)
[ Info: Benchmark: sort
[ Info: Parameter: (datasize = 1000000, distribution = "wide", alg = "ThreadsX.MergeSort", basesize = nothing, smallsize = nothing)
[ Info: Parameter: (datasize = 10000000, distribution = "wide", alg = "ThreadsX.MergeSort", basesize = nothing, smallsize = nothing)
[ Info: Parameter: (datasize = 1000000, distribution = "narrow", alg = "ThreadsX.MergeSort", basesize = nothing, smallsize = nothing)
[ Info: Parameter: (datasize = 10000000, distribution = "narrow", alg = "ThreadsX.MergeSort", basesize = nothing, smallsize = nothing)
[ Info: Parameter: (datasize = 1000000, distribution = "wide", alg = "ThreadsX.QuickSort", basesize = nothing, smallsize = nothing)
[ Info: Parameter: (datasize = 10000000, distribution = "wide", alg = "ThreadsX.QuickSort", basesize = nothing, smallsize = nothing)
[ Info: Parameter: (datasize = 1000000, distribution = "narrow", alg = "ThreadsX.QuickSort", basesize = nothing, smallsize = nothing)
^CERROR: ERROR: InterruptException:
Stacktrace:LoadError:
 [1] poptaskref(::Base.InvasiveLinkedListSynchronized{Task}) at ./task.jl:702
 [2] wait at ./task.jl:709 [inlined]
 [3] wait(::Base.GenericCondition{Base.Threads.SpinLock}) at ./condition.jl:106
 [4] wait(::Base.Process) at ./process.jl:622
 [5] success at ./process.jl:483 [inlined]
 [6] run(::Cmd; wait::Bool) at ./process.jl:440
 [7] run at ./process.jl:438 [inlined]
 [8] runscript(::String, ::Array{String,1}; env::Array{Pair{String,String},1}) at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/3cFLt/src/ThreadsXBenchmarks.jl:52
 [9] run_nthreads(::String; nthreads_range::UnitRange{Int64}) at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/3cFLt/src/ThreadsXBenchmarks.jl:82
 [10] #run_all#10 at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/3cFLt/src/ThreadsXBenchmarks.jl:112 [inlined]
 [11] run_all(::String) at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/3cFLt/src/ThreadsXBenchmarks.jl:112
 [12] top-level scope at REPL[3]:1
 [13] eval(::Module, ::Any) at ./boot.jl:331
 [14] eval_user_input(::Any, ::REPL.REPLBackend) at /home/chriselrod/Documents/languages/julia/usr/share/julia/stdlib/v1.5/REPL/src/REPL.jl:130
 [15] run_backend(::REPL.REPLBackend) at /home/chriselrod/.julia/packages/Revise/2K7IK/src/Revise.jl:1070
 [16] top-level scope at none:0

julia> InterruptException:
Stacktrace:
 [1] poptaskref(::Base.InvasiveLinkedListSynchronized{Task}) at ./task.jl:702
 [2] wait at ./task.jl:709 [inlined]
 [3] wait(::Base.GenericCondition{Base.Threads.SpinLock}) at ./condition.jl:106
 [4] _wait(::Task) at ./task.jl:238
 [5] sync_end(::Array{Any,1}) at ./task.jl:294
 [6] macro expansion at ./task.jl:335 [inlined]
 [7] maptasks(::ThreadsX.Implementations.var"#97#98"{Float64,Base.Order.ForwardOrdering}, ::Base.Iterators.Zip{Tuple{Base.Iterators.PartitionIterator{SubArray{Float64,1,Array{Float64,1},Tuple{UnitRange{Int64}},true}},Base.Iterators.PartitionIterator{Array{Int8,1}}}}) at /home/chriselrod/.julia/packages/ThreadsX/OsJPr/src/utils.jl:49
 [8] _quicksort!(::Array{Float64,1}, ::SubArray{Float64,1,Array{Float64,1},Tuple{UnitRange{Int64}},true}, ::ThreadsX.Implementations.ParallelQuickSortAlg{Base.Sort.QuickSortAlg,Int64,Int64}, ::Base.Order.ForwardOrdering, ::Array{Int8,1}, ::Bool, ::Bool) at /home/chriselrod/.julia/packages/ThreadsX/OsJPr/src/quicksort.jl:74
 [9] sort!(::Array{Float64,1}, ::Int64, ::Int64, ::ThreadsX.Implementations.ParallelQuickSortAlg{Base.Sort.QuickSortAlg,Nothing,Int64}, ::Base.Order.ForwardOrdering) at /home/chriselrod/.julia/packages/ThreadsX/OsJPr/src/quicksort.jl:22
 [10] _sort! at /home/chriselrod/.julia/packages/ThreadsX/OsJPr/src/mergesort.jl:130 [inlined]
 [11] #sort!#86 at /home/chriselrod/.julia/packages/ThreadsX/OsJPr/src/mergesort.jl:170 [inlined]
 [12] #3 at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/3cFLt/src/scripts/scaling_nthreads_target.jl:21 [inlined]
 [13] ##core#345(::NamedTuple{(:datasize, :dist, :alg, :basesize, :smallsize),Tuple{Int64,NamedTuple{(:label, :value),Tuple{String,StepRangeLen{Float64,Base.TwicePrecision{Float64},Base.TwicePrecision{Float64}}}},NamedTuple{(:label, :value),Tuple{String,ThreadsX.Implementations.ParallelQuickSortAlg{Base.Sort.QuickSortAlg,Nothing,Int64}}},Nothing,Nothing}}, ::MersenneTwister, ::Arr
ay{Float64,1}, ::var"#3#14") at /home/chriselrod/.julia/packages/BenchmarkTools/eCEpo/src/execution.jl:371
 [14] ##sample#346(::BenchmarkTools.Parameters) at /home/chriselrod/.julia/packages/BenchmarkTools/eCEpo/src/execution.jl:379
 [15] sample at /home/chriselrod/.julia/packages/BenchmarkTools/eCEpo/src/execution.jl:394 [inlined]
 [16] _lineartrial(::BenchmarkTools.Benchmark{Symbol("##benchmark#344")}, ::BenchmarkTools.Parameters; maxevals::Int64, kwargs::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}) at /home/chriselrod/.julia/packages/BenchmarkTools/eCEpo/src/execution.jl:133
 [17] _lineartrial(::BenchmarkTools.Benchmark{Symbol("##benchmark#344")}, ::BenchmarkTools.Parameters) at /home/chriselrod/.julia/packages/BenchmarkTools/eCEpo/src/execution.jl:125
 [18] #invokelatest#1 at ./essentials.jl:710 [inlined]
 [19] invokelatest at ./essentials.jl:709 [inlined]
 [20] #lineartrial#38 at /home/chriselrod/.julia/packages/BenchmarkTools/eCEpo/src/execution.jl:33 [inlined]
 [21] lineartrial at /home/chriselrod/.julia/packages/BenchmarkTools/eCEpo/src/execution.jl:33 [inlined]
 [22] tune!(::BenchmarkTools.Benchmark{Symbol("##benchmark#344")}, ::BenchmarkTools.Parameters; progressid::Nothing, nleaves::Float64, ndone::Float64, verbose::Bool, pad::String, kwargs::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}) at /home/chriselrod/.julia/packages/BenchmarkTools/eCEpo/src/execution.jl:209
 [23] macro expansion at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/3cFLt/src/scripts/scaling_nthreads_target.jl:116 [inlined]
 [24] macro expansion at /home/chriselrod/.julia/packages/ProgressLogging/g8xnW/src/ProgressLogging.jl:470 [inlined]
 [25] macro expansion at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/3cFLt/src/scripts/scaling_nthreads_target.jl:104 [inlined]
 [26] macro expansion at /home/chriselrod/.julia/packages/ProgressLogging/g8xnW/src/ProgressLogging.jl:470 [inlined]
 [27] main(::Array{String,1}; benchmark_definitions::Dict{Symbol,NamedTuple{(:prepare, :run, :paramasdata, :paramaxes),T} where T<:Tuple}, scriptname::String, tags::Array{String,1}) at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/3cFLt/src/scripts/scaling_nthreads_target.jl:100
 [28] main at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/3cFLt/src/scripts/scaling_nthreads_target.jl:96 [inlined] (repeats 2 times)
 [29] top-level scope at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/3cFLt/src/scripts/scaling_nthreads_target.jl:144
 [30] include(::String) at ./client.jl:442
 [31] top-level scope at none:10
in expression starting at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/3cFLt/src/scripts/scaling_nthreads_target.jl:143

And now from the new branch, it seemingly hung at the same place (showing 0% CPU use). After killing it, the stack trace:

[ Info: Running: `scaling_nthreads_target.jl` with 8 thread(s)
[ Info: Benchmark: sort
[ Info: Parameter: (datasize = 1000000, distribution = "wide", alg = "ThreadsX.QuickSort", basesize = nothing, smallsize = nothing)
[ Info: Parameter: (datasize = 10000000, distribution = "wide", alg = "ThreadsX.QuickSort", basesize = nothing, smallsize = nothing)
[ Info: Parameter: (datasize = 1000000, distribution = "narrow", alg = "ThreadsX.QuickSort", basesize = nothing, smallsize = nothing)
^Cfatal: error thrown and no exception handler available.
InterruptException()
ERROR: jl_mutex_unlock at /home/chriselrod/Documents/languages/julia/src/locks.h:143 [inlined]
jl_task_get_next at /home/chriselrod/Documents/languages/julia/src/partr.c:441
poptaskref at ./task.jl:702
wait at ./task.jl:709 [inlined]
task_done_hook at ./task.jl:444
jl_apply at /home/chriselrod/Documents/languages/julia/src/julia.h:1685 [inlined]
jl_finish_task at /home/chriselrod/Documents/languages/julia/src/task.c:198
start_task at /home/chriselrod/Documents/languages/julia/src/task.c:697
unknown function (ip: (nil))
InterruptException:
Stacktrace:
 [1] poptaskref(::Base.InvasiveLinkedListSynchronized{Task}) at ./task.jl:702
 [2] wait at ./task.jl:709 [inlined]
 [3] wait(::Base.GenericCondition{Base.Threads.SpinLock}) at ./condition.jl:106
 [4] wait(::Base.Process) at ./process.jl:622
 [5] success at ./process.jl:483 [inlined]
 [6] run(::Cmd; wait::Bool) at ./process.jl:440
 [7] run at ./process.jl:438 [inlined]
 [8] runscript(::String, ::Array{String,1}; env::Array{Pair{String,String},1}) at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/sqtbN/src/ThreadsXBenchmarks.jl:52
 [9] run_nthreads(::String; nthreads_range::UnitRange{Int64}) at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/sqtbN/src/ThreadsXBenchmarks.jl:82
 [10] #run_all#10 at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/sqtbN/src/ThreadsXBenchmarks.jl:112 [inlined]
 [11] run_all(::String) at /home/chriselrod/.julia/packages/ThreadsXBenchmarks/sqtbN/src/ThreadsXBenchmarks.jl:112
 [12] top-level scope at REPL[7]:1

To be clear, I just didn’t post the correct stack trace earlier.
And the stacktraces only appeared after CTRL-C.

The first time I tried running it, the stack trace showed Julia code, but the second time it shows src/locks.h and src/partr.c. I don’t know much about debugging this sort of thing.
I suspect that the CPU use declined to 0 is notable. Often it goes to 100% (i.e., a single core at max) when it gets stuck.
But that the stack traces were different suggests it isn’t totally idle?

Thanks a lot for the stacktrace! It’s very helpful (even though I’m still puzzled).

Does this occur if you interactively run

using ThreadsX
ThreadsX.sort!(rand(0:0.01:1, 1_000_000))

or

@btime ThreadsX.sort($(rand(0:0.01:1, 1_000_000)))

or from command line:

JULIA_NUM_THREADS=8 julia -e 'using ThreadsX, BenchmarkTools; @btime ThreadsX.sort($(rand(0:0.01:1, 1_000_000)))'

?

Do you mean it doesn't use (say) 7 CPUs even when the log message says ```Running: `scaling_nthreads_target.jl` with 7 thread(s)```? (Edit: never mind, you said when it gets stuck)

I meant to say, the pattern I often saw when things got stuck with X-threads is:

  1. Running fine with X*100% CPU use
  2. Stuck, with only 100% CPU use.

but here, I instead saw

  1. Running fine with X*100% CPU use
  2. Stuck, with only 0% CPU use.

Meaning, I did see 700% CPU-use with 7 threads.
I brought this up thinking it may provide a clue about where it gets stuck. It isn’t busy-waiting.

Although, running it either from the shell or interactively hasn’t yet froze:

julia> Threads.nthreads()
8

julia> using ThreadsX, BenchmarkTools

julia> @time ThreadsX.sort!(rand(0:0.01:1, 1_000_000));
  0.627938 seconds (2.64 M allocations: 132.785 MiB, 3.11% gc time)

julia> @btime ThreadsX.sort($(rand(0:0.01:1, 1_000_000)));
  3.503 ms (21429 allocations: 17.77 MiB)

julia> @btime ThreadsX.sort($(rand(0:0.01:1, 1_000_000)));
  3.583 ms (20970 allocations: 17.74 MiB)

julia> @btime ThreadsX.sort($(rand(0:0.01:1, 1_000_000)));
  3.514 ms (20562 allocations: 17.70 MiB)

julia> @btime ThreadsX.sort($(rand(0:0.01:1, 1_000_000)));
  3.488 ms (20575 allocations: 17.70 MiB)

julia> @btime ThreadsX.sort($(rand(0:0.01:1, 1_000_000)));
  3.534 ms (20750 allocations: 17.72 MiB)

julia> @btime ThreadsX.sort($(rand(0:0.01:1, 1_000_000)));
  3.452 ms (20753 allocations: 17.71 MiB)

julia> @btime ThreadsX.sort($(rand(0:0.01:1, 1_000_000)));
  3.476 ms (20755 allocations: 17.72 MiB)

julia> @btime ThreadsX.sort($(rand(0:0.01:1, 1_000_000)));
  3.346 ms (19802 allocations: 17.65 MiB)

EDIT: and then it hangs on the 9th @btime.

julia> @btime ThreadsX.sort($(rand(0:0.01:1, 1_000_000)));
^Cfatal: error thrown and no exception handler available.
InterruptException()
jl_mutex_unlock at /home/chriselrod/Documents/languages/julia/src/locks.h:143 [inlined]
jl_task_get_next at /home/chriselrod/Documents/languages/julia/src/partr.c:441
poptaskref at ./task.jl:702
wait at ./task.jl:709 [inlined]
task_done_hook at ./task.jl:444
jl_apply at /home/chriselrod/Documents/languages/julia/src/julia.h:1685 [inlined]
jl_finish_task at /home/chriselrod/Documents/languages/julia/src/task.c:198
start_task at /home/chriselrod/Documents/languages/julia/src/task.c:697
unknown function (ip: (nil))

Seems this stack trace is related to the ctrl-C. It’s similar to the one reported here, anyway.
Again:

julia> Threads.nthreads()
8

julia> using ThreadsX, BenchmarkTools

julia> @time ThreadsX.sort!(rand(0:0.01:1, 1_000_000));
  0.745465 seconds (2.64 M allocations: 132.803 MiB, 17.90% gc time)

julia> @btime ThreadsX.sort($(rand(0:0.01:1, 1_000_000)));
  3.440 ms (20427 allocations: 17.69 MiB)

julia> @btime ThreadsX.sort($(rand(0:0.01:1, 1_000_000)));
  3.503 ms (21267 allocations: 17.75 MiB)

julia> @btime ThreadsX.sort($(rand(0:0.01:1, 1_000_000)));
^Cfatal: error thrown and no exception handler available.
InterruptException()
jl_mutex_unlock at /home/chriselrod/Documents/languages/julia/src/locks.h:143 [inlined]
jl_task_get_next at /home/chriselrod/Documents/languages/julia/src/partr.c:441
poptaskref at ./task.jl:702
wait at ./task.jl:709 [inlined]
task_done_hook at ./task.jl:444
jl_apply at /home/chriselrod/Documents/languages/julia/src/julia.h:1685 [inlined]
jl_finish_task at /home/chriselrod/Documents/languages/julia/src/task.c:198
start_task at /home/chriselrod/Documents/languages/julia/src/task.c:697
unknown function (ip: (nil))

Thanks for the clarification. I agree that CPU usage going to 0% is suspicious. I think I saw something like this when the scheduler state was disrupted by out-of-bounds write.

Do you mind continue this in the issue tracker?

@tkf really cool package. I assume it still uses simd on a per thread basis to get those speedups. Is that true and if so how do you manage the simd by sending blocks of arrays to different threads?

I don’t do anything fancy at all. You just have simd keyword argument which can be true, false or :ivdep. When it’s true or :ivdep, each single-thread base case is run with @simd for or @simd ivdep for, respectively.

For more serious automatic vectorization, I think I can add an API like ThreadsX.foreach(f, blocked(indices)) so that you can use it with LoopVectorization.jl:

ThreadsX.foreach(blocked(eachindex(A, B, C))) do indices
    @avx for i ∈ indices
        ...
    end
end

But I think at this point it’s probably more straight forward to add threading support to LoopVectorization.jl API. @Elrod What do you think?

1 Like

Thanks a lot for your interest!

Ok, I’ll see when I can get the machine reserved for exclusive use one of the next nights.