How to use multithreading appropriately?

Functions having same output but different multithreading setting have very different performances. Why would this happen? Are there any rules to conform when using multithreading?

Below shows the code snippets of data generation process and two methods of calculating loglikelihood function. My questions focus on the latter, in which has different multithreading settings and very different performance.

When I run method 1, I get a very high CPU usage (80%+) and poorer performance. When I run method 2, I get a lower CPU usage (40%) and better performance. Is the CPU usage not proportional to the performance in terms of multithreading?

Data Generation Process

using Random, Distributions
using LogExpFunctions, FastGaussQuadrature
const seed = Random.seed!(2023 - 10 - 8)
const LT = 20
pifun(t) = logistic(t - LT / 2)
t = 1:LT
pit = pifun.(t)
pit[end] = 1

Inds = 5000
mR = 3.0
sigmaR = 1.0
mC = -1.0
sigmaC = 1.0
y1 = randn(seed, Inds)
y2 = randn(seed, Inds)

r = exp.(mR .+ sigmaR .* y1)
c = exp.(mC .+ sigmaC .* y2)

function calcEMAX(ri::T, ci::T, pit = pit) where {T <: Real}
	EV = zeros(T, LT)
	for t in (LT - 1):-1:1
		EV[t] = log1pexp(pit[t + 1] * ri - ci + (1 - pit[t + 1]) * EV[t + 1])
	end
	return EV
end

function simulateY(r = r, c = c, pit = pit)
	Y = Vector{Vector{Int64}}()
	for i in 1:Inds
		t = 1
		yi = Int64[]
		EMAX = calcEMAX(r[i], c[i])
		while t <= LT
			if pit[t] > rand(seed)
				break
			else
				p1prob = 1 / (1 + exp(pit[t] * r[i] - c[i] + (1 - pit[t]) * EMAX[t]))
				if p1prob > rand(seed)
					push!(yi, 1)
					break
				else
					push!(yi, 0)
				end
			end
			t = t + 1
		end
		if yi != []
			push!(Y, yi)
		end
	end
	return Y
end

Y = simulateY(r, c)

Calculate the likelihood: Method 1

function indll(mR::T, sigmaR::T, mC::T, sigmaC::T, yi, pit = pit, nodes = nodes, weights = weights) where {T <: Real}
	indll = Vector{T}(undef, length(nodes) * length(nodes))
	@sync for k1 in eachindex(nodes)
		for k2 in eachindex(nodes)
			Threads.@spawn begin
				r = exp(mR + sqrt(2) * sigmaR * nodes[k1])
				c = exp(mC + sqrt(2) * sigmaC * nodes[k2])
				EMAX = calcEMAX(r, c)
				indllnode = zero(T)
				for t in eachindex(yi)
					u0 = pit[t] * r - c + (1 - pit[t]) * EMAX[t]
					if yi[t] == 0
						indllnode += u0 - log1pexp(u0)
					else
						indllnode += -log1pexp(u0)
					end
				end
				indll[(k1 - 1) * 10 + k2] = indllnode + log(weights[k1] * weights[k2] / pi)
			end
		end
	end
	return logsumexp(indll)
end

function nll(mR::T, sigmaR::T, mC::T, sigmaC::T, Y = Y, pit = pit, nodes = nodes, weights = weights) where {T <: Real}
	ll = zero(T)
	for i in eachindex(Y)
		ll += indll(mR, sigmaR, mC, sigmaC, Y[i])
	end
	return -ll
end

Performance

nodes, weights = gausshermite(10)
using BenchmarkTools
@btime nll(zeros(4)...)
#  804.050 ms (4977257 allocations: 476.75 MiB)
# 36692.967318

Calculate the likelihood: Method 2

function indll(mR::T, sigmaR::T, mC::T, sigmaC::T, yi, pit = pit, nodes = nodes, weights = weights) where {T <: Real}
	indll = Vector{T}()
	for k1 ∈ eachindex(nodes)
		for k2 ∈ eachindex(nodes)
			r = exp(mR + sqrt(2) * sigmaR * nodes[k1])
			c = exp(mC + sqrt(2) * sigmaC * nodes[k2])
			EMAX = calcEMAX(r, c)
			indllnode = zero(T)
			for t ∈ eachindex(yi)
				u0 = pit[t] * r - c + (1 - pit[t]) * EMAX[t]
				if yi[t] == 0
					indllnode += u0 - log1pexp(u0)
				else
					indllnode += -log1pexp(u0)
				end
			end
			indllnode = indllnode + log(weights[k1] * weights[k2] / pi)
			push!(indll, indllnode)
		end
	end
	return logsumexp(indll)
