Appropiate use of Threads.@threads?

I’ve written the following:

using DataFrames
using Query

plots = readtable("plots.csv")
species = readtable("plot_species.csv")

# remove taxa without a species or those that have [ or {'s in their name
species = @from i in species begin
	@where (search.(i.currentTaxonName, '[') == 0)  &&
		(search.(i.currentTaxonName, '{') == 0) 	&&
		(search.(i.currentTaxonName, ' ') != 0)
	@select i
	@collect DataFrame
end

if !isfile("occurrence_counts.csv")
	occurrence_count = by(species, :currentTaxonName, nrow)
	rename!(occurrence_count, :currentTaxonName, :taxon)
	rename!(occurrence_count, :x1. :n)
	writetable("occurrence_counts.csv", occurrence_count)
else
	occurrence_counts = readtable("occurrence_counts.csv")
	unique_names = unique(convert(Array, occurrence_counts[:taxon]))
end

if !isfile("plot_alphas.csv")
	plot_alphas = by(species, :OBSERVATION_ID, df -> DataFrame(α=nrow))
	writetable("plot_alphas.csv", plot_alphas)
else
	plot_alphas = readtable("plot_alphas.csv")
end

###
### Calculating theta values
###
const N = 25
const ITER_COUNT = 100

function select_N_rand_plots(nums)
	s = Set{Int}()
	len = length(nums)

	while length(s) < N
		push!(s, nums[rand(1:len)])
	end

	return s
end

function calculate_mean_alpha(nums)
	cum_sum = 0
	
	for id in nums
		cum_sum += plot_alphas[plot_alphas[:id] .== id, :][:α][1]
	end
	
	cum_sum / length(nums)
end

function calculate_gamma(nums)
	spps = Set{String}()

	for id in nums
		occs = species[species[:OBSERVATION_ID] .== id, :][:currentTaxonName]
		for o in occs
			push!(spps, o)
		end
	end

	return length(spps)
end

unique_species = convert(Array, unique(species[:currentTaxonName]))
thetas = Float64[]
taxa = String[]
i = 1

for current_spp in unique_species
	occs = species[species[:currentTaxonName] .== current_spp, :]
	nrow(occs) < N && continue
	push!(taxa, current_spp)
	Θ = 0

	 Threads.@threads for a=1:ITER_COUNT
		sample_plots = select_N_rand_plots(convert(Array, occs[:OBSERVATION_ID]))

		γ = calculate_gamma(sample_plots)
		ᾱ = calculate_mean_alpha(sample_plots)
		Θ += (γ - ᾱ)
	end

	Θ /= ITER_COUNT
	push!(thetas, Θ)
	i += 1
	println("* finished $(current_spp) ($(i))")
end

results = DataFrame()
results[:taxa] = taxa
results[:Θ] = thetas
writetable("theta_species.csv", results)

…and I’m worried that my use of @threads is not correct. While working in Juno, if I run the entire script, with the @time macro placed on the outermost for loop, the amount of memory allocated seems quite large:

80.600184 seconds (210.53 M allocations: 45.076 GiB, 9.65% gc time)

that’s not thread-safe. Think about what each thread is doing. They will all grab the Θ and update it. What if two threads grab the Θ at the same time? They will both add and then write, but the second write won’t include the add of the first, so the value will be off. You should instead make an array with a separate Θ for each threadid(), reduce on each of those, and do a final reduce afterwards. The more expensive the computation the more rare this will be, but this code is open for this to occur and it will skew the results.

That’s a very expensive way to do that calculation. You don’t need spps, just the length, right? So instead of push!ing values, just keep a running counter for the length.

function calculate_gamma(nums)
	spps = 0

	for id in nums
		occs = species[species[:OBSERVATION_ID] .== id, :][:currentTaxonName]
                spps += length(occs)
	end

	return spps
end

But even then, occs = species[species[:OBSERVATION_ID] .== id, :][:currentTaxonName] that computation is going to allocate a lot. species[:OBSERVATION_ID] .== id will create a temporary array, and species[species[:OBSERVATION_ID] .== id, :] will then generate a temporary array for the slice. That could be a ton if the data set is decently sized! The first can be fixed by having a cache array to write into, then use a view:

cache = BitArray(length(species[:OBSERVATION_ID] ))
for id in nums
   cache .= species[:OBSERVATION_ID] .== id
   @view(species[cache,:])[:currentTaxonName]
end

I believe that should work, though likely you might want to handle the data slightly differently if you’re always cutting it by OBSERVATION_ID.

That gives you an idea for the kinds of optimizations you can do. I would highly recommend optimizing this before multithreading, since there’s a ton that can be improved.

1 Like

Thanks for the feedback! I have some questions about what you wrote:

Is something like this:

Θ = Array{Float64}(Threads.nthreads())

	Threads.@threads for a=1:ITER_COUNT
		sample_plots = select_N_rand_plots(convert(Array, occs[:OBSERVATION_ID]))

		γ = calculate_gamma(sample_plots)
		ᾱ = calculate_mean_alpha(sample_plots)
		Θ[Threads.threadid()] = (γ - ᾱ)
	end

	Θ = reduce(+, Θ)
	Θ /= ITER_COUNT
	push!(thetas, Θ)

along the lines of what you were talking about?

I’m not sure that does what I was hoping to anymore. The function is supposed to return the number of unique species contained within the plots that have nums as their ID numbers. By adding the length of occs each iteration, I may very well be counting the same species (if it occurs at different plots in nums) several times, right?

Are there other areas that you see as low-hanging optimization fruits? I’m rather new to this and would really appreciate any additional input!

Oh, it’s supposed to be the unique species? Then yes, you’ll need to do something different, but there should be a way to re-use cache vectors to cut allocations down. I am not entirely sure why you need to plot it either. Can’t you get similar information directly from the data and not have to go through the plot?

With that amount of allocations, I would profile and find out ways to cut down the allocations. The best way to optimize is to target what is actually taking a long time.

ProfileView.jl will give you a nice chart which shows where time is spent: use that to find out which lines you need to optimize. From experience I know that it will be something to with temporary arrays given how many allocations you have. There must be some point in there that is allocating a lot and clogging the whole system.

2 Likes

I installed ProfileView and think I have identified the culprit:

function calculate_gamma(nums)
    for id in nums
		---> occs = species[species[:OBSERVATION_ID] .== id, :][:currentTaxonName]
		...
	return length(spps)
end

Can you suggest any way in which I might reduce the time spent executing that line?

Sorry for the confusion. When I said “plot,” I was talking about locations at which the data that I’m working with were sampled.

From what you are doing, you want zeros and += in the loop.

You mean like this?

@time for current_spp in unique_species
	occs = species[species[:currentTaxonName] .== current_spp, :]
	nrow(occs) < N && continue
	push!(taxa, current_spp)
	Θ = zeros(Threads.nthreads())

	Threads.@threads for a=1:ITER_COUNT
		sample_plots = select_N_rand_plots(occs[:OBSERVATION_ID])

		γ = calculate_gamma(sample_plots)
		ᾱ = calculate_mean_alpha(sample_plots)
		Θ[Threads.threadid()] += (γ - ᾱ)
	end

	Θ = reduce(+, Θ)
	Θ /= ITER_COUNT
	push!(thetas, Θ)
	i += 1
	println("* finished $(current_spp) ($(i))")
end

I’ve reduced my time quite a bit thanks to the suggestion that I profile. I rewrote my gamma calculating function by removing the most expensive computation. Here’s my time now:

10.806910 seconds (45.52 M allocations: 2.283 GiB, 7.51% gc time)