Little parallel speedup on binary tree structure in Julia 1.3.0-rc1

Hi all,

I am trying to use @spawn in Julia 1.3.0-rc1 to parallelize some slow calculations.

My data is in a binary tree structure, where the calculations have to first be done at the tips, then the nodes ancestral to those tips, then the nodes ancestral to those nodes, etc.

It seems that this should be a dynamically scheduled calculation, where tasks are stacked up from the tip nodes toward the root node, and that it could be parallelized when many cores are available.
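
To illustrate the pattern I'm after, here is a minimal self-contained sketch (just nested tuples for the tree, and a sleep() standing in for the slow work; it is not the real code, which follows below):

import Base.Threads.@spawn

# Post-order tree reduction sketch: a "tree" is either a leaf value (Float64)
# or a (left, right) tuple of subtrees; slowcalc() is a placeholder for the
# expensive per-edge work.
slowcalc(x) = (sleep(0.1); x / 2.0)

treewalk(tip::Float64) = slowcalc(tip)

function treewalk(node::Tuple)
	left  = @spawn treewalk(node[1])    # sibling subtrees become separate tasks
	right = @spawn treewalk(node[2])
	return slowcalc(fetch(left) + fetch(right))   # combine, then do this node's slow step
end

treewalk( (1.0, (2.0, (3.0, 4.0))) )   # 4 tips, same shape as the 7-node example tree below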

I’ve got code that sets up an example tree structure (7 nodes, 4 tips and 3 internal nodes) and does calculations down it.

The slow calculation here is countloop(); increasing the number of iterations makes it slower.

I would expect the serial calculation to take about 6 time units, as there are 6 slow countloop() operations, one for each edge.

I would expect the parallel calculation to take about 3 time units, because

  • the 4 tip edge countloop() operations can be done on 4 different cores
  • once two sister tip edges finish, the internal edge below them can be done
  • once that finishes, the last internal edge can be done

so the critical path is only 3 countloop() operations long.
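
For reference: the thread pool is fixed when Julia starts, e.g. with JULIA_NUM_THREADS=24 julia from the shell, and the running session can be checked with

Threads.nthreads()   # number of threads available to @spawn

In the parallel results below, thread IDs up to 21 appear in the thread-ID arrays, so multiple threads are definitely available.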

However, no matter how slow I make countloop(), I don’t seem to get anywhere near the ~50% reduction in run time that I’d expect from using multiple cores.

I’m posting the complete run-from-scratch code below, along with the @benchmark results I see for a serial (1-thread) run vs. a parallel (many-thread) run. Any advice / thoughts very much appreciated!

Cheers and thanks!!
Nick

# Julia 1.3.0-rc1

import Base.Threads.@spawn
using Random

# A function that runs fast or slow, depending on num_iterations
function countloop(num_iterations, nodeIndex)
	x = 0.0
	# Seed a per-node RNG so each call gets a reproducible stream
	random_number_generator = MersenneTwister(nodeIndex)

	for i in 1:num_iterations
		x = x + (randn(random_number_generator, 1)[1] / num_iterations)
	end
	return(x)
end
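
# Optional sanity check: time one edge-sized countloop() call on its own
# (after a warm-up call, so compilation is not included) to see what one
# "time unit" costs on this machine.
countloop(1000000, 1)		# warm-up / compile
@time countloop(100000000, 1)	# roughly one edge's worth of work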

# Results structure "Res"
# Set up for a binary tree with 4 tips and 3 internal nodes
#   (7 nodes total)
# This contains the tree structure in "uppass_edgematrix".
# uppass_edgematrix left  column: ancestral  nodeIndexes
# uppass_edgematrix right column: descendant nodeIndexes
struct Res
	root_nodeIndex::Int64
	uppass_edgematrix::Array{Int64,2}
	data_at_each_nodeIndex_edgeTop::Array{Float64,1}
	data_at_each_nodeIndex_edgeBot::Array{Float64,1}
	thread_for_each_nodeOp::Array{Int64,1}
	thread_for_each_edgeOp::Array{Int64,1}
end

# Simple default tree (7 nodes: 4 tip nodes, 3 internal nodes)
function construct_Res()
	root_nodeIndex = 7
	uppass_edgematrix = [7 6; 7 5; 5 4; 5 3; 3 2; 3 1]
	data_at_each_nodeIndex_edgeTop = [1.0, 2.0, 3.0, 4.0, 0.0, 0.0, 0.0]
	data_at_each_nodeIndex_edgeBot = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
	thread_for_each_nodeOp = zeros(Int64, 7)	# one slot per node
	thread_for_each_edgeOp = zeros(Int64, 7)	# one slot per node (the root has no edge, so its slot stays 0)
	res = Res(root_nodeIndex, uppass_edgematrix, data_at_each_nodeIndex_edgeTop, data_at_each_nodeIndex_edgeBot, thread_for_each_nodeOp, thread_for_each_edgeOp)
	return res
end
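
# Dependency levels implied by uppass_edgematrix (the "edge for node i" is
# the edge whose descendant-column entry is i):
#   level 1: edges for tip nodes 6, 4, 2, 1  -- 4 countloop()s, can run in parallel
#   level 2: edge for node 3                 -- needs the edges for nodes 1 and 2 first
#   level 3: edge for node 5                 -- needs the edges for nodes 3 and 4 first
# So a serial run does 6 countloop()s, but the critical path is only 3 long.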


# NodeOp: Spawning an operation on a node, with different behavior
#         if the node has 2 descendants (internal node)
#         or 0 descendants (tip node)

# Touch the nodes from the root to the tips
# When tip edges are reached, spawn calculations on those edges
# When the sister edges meet at an internal node, start calculating that internal edge
function nodeOp(current_nodeIndex, res)
	# Record the current thread ID
	tmp_threadID = Threads.threadid()
	res.thread_for_each_nodeOp[current_nodeIndex] = tmp_threadID
	
	# TF = true/false: which edges have the current node as their ancestor
	TF = res.uppass_edgematrix[:,1] .== current_nodeIndex
	if (sum(TF) == 2)
		# Two edges descend from this node, so it is an internal node:
		# spawn calculations on the two descendant (tipward) nodes
		descendant_nodeIndexes = res.uppass_edgematrix[TF,2]
		tmp1 = @spawn nodeOp(descendant_nodeIndexes[1], res)
		tmp2 = @spawn nodeOp(descendant_nodeIndexes[2], res)
		nodeData_at_top = fetch(tmp1) + fetch(tmp2)
		res.data_at_each_nodeIndex_edgeTop[current_nodeIndex] = nodeData_at_top
		
		# After the calculations for the descendant subtrees have finished,
		# calculate down this node's edge (unless this is the root node)
		if (current_nodeIndex != res.root_nodeIndex)
			tmp3 = @spawn edgeOp(current_nodeIndex, res)
			nodeData_at_bottom = fetch(tmp3)
			res.data_at_each_nodeIndex_edgeBot[current_nodeIndex] = nodeData_at_bottom
			return(nodeData_at_bottom)
		else
			# It's the root, so just return the calc
			calc = res.data_at_each_nodeIndex_edgeTop[current_nodeIndex]
			return(calc)
		end
	elseif (sum(TF) == 0)
		# No edges descend from this node, so it is a tip node:
		# spawn the calculation on its single edge
		tmp3 = @spawn edgeOp(current_nodeIndex, res)
		nodeData_at_bottom = fetch(tmp3)
		res.data_at_each_nodeIndex_edgeBot[current_nodeIndex] = nodeData_at_bottom
		return(nodeData_at_bottom)
	end
end

# EdgeOp: the per-edge calculation for the edge whose descendant is
#         current_nodeIndex (this is where the slow countloop() is called)
function edgeOp(current_nodeIndex, res)
	tmp_threadID = Threads.threadid()
	# Busy-work standing in for a slow per-edge calculation; y itself is not used
	y = countloop(100000000, current_nodeIndex)

	res.thread_for_each_edgeOp[current_nodeIndex] = tmp_threadID
	nodeData_at_top = res.data_at_each_nodeIndex_edgeTop[current_nodeIndex]
	nodeData_at_bottom = nodeData_at_top / 2.0
	return(nodeData_at_bottom)
end

res = construct_Res();

res.data_at_each_nodeIndex_edgeTop
res.data_at_each_nodeIndex_edgeBot
res.thread_for_each_nodeOp
res.thread_for_each_edgeOp

current_nodeIndex = 7
calc = nodeOp(current_nodeIndex, res)

res.data_at_each_nodeIndex_edgeTop
res.data_at_each_nodeIndex_edgeBot
res.thread_for_each_nodeOp
res.thread_for_each_edgeOp

using BenchmarkTools # for @benchmark
res = construct_Res();
@benchmark calc = nodeOp(current_nodeIndex, res)
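# (Side note: the BenchmarkTools docs recommend interpolating global
#  variables with $, e.g. @benchmark nodeOp($current_nodeIndex, $res);
#  with a ~38 s call per sample it makes no practical difference here.)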
res.thread_for_each_nodeOp
res.thread_for_each_edgeOp





#######################################################
# Results:
#######################################################

# Serial (1 thread only)
#
# julia> @benchmark calc = nodeOp(current_nodeIndex, res)
# BenchmarkTools.Trial: 
#   memory estimate:  53.64 GiB
#   allocs estimate:  600000215
#   --------------
#   minimum time:     38.078 s (4.89% GC)
#   median time:      38.078 s (4.89% GC)
#   mean time:        38.078 s (4.89% GC)
#   maximum time:     38.078 s (4.89% GC)
#   --------------
#   samples:          1
#   evals/sample:     1
# 
# julia> res.thread_for_each_nodeOp
# 7-element Array{Int64,1}:
#  1
#  1
#  1
#  1
#  1
#  1
#  1
# 
# julia> res.thread_for_each_edgeOp
# 7-element Array{Int64,1}:
#  1
#  1
#  1
#  1
#  1
#  1
#  0


# Parallel (24 cores; with this tree, at most 4 countloop() calls
# would ever run at once)
# 
# @benchmark calc = nodeOp(current_nodeIndex, res)
# BenchmarkTools.Trial: 
#   memory estimate:  53.64 GiB
#   allocs estimate:  600000240
#   --------------
#   minimum time:     33.753 s (37.75% GC)
#   median time:      33.753 s (37.75% GC)
#   mean time:        33.753 s (37.75% GC)
#   maximum time:     33.753 s (37.75% GC)
#   --------------
#   samples:          1
#   evals/sample:     1
# 
# julia> res.thread_for_each_nodeOp
# 7-element Array{Int64,1}:
#  10
#  20
#   2
#  15
#  11
#   2
#   1
# 
# julia> res.thread_for_each_edgeOp
# 7-element Array{Int64,1}:
#  21
#   7
#  12
#  12
#   2
#   3
#   0