How to use multithreading appropriately?

Functions having same output but different multithreading setting have very different performances. Why would this happen? Are there any rules to conform when using multithreading?

Below shows the code snippets of data generation process and two methods of calculating loglikelihood function. My questions focus on the latter, in which has different multithreading settings and very different performance.

When I run method 1, I get a very high CPU usage (80%+) and poorer performance. When I run method 2, I get a lower CPU usage (40%) and better performance. Is the CPU usage not proportional to the performance in terms of multithreading?

Data Generation Process

using Random, Distributions
using LogExpFunctions, FastGaussQuadrature
const seed = Random.seed!(2023 - 10 - 8)
const LT = 20
pifun(t) = logistic(t - LT / 2)
t = 1:LT
pit = pifun.(t)
pit[end] = 1

Inds = 5000
mR = 3.0
sigmaR = 1.0
mC = -1.0
sigmaC = 1.0
y1 = randn(seed, Inds)
y2 = randn(seed, Inds)

r = exp.(mR .+ sigmaR .* y1)
c = exp.(mC .+ sigmaC .* y2)

function calcEMAX(ri::T, ci::T, pit = pit) where {T <: Real}
	EV = zeros(T, LT)
	for t in (LT - 1):-1:1
		EV[t] = log1pexp(pit[t + 1] * ri - ci + (1 - pit[t + 1]) * EV[t + 1])
	return EV

function simulateY(r = r, c = c, pit = pit)
	Y = Vector{Vector{Int64}}()
	for i in 1:Inds
		t = 1
		yi = Int64[]
		EMAX = calcEMAX(r[i], c[i])
		while t <= LT
			if pit[t] > rand(seed)
				p1prob = 1 / (1 + exp(pit[t] * r[i] - c[i] + (1 - pit[t]) * EMAX[t]))
				if p1prob > rand(seed)
					push!(yi, 1)
					push!(yi, 0)
			t = t + 1
		if yi != []
			push!(Y, yi)
	return Y

Y = simulateY(r, c)

Calculate the likelihood: Method 1

function indll(mR::T, sigmaR::T, mC::T, sigmaC::T, yi, pit = pit, nodes = nodes, weights = weights) where {T <: Real}
	indll = Vector{T}(undef, length(nodes) * length(nodes))
	@sync for k1 in eachindex(nodes)
		for k2 in eachindex(nodes)
			Threads.@spawn begin
				r = exp(mR + sqrt(2) * sigmaR * nodes[k1])
				c = exp(mC + sqrt(2) * sigmaC * nodes[k2])
				EMAX = calcEMAX(r, c)
				indllnode = zero(T)
				for t in eachindex(yi)
					u0 = pit[t] * r - c + (1 - pit[t]) * EMAX[t]
					if yi[t] == 0
						indllnode += u0 - log1pexp(u0)
						indllnode += -log1pexp(u0)
				indll[(k1 - 1) * 10 + k2] = indllnode + log(weights[k1] * weights[k2] / pi)
	return logsumexp(indll)

function nll(mR::T, sigmaR::T, mC::T, sigmaC::T, Y = Y, pit = pit, nodes = nodes, weights = weights) where {T <: Real}
	ll = zero(T)
	for i in eachindex(Y)
		ll += indll(mR, sigmaR, mC, sigmaC, Y[i])
	return -ll


nodes, weights = gausshermite(10)
using BenchmarkTools
@btime nll(zeros(4)...)
#  804.050 ms (4977257 allocations: 476.75 MiB)
# 36692.967318

Calculate the likelihood: Method 2

function indll(mR::T, sigmaR::T, mC::T, sigmaC::T, yi, pit = pit, nodes = nodes, weights = weights) where {T <: Real}
	indll = Vector{T}()
	for k1 ∈ eachindex(nodes)
		for k2 ∈ eachindex(nodes)
			r = exp(mR + sqrt(2) * sigmaR * nodes[k1])
			c = exp(mC + sqrt(2) * sigmaC * nodes[k2])
			EMAX = calcEMAX(r, c)
			indllnode = zero(T)
			for t ∈ eachindex(yi)
				u0 = pit[t] * r - c + (1 - pit[t]) * EMAX[t]
				if yi[t] == 0
					indllnode += u0 - log1pexp(u0)
					indllnode += -log1pexp(u0)
			indllnode = indllnode + log(weights[k1] * weights[k2] / pi)
			push!(indll, indllnode)
	return logsumexp(indll)