end

function nll(mR::T, sigmaR::T, mC::T, sigmaC::T, Y = Y, pit = pit, nodes = nodes, weights = weights) where {T <: Real}
	ll = zeros(T, Threads.nthreads())
	Threads.@threads for i ∈ eachindex(Y)
		ll[Threads.threadid()] += indll(mR, sigmaR, mC, sigmaC, Y[i])
	end
	return -sum(ll)
end

Performace

@btime nll(zeros(4)...)
# 42.054 ms (1595245 allocations: 132.62 MiB)
# 36692.967318

What you are seeing is likely just overhead from creating and scheduling Tasks. Assuming length(nodes)==10 this means that your nested loops run 100 times so each iteration takes ~400µs. This thread measured the overhead of threading on the order of ~20µs so a bit smaller but that was a somewhat idealized scenario with a single thread. I suppose the scheduling overhead will be somewhat larger for multiple threads.

Generally, when using Julia threads you will cause some allocations, so the workload of each thread needs to be significant. So using larger code chunks is generally better than having many small chunks. This is why your second version is much faster. Note that it is also dangerously close to being incorrect since

I am actually not sure whether you are completely safe in this case.

You could rewrite it like so:

using ChunkSplitters
function nll(mR::T, sigmaR::T, mC::T, sigmaC::T, Y = Y, pit = pit, nodes = nodes, weights = weights) where {T <: Real}
    nchunks = Threads.nthreads()
    ll = zeros(T, nchunks)
    Threads.@threads for (Yrange, chunkid) in ChunkSplitters.chunks(Y, nchunks)
        workspace = zeros(length(nodes), length(nodes) # preallocate workspace
        for i in Yrange
            ll[:,chunkid] += indll!(workspace, mR, sigmaR, mC, sigmaC, Y[i])
        end
	end
	return -sum(ll)
end

function indll!(indll, mR::T, sigmaR::T, mC::T, sigmaC::T, yi, pit = pit, nodes = nodes, weights = weights) where {T <: Real}
	for k1 ∈ eachindex(nodes)
		for k2 ∈ eachindex(nodes)
			r = exp(mR + sqrt(2) * sigmaR * nodes[k1])
			c = exp(mC + sqrt(2) * sigmaC * nodes[k2])
			EMAX = calcEMAX(r, c)
			indllnode = zero(T)
			for t ∈ eachindex(yi)
				u0 = pit[t] * r - c + (1 - pit[t]) * EMAX[t]
				if yi[t] == 0
					indllnode += u0 - log1pexp(u0)
				else
					indllnode += -log1pexp(u0)
				end
			end
			indllnode = indllnode + log(weights[k1] * weights[k2] / pi)
			indll[k2,k1] = indllnode
		end
	end
	return logsumexp(vec(indll)) # vec reshapes any array to a vector (it's free!)
end

I took the liberty to also preallocate some workspace for the innerfunction, which I changed to a 2D-Array for easier indexing. It is safer than your original versionsince it does not rely on Threads.threadid() and should be faster as well.

EDIT: Fixed usage of ChunkSplitters

3 Likes

There is another question. Method 2 is faster as each thread handles larger chunks, and therefore there are less overhead caused by creating and scheduling tasks. However, I find method 2, which is faster, has a relatively lower CPU usage (approximately 40%), it didn’t utilize all CPU cores I set. Why would this happen?

Besides, if the workload of each task is different, Is there any solution to schedule the uneven tasks?

Didn’t look at this in-depth, however I can see that method 1 runs multiple @syncs in a loop, while method 2 has a single top-level Threads.@threads invocation, so it doesn’t seem unexpected that method 2 would be faster. Can you rewrite method 1 to have just one @sync?

The “lower” cpu usage might just be a measurement artifact. How did you come up with that number? Function invocation is still very short (~40ms). If you just looked at a Task Manager/htop then the update intervall will be too large to actually tell you the peak cpu load.

Thanks, @abraemer has given the right answer, the slowness is caused by overhead of creating and scheduling tasks.

I got that number from windows task manager.

I have tried many settings. I found CPU usage would be high if Inds is larger enough. I also found a phenomenon, if the work load of each task is over complicated, for example, including many for-loops, then it seems the multithreading is invalid – only one thread is working.