Does @simd hang Base.Threads.@spawn?

Hi all,
I have an expensive function, f(x), that is embarrassingly parallel: for example, I have to run f(x) 300 times on different x, and each call can take 20 seconds.

I wrote a function to parallelize the above, and it worked great.

Then I discovered @simd, and this changed the evaluation time from 20 seconds for f(x) to 2 seconds for f_simd(x). Great!

I excitedly swapped f_simd(x) in for f(x) in my parallelized function. That’s all I did, I swear! But now all I see is the cores starting up; nothing ever finishes, and the parallelized function just appears to hang indefinitely.

Is there some fundamental incompatibility between @simd and Base.Threads.@spawn, or have I missed something obvious?

Thanks so much for your thoughts!

These two macros have nothing to do with each other. It’s very hard, if not impossible, to help without a concrete example to inspect.
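
For what it’s worth, here is a minimal sketch (not the code in question) showing the two macros composing fine when every task works on its own data:

using Base.Threads

# Hypothetical example: an @simd reduction over a vector owned by a single task
function simd_sum(x)
	s = 0.0
	@simd for i in eachindex(x)
		@inbounds s += x[i]
	end
	return s
end

xs = [rand(10_000) for _ in 1:8]                 # one independent vector per task
tasks = [Threads.@spawn simd_sum(x) for x in xs]
results = fetch.(tasks)                          # returns promptly, no hang

If something like this also hangs for you, the problem is elsewhere; if not, the spawned function is probably sharing mutable state between tasks.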

Maybe `@threads` with `@simd` performance regression on `0.7.0-alpha` - #2 by raminammour can give some clues in which direction to look?

Hi! Thanks so much for the replies! I created a reproducible example (although it requires loading some code from a Gist).

The problem occurs at the end. The difficulty running parallel_with_plain_v5 a second time seems similar to the problem of getting parallel_with_simd_v7 to run at all (?).


"""
# start Julia with multiple cores using:
JULIA_NUM_THREADS=10 julia --startup-file=yes
"""


versioninfo()
"""
My setup is:
Julia Version 1.7.3
Commit 742b9abb4d (2022-05-06 12:58 UTC)
Platform Info:
  OS: macOS (x86_64-apple-darwin21.4.0)
  CPU: Intel(R) Xeon(R) CPU E5-2697 v2 @ 2.70GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-12.0.1 (ORCJIT, ivybridge)
"""

using LinearAlgebra         # for "I" in: Matrix{Float64}(I, 2, 2)
                            # https://www.reddit.com/r/Julia/comments/9cfosj/identity_matrix_in_julia_v10/
using Sundials              # for CVODE_BDF
using Statistics            # for mean(), max()
using DataFrames            # for e.g. DataFrame()
using Dates                 # for e.g. DateTime, Dates.now()
using DifferentialEquations # for ODEProblem
using BenchmarkTools        # for @benchmark



# Check that you have multiple threads
numthreads = Base.Threads.nthreads()

# Load the model structure/rates (all precalculated for speed; 1.8 MB)
#include("/GitHub/BioGeoJulia.jl/test/model_p_object.jl")
url = "https://gist.githubusercontent.com/nmatzke/ed99ab8f5047794eb25e1fdbd5c43b37/raw/b3e6ddff784bd3521d089642092ba1e3830699c0/model_p_object.jl"
download(url,  "model_p_object.jl")
include("model_p_object.jl")

# Load the ODE functions
url = "https://gist.githubusercontent.com/nmatzke/f116258c78bd43ab7a448f07c4290516/raw/24a210261fd2e090b8ed27bc64a59a1ff9ec62cd/simd_vs_spawn_setup_v2.jl"
download(url,  "simd_vs_spawn_setup_v2.jl")
include("simd_vs_spawn_setup_v2.jl")

#include("/GitHub/BioGeoJulia.jl/test/simd_vs_spawn_setup_v2.jl")


p_Es_v5 = load_ps_127();



# Set up output object
numstates = 127
number_of_solves = 10
solve_results1 = collect(repeat([collect(repeat([0.0], numstates))], number_of_solves));
solve_results2 = collect(repeat([collect(repeat([0.0], numstates))], number_of_solves));
length(solve_results1)
length(solve_results1[1])
sum(sum.(solve_results1))


# Precalculate the Es for use in the Ds
Es_tspan = (0.0, 60.0)
prob_Es_v7 = DifferentialEquations.ODEProblem(Es_v7_simd_sums, p_Es_v5.uE, Es_tspan, p_Es_v5);
sol_Es_v7 = solve(prob_Es_v7, CVODE_BDF(linear_solver=:GMRES), save_everystep=true, 
abstol=1e-12, reltol=1e-9);

p_Ds_v7 = (n=p_Es_v5.n, params=p_Es_v5.params, p_indices=p_Es_v5.p_indices, p_TFs=p_Es_v5.p_TFs, uE=p_Es_v5.uE, terms=p_Es_v5.terms, sol_Es_v5=sol_Es_v7);


# Set up ODE inputs
u = collect(repeat([0.0], numstates));
u[2] = 1.0
du = similar(u)
du .= 0.0
p = p_Ds_v7;
t = 1.0

# ODE functions to integrate (single-step; ODE solvers will run this many many times)
@time Ds_v5_tmp(du,u,p,t)
@time Ds_v5_tmp(du,u,p,t)
@time Ds_v7_simd_sums(du,u,p,t)
@time Ds_v7_simd_sums(du,u,p,t)

#@btime Ds_v5_tmp(du,u,p,t)
# 7.819 ms (15847 allocations: 1.09 MiB)

#@btime Ds_v7_simd_sums(du,u,p,t)
# 155.858 μs (3075 allocations: 68.66 KiB)



include("/GitHub/BioGeoJulia.jl/test/simd_vs_spawn_setup_v2.jl")

tspan = (0.0, 1.0)
prob_Ds_v7 = DifferentialEquations.ODEProblem(Ds_v7_simd_sums, p_Ds_v7.uE, tspan, p_Ds_v7);

sol_Ds_v7 = solve(prob_Ds_v7, CVODE_BDF(linear_solver=:GMRES), save_everystep=false, abstol=1e-12, reltol=1e-9);

# This is the core operation; plain version (no @simd)
function core_op_plain(u, tspan, p_Ds_v7)
	prob_Ds_v5 = DifferentialEquations.ODEProblem(Ds_v5_tmp, u, tspan, p_Ds_v7);

	sol_Ds_v5 = solve(prob_Ds_v5, CVODE_BDF(linear_solver=:GMRES), save_everystep=false, abstol=1e-12, reltol=1e-9);
	return sol_Ds_v5
end


# This is the core operation; @simd version
function core_op_simd(u, tspan, p_Ds_v7)
	prob_Ds_v7 = DifferentialEquations.ODEProblem(Ds_v7_simd_sums, u, tspan, p_Ds_v7);

	sol_Ds_v7 = solve(prob_Ds_v7, CVODE_BDF(linear_solver=:GMRES), save_everystep=false, abstol=1e-12, reltol=1e-9);
	return sol_Ds_v7
end

@time core_op_plain(u, tspan, p_Ds_v7);
@time core_op_plain(u, tspan, p_Ds_v7);
@time core_op_simd(u, tspan, p_Ds_v7);
@time core_op_simd(u, tspan, p_Ds_v7);


function serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=10)
	start_time = Dates.now()
	tspan = (0.0, 1.0)
	for i in 1:number_of_solves
		u .= 0.0
		u[i] = 1.0

		sol_Ds_v7 = core_op_plain(u, tspan, p_Ds_v7)
		solve_results1[i] .= 	sol_Ds_v7.u[length(sol_Ds_v7.u)]
	#	print("\n")
	#	print(round.(sol_Ds_v7[length(sol_Ds_v7)], digits=3))
	end
	
	end_time = Dates.now()
	duration = (end_time - start_time).value / 1000.0
	sum_of_solutions = sum(sum.(solve_results1))
	return (duration, sum_of_solutions)
end


function serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=10)
	start_time = Dates.now()
	tspan = (0.0, 1.0)
	for i in 1:number_of_solves
		u .= 0.0
		u[i] = 1.0

		sol_Ds_v7 = core_op_simd(u, tspan, p_Ds_v7)
		solve_results1[i] .= 	sol_Ds_v7.u[length(sol_Ds_v7.u)]
	#	print("\n")
	#	print(round.(sol_Ds_v7[length(sol_Ds_v7)], digits=3))
	end
	
	end_time = Dates.now()
	duration = (end_time - start_time).value / 1000.0
	sum_of_solutions = sum(sum.(solve_results1))
	return (duration, sum_of_solutions)
end

# Output is (runtime, sum_of_solutions)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
# (1.136, 8.129628626179963)

serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
# (0.065, 8.129628626179963)





function parallel_with_plain_v5(tspan, p_Ds_v7, solve_results2; number_of_solves=10)
	start_time = Dates.now()
	tspan = (0.0, 1.0)
	
	# Individual ODE solutions will occur over different time periods,
	# initial values, and parameters.  We'd just like to load up the 
	# cores for the first jobs in the list, then add jobs as earlier
	# jobs finish.
	tasks = Any[]
	tasks_fetched_TF = Bool[]
	task_numbers = Any[]
	task_inc = 0
	are_we_done = false
	
	# List the tasks
	for i in 1:number_of_solves
		u .= 0.0
		u[i] = 1.0

		task_inc = task_inc + 1
		# THE ONLY DIFFERENCE IS HERE
		push!(tasks, Base.Threads.@spawn core_op_plain(u, tspan, p_Ds_v7))
		push!(tasks_fetched_TF, false) # Add a "false" to tasks_fetched_TF
		push!(task_numbers, task_inc)
	end

	iteration_number = 0
	while(are_we_done == false)
		iteration_number = iteration_number+1

		num_tasks = length(tasks)
		for j in 1:num_tasks
			if (tasks_fetched_TF[j] == false)
				if (istaskstarted(tasks[j]) == true) && (istaskdone(tasks[j]) == true)
					sol_Ds_v7 = fetch(tasks[j])
					solve_results2[task_numbers[j]] .= 	sol_Ds_v7.u[length(sol_Ds_v7.u)]
					print("\nFinished task j=")
					print(j)
					tasks_fetched_TF[j] = true
				end
			end
		end

		are_we_done = sum(tasks_fetched_TF) == length(tasks_fetched_TF)
		# Test for concluding the while loop
		are_we_done && break
	end # END while(are_we_done == false)

	end_time = Dates.now()
	duration = (end_time - start_time).value / 1000.0
	sum_of_solutions = sum(sum.(solve_results2))
	return (duration, sum_of_solutions)
end


function parallel_with_simd_v7(tspan, p_Ds_v7, solve_results2; number_of_solves=10)
	start_time = Dates.now()
	tspan = (0.0, 1.0)
	
	# Individual ODE solutions will occur over different time periods,
	# initial values, and parameters.  We'd just like to load up the 
	# cores for the first jobs in the list, then add jobs as earlier
	# jobs finish.
	tasks = Any[]
	tasks_fetched_TF = Bool[]
	task_numbers = Any[]
	task_inc = 0
	are_we_done = false
	
	# List the tasks
	for i in 1:number_of_solves
		u .= 0.0
		u[i] = 1.0

		task_inc = task_inc + 1
		# THE ONLY DIFFERENCE IS HERE
		push!(tasks, Base.Threads.@spawn core_op_simd(u, tspan, p_Ds_v7))
		push!(tasks_fetched_TF, false) # Add a "false" to tasks_fetched_TF
		push!(task_numbers, task_inc)
	end

	iteration_number = 0
	while(are_we_done == false)
		iteration_number = iteration_number+1

		num_tasks = length(tasks)
		for j in 1:num_tasks
			if (tasks_fetched_TF[j] == false)
				if (istaskstarted(tasks[j]) == true) && (istaskdone(tasks[j]) == true)
					sol_Ds_v7 = fetch(tasks[j])
					solve_results2[task_numbers[j]] .= 	sol_Ds_v7.u[length(sol_Ds_v7.u)]
					print("\nFinished task j=")
					print(j)
					tasks_fetched_TF[j] = true
				end
			end
		end

		are_we_done = sum(tasks_fetched_TF) == length(tasks_fetched_TF)
		# Test for concluding the while loop
		are_we_done && break
	end # END while(are_we_done == false)

	end_time = Dates.now()
	duration = (end_time - start_time).value / 1000.0
	sum_of_solutions = sum(sum.(solve_results2))
	return (duration, sum_of_solutions)
end


parallel_with_plain_v5(tspan, p_Ds_v7, solve_results2; number_of_solves=number_of_solves)
# Faster!
# (runtime, sum_of_solutions)
# (0.193, 8.129628626179963)


parallel_with_plain_v5(tspan, p_Ds_v7, solve_results2; number_of_solves=number_of_solves)
# Hangs sometimes, on the 2nd go!

parallel_with_simd_v7(tspan, p_Ds_v7, solve_results2; number_of_solves=number_of_solves)
# This just HANGS





You’re using the exact same u object for multiple different calls and tasks, and on top of that it’s a global variable. This is neither thread-safe nor good for performance, and it will mess with the solvers when they want to write results there.
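
To illustrate (a minimal sketch using the names from the code above, not the actual fix): give every spawned task its own freshly allocated u instead of reusing one shared global vector:

# Sketch only: each task gets a task-local initial-condition vector, so no
# solver writes into memory that another task is reading.
tasks = Task[]
for i in 1:number_of_solves
	u_i = zeros(numstates)   # fresh vector, owned by this task alone
	u_i[i] = 1.0
	push!(tasks, Base.Threads.@spawn core_op_plain(u_i, tspan, p_Ds_v7))
end
sols = fetch.(tasks)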

Ack - OK, thanks. This is a product of simplifying the code base to produce a workable example; I will change that and see if the problem persists.

OK, thanks very very much for your help! Here is another try at a reproducible example. The parallelised functions are updated so that each initial condition vector u is instead a row of solve_results1 or solve_results2.

The short version is this puzzle: for the serial functions, the @simd version is much faster than the “plain” version. But for the parallel versions, the @simd version is much slower, and on top of that the answer, sum_of_solutions, is variable and wrong.

I have this set up so that Julia is started with JULIA_NUM_THREADS=auto julia, which in my case gives 8 threads. I then make sure I never have more than 8 jobs spawned at once.

For just this example code I could use the serial @simd version, since it’s the fastest, but I have to scale this up from 127 states/ODEs to more like 1000-2000, and then run it many times for different initial conditions, etc.

The puzzle:

# Output is (runtime, sum_of_solutions)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
# (duration, sum_of_solutions)
# (1.1, 8.731365050398926)
# (0.878, 8.731365050398926)
# (0.898, 8.731365050398926)

serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
# (duration, sum_of_solutions)
# (0.046, 8.731365050398928)
# (0.042, 8.731365050398928)
# (0.046, 8.731365050398928)

parallel_with_plain_v5(tspan, p_Ds_v7, solve_results2; number_of_solves=number_of_solves)
# Faster than serial plain version
# (duration, sum_of_solutions)
# (0.351, 8.731365050398926)
# (0.343, 8.731365050398926)
# (0.366, 8.731365050398926)

parallel_with_simd_v7(tspan, p_Ds_v7, solve_results2; number_of_solves=number_of_solves)
# Dramatically slower than serial simd version, plus wrong sum_of_solutions
# (duration, sum_of_solutions)
# (136.966, 9.61313614002137)
# (141.843, 9.616688089683372)

This code should reproduce the problem on any Julia running multiple cores:


versioninfo()
notes="""
My setup is:
Julia Version 1.7.3
Commit 742b9abb4d (2022-05-06 12:58 UTC)
Platform Info:
  OS: macOS (x86_64-apple-darwin21.4.0)
  CPU: Intel(R) Xeon(R) CPU E5-2697 v2 @ 2.70GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-12.0.1 (ORCJIT, ivybridge)
"""


# Startup notes
notes="""
# "If $JULIA_NUM_THREADS is set to auto, then the number of threads will be set to the number of CPU threads."
JULIA_NUM_THREADS=auto julia --startup-file=no
Threads.nthreads(): 8 # Number of CPU threads
"""


using LinearAlgebra         # for "I" in: Matrix{Float64}(I, 2, 2)
                            # https://www.reddit.com/r/Julia/comments/9cfosj/identity_matrix_in_julia_v10/
using Sundials              # for CVODE_BDF
using Statistics            # for mean(), max()
using DataFrames            # for e.g. DataFrame()
using Dates                 # for e.g. DateTime, Dates.now()
using DifferentialEquations # for ODEProblem
using BenchmarkTools        # for @benchmark
using Distributed           # for workers


# Check that you have multiple threads
numthreads = Base.Threads.nthreads()

# Download & include the pre-saved model structure/rates (all precalculated for speed; 1.8 MB)
#include("/GitHub/BioGeoJulia.jl/test/model_p_object.jl")
url = "https://gist.githubusercontent.com/nmatzke/ed99ab8f5047794eb25e1fdbd5c43b37/raw/b3e6ddff784bd3521d089642092ba1e3830699c0/model_p_object.jl"
download(url,  "model_p_object.jl")
include("model_p_object.jl")

# Load the ODE functions
url = "https://gist.githubusercontent.com/nmatzke/f116258c78bd43ab7a448f07c4290516/raw/24a210261fd2e090b8ed27bc64a59a1ff9ec62cd/simd_vs_spawn_setup_v2.jl"
download(url,  "simd_vs_spawn_setup_v2.jl")
include("simd_vs_spawn_setup_v2.jl")

#include("/GitHub/BioGeoJulia.jl/test/simd_vs_spawn_setup_v2.jl")
#include("/GitHub/BioGeoJulia.jl/test/simd_vs_spawn_setup_v3.jl")

# Load the pre-saved model structure/rates (all precalculated for speed; 1.8 MB)
p_Es_v5 = load_ps_127();



# Set up output object
numstates = 127
number_of_solves = 10

solve_results1 = Array{Float64, 2}(undef, number_of_solves, numstates)
solve_results1 .= 0.0
solve_results2 = Array{Float64, 2}(undef, number_of_solves, numstates)
solve_results2 .= 0.0
length(solve_results1)
size(solve_results1)
sum(sum.(solve_results1))


# Precalculate the Es for use in the Ds
Es_tspan = (0.0, 60.0)
prob_Es_v7 = DifferentialEquations.ODEProblem(Es_v7_simd_sums, p_Es_v5.uE, Es_tspan, p_Es_v5);
sol_Es_v7 = solve(prob_Es_v7, CVODE_BDF(linear_solver=:GMRES), save_everystep=true, 
abstol=1e-12, reltol=1e-9);

p_Ds_v7 = (n=p_Es_v5.n, params=p_Es_v5.params, p_indices=p_Es_v5.p_indices, p_TFs=p_Es_v5.p_TFs, uE=p_Es_v5.uE, terms=p_Es_v5.terms, sol_Es_v5=sol_Es_v7);


# Set up ODE inputs
u = collect(repeat([0.0], numstates));
u[2] = 1.0
du = similar(u)
du .= 0.0
p = p_Ds_v7;
t = 1.0

# ODE functions to integrate (single-step; ODE solvers will run this many many times)
@time Ds_v5_tmp(du,u,p,t)
@time Ds_v5_tmp(du,u,p,t)
@time Ds_v7_simd_sums(du,u,p,t)
@time Ds_v7_simd_sums(du,u,p,t)

#@btime Ds_v5_tmp(du,u,p,t)
# 7.819 ms (15847 allocations: 1.09 MiB)

#@btime Ds_v7_simd_sums(du,u,p,t)
# 155.858 μs (3075 allocations: 68.66 KiB)



tspan = (0.0, 1.0)
prob_Ds_v7 = DifferentialEquations.ODEProblem(Ds_v7_simd_sums, p_Ds_v7.uE, tspan, p_Ds_v7);

sol_Ds_v7 = solve(prob_Ds_v7, CVODE_BDF(linear_solver=:GMRES), save_everystep=false, abstol=1e-12, reltol=1e-9);

# This is the core operation; plain version (no @simd)
function core_op_plain(u, tspan, p_Ds_v7)
	prob_Ds_v5 = DifferentialEquations.ODEProblem(Ds_v5_tmp, u.+0.0, tspan, p_Ds_v7);

	sol_Ds_v5 = solve(prob_Ds_v5, CVODE_BDF(linear_solver=:GMRES), save_everystep=false, abstol=1e-12, reltol=1e-9);
	return sol_Ds_v5
end


# This is the core operation; @simd version
function core_op_simd(u, tspan, p_Ds_v7)
	prob_Ds_v7 = DifferentialEquations.ODEProblem(Ds_v7_simd_sums, u.+0.0, tspan, p_Ds_v7);

	sol_Ds_v7 = solve(prob_Ds_v7, CVODE_BDF(linear_solver=:GMRES), save_everystep=false, abstol=1e-12, reltol=1e-9);
	return sol_Ds_v7
end

@time core_op_plain(u, tspan, p_Ds_v7);
@time core_op_plain(u, tspan, p_Ds_v7);
@time core_op_simd(u, tspan, p_Ds_v7);
@time core_op_simd(u, tspan, p_Ds_v7);


function serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=10)
	start_time = Dates.now()
	for i in 1:number_of_solves
		# Temporary u
		solve_results1[i,:] .= 0.0
		
		# Change the ith state from 0.0 to 1.0
		solve_results1[i,i] = 1.0
		solve_results1

		sol_Ds_v7 = core_op_plain(solve_results1[i,:], tspan, p_Ds_v7)
		solve_results1[i,:] .= 	sol_Ds_v7.u[length(sol_Ds_v7.u)]
	#	print("\n")
	#	print(round.(sol_Ds_v7[length(sol_Ds_v7)], digits=3))
	end
	
	end_time = Dates.now()
	duration = (end_time - start_time).value / 1000.0
	sum_of_solutions = sum(sum.(solve_results1))
	return (duration, sum_of_solutions)
end


function serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=10)
	start_time = Dates.now()
	for i in 1:number_of_solves
		# Temporary u
		solve_results1[i,:] .= 0.0
		
		# Change the ith state from 0.0 to 1.0
		solve_results1[i,i] = 1.0
		solve_results1

		sol_Ds_v7 = core_op_simd(solve_results1[i,:], tspan, p_Ds_v7)
		solve_results1[i,:] .= 	sol_Ds_v7.u[length(sol_Ds_v7.u)]
	#	print("\n")
	#	print(round.(sol_Ds_v7[length(sol_Ds_v7)], digits=3))
	end
	
	end_time = Dates.now()
	duration = (end_time - start_time).value / 1000.0
	sum_of_solutions = sum(sum.(solve_results1))
	return (duration, sum_of_solutions)
end

# Output is (runtime, sum_of_solutions)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
# (duration, sum_of_solutions)
# (1.1, 8.731365050398926)
# (0.878, 8.731365050398926)
# (0.898, 8.731365050398926)

serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
# (duration, sum_of_solutions)
# (0.046, 8.731365050398928)
# (0.042, 8.731365050398928)
# (0.046, 8.731365050398928)

using Distributed

function parallel_with_plain_v5(tspan, p_Ds_v7, solve_results2; number_of_solves=10)
	start_time = Dates.now()
	number_of_threads = Base.Threads.nthreads()
	curr_numthreads = Base.Threads.nthreads()
		
	# Individual ODE solutions will occur over different time periods,
	# initial values, and parameters.  We'd just like to load up the 
	# cores for the first jobs in the list, then add jobs as earlier
	# jobs finish.
	tasks = Any[]
	tasks_started_TF = Bool[]
	tasks_fetched_TF = Bool[]
	task_numbers = Any[]
	task_inc = 0
	are_we_done = false
	current_running_tasks = Any[]
	
	# List the tasks
	for i in 1:number_of_solves
		# Temporary u
		solve_results2[i,:] .= 0.0
		
		# Change the ith state from 0.0 to 1.0
		solve_results2[i,i] = 1.0

		task_inc = task_inc + 1
		push!(tasks_started_TF, false) # Add a "false" to tasks_started_TF
		push!(tasks_fetched_TF, false) # Add a "false" to tasks_fetched_TF
		push!(task_numbers, task_inc)
	end
	
	# Total number of tasks
	num_tasks = length(tasks_fetched_TF)

	iteration_number = 0
	while(are_we_done == false)
		iteration_number = iteration_number+1
		
		# Launch tasks when thread (core) is available
		for j in 1:num_tasks
			if (tasks_fetched_TF[j] == false)
				if (tasks_started_TF[j] == false) && (curr_numthreads > 0)
					# Start a task
					push!(tasks, Base.Threads.@spawn core_op_plain(solve_results2[j,:], tspan, p_Ds_v7));
					curr_numthreads = curr_numthreads-1;
					tasks_started_TF[j] = true;
					push!(current_running_tasks, task_numbers[j])
				end
			end
		end
		
		# Check for finished tasks
		tasks_to_check_TF = ((tasks_started_TF.==true) .+ (tasks_fetched_TF.==false)).==2
		if sum(tasks_to_check_TF .== true) > 0
			for k in 1:sum(tasks_to_check_TF)
				if (tasks_fetched_TF[current_running_tasks[k]] == false)
					if (istaskstarted(tasks[k]) == true) && (istaskdone(tasks[k]) == true)
						sol_Ds_v7 = fetch(tasks[k]);
						solve_results2[current_running_tasks[k],:] .= sol_Ds_v7.u[length(sol_Ds_v7.u)].+0.0
						tasks_fetched_TF[current_running_tasks[k]] = true
						current_tasknum = current_running_tasks[k]
						deleteat!(tasks, k)
						deleteat!(current_running_tasks, k)
						curr_numthreads = curr_numthreads+1;
						print("\nFinished task #")
						print(current_tasknum)
						print(", current task k=")
						print(k)
						break # break out of this loop, since you have modified current_running_tasks
					end
				end
			end
		end

		are_we_done = sum(tasks_fetched_TF) == length(tasks_fetched_TF)
		# Test for concluding the while loop
		are_we_done && break
	end # END while(are_we_done == false)

	end_time = Dates.now()
	duration = (end_time - start_time).value / 1000.0
	sum_of_solutions = sum(sum.(solve_results2))
	print("\n")
	return (duration, sum_of_solutions)
end


function parallel_with_simd_v7(tspan, p_Ds_v7, solve_results2; number_of_solves=10)
	start_time = Dates.now()
	number_of_threads = Base.Threads.nthreads()
	curr_numthreads = Base.Threads.nthreads()
		
	# Individual ODE solutions will occur over different time periods,
	# initial values, and parameters.  We'd just like to load up the 
	# cores for the first jobs in the list, then add jobs as earlier
	# jobs finish.
	tasks = Any[]
	tasks_started_TF = Bool[]
	tasks_fetched_TF = Bool[]
	task_numbers = Any[]
	task_inc = 0
	are_we_done = false
	current_running_tasks = Any[]
	
	# List the tasks
	for i in 1:number_of_solves
		# Temporary u
		solve_results2[i,:] .= 0.0
		
		# Change the ith state from 0.0 to 1.0
		solve_results2[i,i] = 1.0

		task_inc = task_inc + 1
		push!(tasks_started_TF, false) # Add a "false" to tasks_started_TF
		push!(tasks_fetched_TF, false) # Add a "false" to tasks_fetched_TF
		push!(task_numbers, task_inc)
	end
	
	# Total number of tasks
	num_tasks = length(tasks_fetched_TF)

	iteration_number = 0
	while(are_we_done == false)
		iteration_number = iteration_number+1
		
		# Launch tasks when thread (core) is available
		for j in 1:num_tasks
			if (tasks_fetched_TF[j] == false)
				if (tasks_started_TF[j] == false) && (curr_numthreads > 0)
					# Start a task
					push!(tasks, Base.Threads.@spawn core_op_simd(solve_results2[j,:], tspan, p_Ds_v7))
					curr_numthreads = curr_numthreads-1;
					tasks_started_TF[j] = true;
					push!(current_running_tasks, task_numbers[j])
				end
			end
		end
		
		# Check for finished tasks
		tasks_to_check_TF = ((tasks_started_TF.==true) .+ (tasks_fetched_TF.==false)).==2
		if sum(tasks_to_check_TF .== true) > 0
			for k in 1:sum(tasks_to_check_TF)
				if (tasks_fetched_TF[current_running_tasks[k]] == false)
					if (istaskstarted(tasks[k]) == true) && (istaskdone(tasks[k]) == true)
						sol_Ds_v7 = fetch(tasks[k]);
						solve_results2[current_running_tasks[k],:] .= sol_Ds_v7.u[length(sol_Ds_v7.u)].+0.0
						tasks_fetched_TF[current_running_tasks[k]] = true
						current_tasknum = current_running_tasks[k]
						deleteat!(tasks, k)
						deleteat!(current_running_tasks, k)
						curr_numthreads = curr_numthreads+1;
						print("\nFinished task #")
						print(current_tasknum)
						print(", current task k=")
						print(k)
						break # break out of this loop, since you have modified current_running_tasks
					end
				end
			end
		end

		are_we_done = sum(tasks_fetched_TF) == length(tasks_fetched_TF)
		# Test for concluding the while loop
		are_we_done && break
	end # END while(are_we_done == false)

	end_time = Dates.now()
	duration = (end_time - start_time).value / 1000.0
	sum_of_solutions = sum(sum.(solve_results2))
	print("\n")
	return (duration, sum_of_solutions)
end

tspan = (0.0, 1.0)
parallel_with_plain_v5(tspan, p_Ds_v7, solve_results2; number_of_solves=number_of_solves)
# Faster than serial plain version
# (duration, sum_of_solutions)
# (0.351, 8.731365050398926)
# (0.343, 8.731365050398926)
# (0.366, 8.731365050398926)

parallel_with_simd_v7(tspan, p_Ds_v7, solve_results2; number_of_solves=number_of_solves)
# Dramatically slower than serial simd version
# (duration, sum_of_solutions)
# (136.966, 9.61313614002137)
# (141.843, 9.616688089683372)



Thanks again, Nick

Is it only with CVODE?

Hi Chris!

I just re-ran the example, replacing CVODE_BDF(linear_solver=:GMRES) with Tsit5(). I get basically similar results (slightly slower).

Results pasted below, then a copy-pastable example that should replicate the problem.

# Output is (runtime, sum_of_solutions)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
# (duration, sum_of_solutions)
# (1.264, 8.731365050656382)
# (1.135, 8.731365050656382)
# (1.129, 8.731365050656382)

serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
# (duration, sum_of_solutions)
# (0.078, 8.731365050656382)
# (0.05, 8.731365050656382)
# (0.05, 8.731365050656382)


parallel_with_plain_v5(tspan, p_Ds_v7, solve_results2; number_of_solves=number_of_solves)
# Faster than serial plain version
# (duration, sum_of_solutions)
# (2.214, 8.731365050211458)
# (2.194, 8.731365050211458)
# 

parallel_with_simd_v7(tspan, p_Ds_v7, solve_results2; number_of_solves=number_of_solves)
# Dramatically slower than serial simd version
# ...and with the wrong answer!
# (duration, sum_of_solutions)
# (227.698, 9.81070102530404)

Replicable example below (should be copy-pasteable); it include()s an updated gist that uses Tsit5() instead of CVODE_BDF.

Cheers & thanks!!
Nick


versioninfo()
notes="""
My setup is:
Julia Version 1.7.3
Commit 742b9abb4d (2022-05-06 12:58 UTC)
Platform Info:
  OS: macOS (x86_64-apple-darwin21.4.0)
  CPU: Intel(R) Xeon(R) CPU E5-2697 v2 @ 2.70GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-12.0.1 (ORCJIT, ivybridge)
"""


# Startup notes
notes="""
# "If $JULIA_NUM_THREADS is set to auto, then the number of threads will be set to the number of CPU threads."
JULIA_NUM_THREADS=8 julia --startup-file=no
Threads.nthreads(): 8 # Number of CPU threads
"""
using Distributed

function Rnames(obj)
	flat2(fieldnames(typeof(obj)))
end


# And multiple cores/workers to match
t = @async Distributed.addprocs(7)
Distributed.nprocs()
# Check that you have same number of processors and threads
Distributed.nprocs()
numthreads = Base.Threads.nthreads()

using Dates                 # for e.g. DateTime, Dates.now()
using DifferentialEquations # for ODEProblem
using Sundials              # for CVODE_BDF
using SciMLBase

Distributed.workers()
# Check that you have same number of processors and threads
Distributed.nprocs()
numthreads = Base.Threads.nthreads()

@everywhere using Distributed
@everywhere using Dates                 # for e.g. DateTime, Dates.now()
@everywhere using DifferentialEquations # for ODEProblem
@everywhere using Sundials              # for CVODE_BDF
@everywhere using SciMLBase


# Download & include the pre-saved model structure/rates (all precalculated for speed; 1.8 MB)
# @everywhere include("/GitHub/BioGeoJulia.jl/test/model_p_object.jl")
url = "https://gist.githubusercontent.com/nmatzke/ed99ab8f5047794eb25e1fdbd5c43b37/raw/b3e6ddff784bd3521d089642092ba1e3830699c0/model_p_object.jl"
download(url,  "model_p_object.jl")
@everywhere include("model_p_object.jl")

# Load the ODE functions
url = "https://gist.githubusercontent.com/nmatzke/e63296ce22659e1ed222c7cd21d029bf/raw/a8b0e1b22d105f8ea4c632749f3fd89d5bcb7fc6/simd_vs_spawn_setup_v4_Tsit5.jl"
download(url,  "simd_vs_spawn_setup_v4_Tsit5.jl")
@everywhere include("simd_vs_spawn_setup_v4_Tsit5.jl")

#include("/GitHub/BioGeoJulia.jl/test/simd_vs_spawn_setup_v2.jl")
#@everywhere include("/GitHub/BioGeoJulia.jl/test/simd_vs_spawn_setup_v4_Tsit5.jl")

# Load the pre-saved model structure/rates (all precalculated for speed; 1.8 MB)
@everywhere p_Es_v5 = load_ps_127();



# Set up output object
numstates = 127
number_of_solves = 10

solve_results1 = Array{Float64, 2}(undef, number_of_solves, numstates)
solve_results1 .= 0.0
solve_results2 = Array{Float64, 2}(undef, number_of_solves, numstates)
solve_results2 .= 0.0
length(solve_results1)
size(solve_results1)
sum(sum.(solve_results1))


# Precalculate the Es for use in the Ds
Es_tspan = (0.0, 60.0)
prob_Es_v7 = DifferentialEquations.ODEProblem(Es_v7_simd_sums, p_Es_v5.uE, Es_tspan, p_Es_v5);
sol_Es_v7 = solve(prob_Es_v7, Tsit5(), save_everystep=true, 
abstol=1e-12, reltol=1e-9);

p_Ds_v7 = (n=p_Es_v5.n, params=p_Es_v5.params, p_indices=p_Es_v5.p_indices, p_TFs=p_Es_v5.p_TFs, uE=p_Es_v5.uE, terms=p_Es_v5.terms, sol_Es_v5=sol_Es_v7);


# Set up ODE inputs
u = collect(repeat([0.0], numstates));
u[2] = 1.0
du = similar(u);
du .= 0.0;
p = p_Ds_v7;
t = 1.0;

# ODE functions to integrate (single-step; ODE solvers will run this many many times)
@time Ds_v5_tmp(du,u,p,t)
@time Ds_v5_tmp(du,u,p,t)
@time Ds_v7_simd_sums(du,u,p,t)
@time Ds_v7_simd_sums(du,u,p,t)

#@btime Ds_v5_tmp(du,u,p,t)
# 7.819 ms (15847 allocations: 1.09 MiB)

#@btime Ds_v7_simd_sums(du,u,p,t)
# 155.858 μs (3075 allocations: 68.66 KiB)



tspan = (0.0, 1.0)
prob_Ds_v7 = DifferentialEquations.ODEProblem(Ds_v7_simd_sums, p_Ds_v7.uE, tspan, p_Ds_v7);

sol_Ds_v7 = solve(prob_Ds_v7, Tsit5(), save_everystep=false, abstol=1e-12, reltol=1e-9);


@time core_op_plain(u, tspan, p_Ds_v7);
@time core_op_plain(u, tspan, p_Ds_v7);
@time core_op_simd(u, tspan, p_Ds_v7);
@time core_op_simd(u, tspan, p_Ds_v7);
# 6.541634 seconds (8.75 M allocations: 749.374 MiB, 3.13% gc time, 98.47% compilation time)
# 0.099980 seconds (451.86 k allocations: 27.272 MiB)
# 0.050578 seconds (199.93 k allocations: 8.041 MiB, 89.84% compilation time)
# 0.005377 seconds (85.34 k allocations: 1.949 MiB)

# Output is (runtime, sum_of_solutions)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
# (duration, sum_of_solutions)
# (1.264, 8.731365050656382)
# (1.135, 8.731365050656382)
# (1.129, 8.731365050656382)

serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
# (duration, sum_of_solutions)
# (0.078, 8.731365050656382)
# (0.05, 8.731365050656382)
# (0.05, 8.731365050656382)

@everywhere include("/GitHub/BioGeoJulia.jl/test/simd_vs_spawn_setup_v4.jl")

j=2
new_worker = j
solve_results2[j,j] = 1.0
x=Distributed.remotecall(core_op_plain, new_worker, solve_results2[j,:], tspan, p_Ds_v7)
@async fetch(x)

function istaskdone2(x)
	doneTF = x.v !== nothing
	return doneTF
end



parallel_with_plain_v5(tspan, p_Ds_v7, solve_results2; number_of_solves=10)
x = Dates.now()



function parallel_with_simd_v7(tspan, p_Ds_v7, solve_results2; number_of_solves=10)
	start_time = Dates.now()
	number_of_threads = Base.Threads.nthreads()
	curr_numthreads = Base.Threads.nthreads()
		
	# Individual ODE solutions will occur over different time periods,
	# initial values, and parameters.  We'd just like to load up the 
	# cores for the first jobs in the list, then add jobs as earlier
	# jobs finish.
	tasks = Any[]
	tasks_started_TF = Bool[]
	tasks_fetched_TF = Bool[]
	task_numbers = Any[]
	task_inc = 0
	are_we_done = false
	current_running_tasks = Any[]
	
	# List the tasks
	for i in 1:number_of_solves
		# Temporary u
		solve_results2[i,:] .= 0.0
		
		# Change the ith state from 0.0 to 1.0
		solve_results2[i,i] = 1.0

		task_inc = task_inc + 1
		push!(tasks_started_TF, false) # Add a "false" to tasks_started_TF
		push!(tasks_fetched_TF, false) # Add a "false" to tasks_fetched_TF
		push!(task_numbers, task_inc)
	end
	
	# Total number of tasks
	num_tasks = length(tasks_fetched_TF)

	iteration_number = 0
	while(are_we_done == false)
		iteration_number = iteration_number+1
		
		# Launch tasks when thread (core) is available
		for j in 1:num_tasks
			if (tasks_fetched_TF[j] == false)
				if (tasks_started_TF[j] == false) && (curr_numthreads > 0)
					# Start a task
					push!(tasks, Base.Threads.@spawn core_op_simd(solve_results2[j,:], tspan, p_Ds_v7))
					curr_numthreads = curr_numthreads-1;
					tasks_started_TF[j] = true;
					push!(current_running_tasks, task_numbers[j])
				end
			end
		end
		
		# Check for finished tasks
		tasks_to_check_TF = ((tasks_started_TF.==true) .+ (tasks_fetched_TF.==false)).==2
		if sum(tasks_to_check_TF .== true) > 0
			for k in 1:sum(tasks_to_check_TF)
				if (tasks_fetched_TF[current_running_tasks[k]] == false)
					if (istaskstarted(tasks[k]) == true) && (istaskdone(tasks[k]) == true)
						sol_Ds_v7 = fetch(tasks[k]);
						solve_results2[current_running_tasks[k],:] .= sol_Ds_v7.u[length(sol_Ds_v7.u)].+0.0
						tasks_fetched_TF[current_running_tasks[k]] = true
						current_tasknum = current_running_tasks[k]
						deleteat!(tasks, k)
						deleteat!(current_running_tasks, k)
						curr_numthreads = curr_numthreads+1;
						print("\nFinished task #")
						print(current_tasknum)
						print(", current task k=")
						print(k)
						break # break out of this loop, since you have modified current_running_tasks
					end
				end
			end
		end

		are_we_done = sum(tasks_fetched_TF) == length(tasks_fetched_TF)
		# Test for concluding the while loop
		are_we_done && break
	end # END while(are_we_done == false)

	end_time = Dates.now()
	duration = (end_time - start_time).value / 1000.0
	sum_of_solutions = sum(sum.(solve_results2))
	print("\n")
	return (duration, sum_of_solutions)
end

tspan = (0.0, 1.0)

# Output is (runtime, sum_of_solutions)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_plain_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
# (duration, sum_of_solutions)
# (1.264, 8.731365050656382)
# (1.135, 8.731365050656382)
# (1.129, 8.731365050656382)

serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
# (duration, sum_of_solutions)
# (0.078, 8.731365050656382)
# (0.05, 8.731365050656382)
# (0.05, 8.731365050656382)


parallel_with_plain_v5(tspan, p_Ds_v7, solve_results2; number_of_solves=number_of_solves)
# Faster than serial plain version
# (duration, sum_of_solutions)
# (2.214, 8.731365050211458)
# (2.194, 8.731365050211458)
# 

parallel_with_simd_v7(tspan, p_Ds_v7, solve_results2; number_of_solves=number_of_solves)
# Dramatically slower than serial simd version
# ...and with the wrong answer!
# (duration, sum_of_solutions)
# (227.698, 9.81070102530404)
# 



I haven’t been able to look into it, but are you doing something that isn’t thread-safe? For example, are you modifying the parameter vector but not making a per-thread copy of it?

Holy @#*@! All I had to do was deepcopy(p_Ds_v7), the parameters vector, in each Base.Threads.@spawn call.

I had thought I wasn’t writing anything to the parameters vector, but I now remember/see that I added 4 blank floats to the parameters vector to provide workspace for the @simd calculations. They are never read back out, but apparently that was it.
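
For the record, the change is a one-liner inside the task-spawning loop (a sketch; I have not pasted the whole updated function here):

# Each task now solves with its own deep copy of the parameter object, so the
# scratch floats inside it are no longer shared across threads.
push!(tasks, Base.Threads.@spawn core_op_simd(solve_results2[j,:], tspan, deepcopy(p_Ds_v7)))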

Now it’s blazingly fast, and accurate:

tspan = (0.0, 1.0)
parallel_with_plain_v5(tspan, p_Ds_v7, solve_results2; number_of_solves=number_of_solves)
# Faster than serial plain version
# (duration, sum_of_solutions)
# (0.351, 8.731365050398926)
# (0.343, 8.731365050398926)
# (0.366, 8.731365050398926)


serial_with_simd_v7(tspan, p_Ds_v7, solve_results1; number_of_solves=number_of_solves)
# (5.819, 8.731365050398926)
# (0.069, 8.731365050398926)
# (0.071, 8.731365050398926)
# (0.07, 8.731365050398926)

parallel_with_simd_v7(tspan, p_Ds_v7, solve_results2; number_of_solves=number_of_solves)
# New version with deepcopy of parameters input to solve
# (0.023, 8.731365050398926)
# (0.016, 8.731365050398926)
# (0.017, 8.731365050398926)

# Old parallel_with_simd_v7 was dramatically slower than serial simd version
# (and inaccurate)
# (duration, sum_of_solutions)
# (136.966, 9.61313614002137)
# (141.843, 9.616688089683372)

Thank you, thank you, Chris! And thanks for your amazing Julia work!


Duplicate posted on Stack Overflow: "Solving ODEs in parallel in Julia: variable answers, hangs after several executions".

Yes, if you use a shared cache that you’re mutating, always be careful to have per-thread versions.
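
A generic sketch of that pattern (solve_one and params are placeholder names, not from this thread): anything mutable that the solver scribbles on gets copied per task.

# params carries mutable workspace, so each task works on its own deep copy.
function run_all(inputs, params)
	tasks = [Base.Threads.@spawn solve_one(x, deepcopy(params)) for x in inputs]
	return fetch.(tasks)
end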