function nll(mR::T, sigmaR::T, mC::T, sigmaC::T, Y = Y, pit = pit, nodes = nodes, weights = weights) where {T <: Real}
	ll = zeros(T, Threads.nthreads())
	Threads.@threads for i ∈ eachindex(Y)
		ll[Threads.threadid()] += indll(mR, sigmaR, mC, sigmaC, Y[i])
	return -sum(ll)


@btime nll(zeros(4)...)
# 42.054 ms (1595245 allocations: 132.62 MiB)
# 36692.967318

What you are seeing is likely just overhead from creating and scheduling Tasks. Assuming length(nodes)==10 this means that your nested loops run 100 times so each iteration takes ~400µs. This thread measured the overhead of threading on the order of ~20µs so a bit smaller but that was a somewhat idealized scenario with a single thread. I suppose the scheduling overhead will be somewhat larger for multiple threads.

Generally, when using Julia threads you will cause some allocations, so the workload of each thread needs to be significant. So using larger code chunks is generally better than having many small chunks. This is why your second version is much faster. Note that it is also dangerously close to being incorrect since

I am actually not sure whether you are completely safe in this case.

You could rewrite it like so:

using ChunkSplitters
function nll(mR::T, sigmaR::T, mC::T, sigmaC::T, Y = Y, pit = pit, nodes = nodes, weights = weights) where {T <: Real}
    nchunks = Threads.nthreads()
    ll = zeros(T, nchunks)
    Threads.@threads for (Yrange, chunkid) in ChunkSplitters.chunks(Y, nchunks)
        workspace = zeros(length(nodes), length(nodes) # preallocate workspace
        for i in Yrange
            ll[:,chunkid] += indll!(workspace, mR, sigmaR, mC, sigmaC, Y[i])
	return -sum(ll)

function indll!(indll, mR::T, sigmaR::T, mC::T, sigmaC::T, yi, pit = pit, nodes = nodes, weights = weights) where {T <: Real}
	for k1 ∈ eachindex(nodes)
		for k2 ∈ eachindex(nodes)
			r = exp(mR + sqrt(2) * sigmaR * nodes[k1])
			c = exp(mC + sqrt(2) * sigmaC * nodes[k2])
			EMAX = calcEMAX(r, c)
			indllnode = zero(T)
			for t ∈ eachindex(yi)
				u0 = pit[t] * r - c + (1 - pit[t]) * EMAX[t]
				if yi[t] == 0
					indllnode += u0 - log1pexp(u0)
					indllnode += -log1pexp(u0)
			indllnode = indllnode + log(weights[k1] * weights[k2] / pi)
			indll[k2,k1] = indllnode
	return logsumexp(vec(indll)) # vec reshapes any array to a vector (it's free!)

I took the liberty to also preallocate some workspace for the innerfunction, which I changed to a 2D-Array for easier indexing. It is safer than your original versionsince it does not rely on Threads.threadid() and should be faster as well.

EDIT: Fixed usage of ChunkSplitters


There is another question. Method 2 is faster as each thread handles larger chunks, and therefore there are less overhead caused by creating and scheduling tasks. However, I find method 2, which is faster, has a relatively lower CPU usage (approximately 40%), it didn’t utilize all CPU cores I set. Why would this happen?

Besides, if the workload of each task is different, Is there any solution to schedule the uneven tasks?

Didn’t look at this in-depth, however I can see that method 1 runs multiple @syncs in a loop, while method 2 has a single top-level Threads.@threads invocation, so it doesn’t seem unexpected that method 2 would be faster. Can you rewrite method 1 to have just one @sync?

The “lower” cpu usage might just be a measurement artifact. How did you come up with that number? Function invocation is still very short (~40ms). If you just looked at a Task Manager/htop then the update intervall will be too large to actually tell you the peak cpu load.

Thanks, @abraemer has given the right answer, the slowness is caused by overhead of creating and scheduling tasks.

I got that number from windows task manager.

I have tried many settings. I found CPU usage would be high if Inds is larger enough. I also found a phenomenon, if the work load of each task is over complicated, for example, including many for-loops, then it seems the multithreading is invalid – only one thread is working.