Hi all,
I am trying to use @spawn in Julia 1.3.0-rc1 to parallelize some slow calculations.
My data is in a binary tree structure, where the calculations have to first be done at the tips, then the nodes ancestral to those tips, then the nodes ancestral to those nodes, etc.
It seems this should be a dynamic, task-based calculation, where work is stacked up from the tip nodes to the root node, and that it could be parallelized when many cores are available.
I’ve got code that sets up an example tree structure (7 nodes, 4 tips and 3 internal nodes) and does calculations down it.
The slow calculation here is countloop(); increasing the number of iterations makes it slower.
I would expect the serial calculation to take about 6 time units, as there are 6 slow countloop() operations, one for each edge.
I would expect the parallel calculation to take about 3 time units, because:
- the 4 tip-edge countloop() operations (nodes 1, 2, 4, 6) can be done on 4 different cores
- the internal-edge countloop() operation for node 3 can start once its two tip edges (nodes 1 and 2) are done
- the internal-edge countloop() operation for node 5 can start once node 3’s edge and node 4’s tip edge are done
However, no matter how slow I make countloop(), I don’t see a multicore performance advantage anywhere near the ~50% reduction in runtime I’d expect.
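For completeness, here is the quick check I run first to confirm that Julia actually started with multiple threads (JULIA_NUM_THREADS has to be set before launching Julia; if this returns 1, @spawn has nowhere to run tasks in parallel):
# Check how many threads this Julia session was started with
Threads.nthreads()   # should be > 1 for @spawn to give any parallel speedup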
I’m posting the complete run-from-scratch code below, and the resulting @benchmark I see on a single-core vs many-core run. Any advice / thoughts very much appreciated!
Cheers and thanks!!
Nick
# Julia 1.3.0-rc1
import Base.Threads.@spawn
using Random
# A function that runs fast or slow, depending on num_iterations
function countloop(num_iterations)
    x = 0.0
    # Seed the RNG from the iteration count so the function depends only on its argument
    random_number_generator = MersenneTwister(num_iterations)
    for i in 1:num_iterations
        x = x + (randn(random_number_generator, 1)[1] / num_iterations)
    end
    return(x)
end
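For reference, timing one countloop() call in isolation gives the per-edge cost; from the serial benchmark further down (6 edges, ~38 s total) each call is roughly 6 s on my machine:
# Time a single edge's worth of slow work (machine-dependent)
@time countloop(100000000)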
# Results structure "Res"
# Set up for a binary tree with 4 tips and 3 internal nodes
# (7 nodes total)
# This contains the tree structure in "uppass_edgematrix".
# uppass_edgematrix left column: ancestral nodeIndexes
# uppass_edgematrix right column: descendant nodeIndexes
struct Res
    root_nodeIndex::Int64
    uppass_edgematrix::Array{Int64}
    data_at_each_nodeIndex_edgeTop::Array{Float64}
    data_at_each_nodeIndex_edgeBot::Array{Float64}
    thread_for_each_nodeOp::Array{Int64}
    thread_for_each_edgeOp::Array{Int64}
end
# Simple default tree (7 nodes, 4 tip nodes, 3 internal nodes)
function construct_Res()
    root_nodeIndex = 7
    uppass_edgematrix = [7 6; 7 5; 5 4; 5 3; 3 2; 3 1]
    data_at_each_nodeIndex_edgeTop = [1.0, 2.0, 3.0, 4.0, 0.0, 0.0, 0.0]
    data_at_each_nodeIndex_edgeBot = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
    thread_for_each_nodeOp = zeros(Int64, 7)
    thread_for_each_edgeOp = zeros(Int64, 7)
    res = Res(root_nodeIndex, uppass_edgematrix, data_at_each_nodeIndex_edgeTop, data_at_each_nodeIndex_edgeBot, thread_for_each_nodeOp, thread_for_each_edgeOp)
    return res
end
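To make the uppass_edgematrix layout concrete, here is a small illustrative check on the default tree (the variable names are just for demonstration): rows whose first column matches a nodeIndex give the edges descending from it, and nodeIndexes that never appear in the first column are tips.
res_demo = construct_Res()
# Children of the root (nodeIndex 7): column 2 of the rows whose column 1 equals 7
children_of_root = res_demo.uppass_edgematrix[res_demo.uppass_edgematrix[:,1] .== 7, 2]   # [6, 5]
# Tip nodes never appear as ancestors (column 1); here these are nodes 1, 2, 4 and 6
tip_nodeIndexes = setdiff(1:7, res_demo.uppass_edgematrix[:,1])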
# NodeOp: Spawning an operation on a node; different behavior
# if the node has 2 descendants (internal node)
# or 0 descendants (tip node)
# Visit the nodes from the root to the tips
# When tip edges are reached, spawn calculations on those edges
# When the sister edges meet at an internal node, start calculating that internal edge
function nodeOp(current_nodeIndex, res)
    # Record the current thread ID
    tmp_threadID = Threads.threadid()
    res.thread_for_each_nodeOp[current_nodeIndex] = tmp_threadID
    # TF=true/false, find the edges whose ancestral node is the current nodeIndex
    # (i.e. the edges leading up to its descendants)
    TF = res.uppass_edgematrix[:,1] .== current_nodeIndex
    if (sum(TF) == 2)
        # If 2 edges have this node as an ancestor, this is an internal node,
        # so spawn calculations on the nodes above
        descendant_nodeIndexes = res.uppass_edgematrix[TF,2]
        tmp1 = @spawn nodeOp(descendant_nodeIndexes[1], res)
        tmp2 = @spawn nodeOp(descendant_nodeIndexes[2], res)
        nodeData_at_top = fetch(tmp1) + fetch(tmp2)
        res.data_at_each_nodeIndex_edgeTop[current_nodeIndex] = nodeData_at_top
        # After the calculations from the nodes above have been done,
        # calculate down this edge, if not the root node
        if (current_nodeIndex != res.root_nodeIndex)
            tmp3 = @spawn edgeOp(current_nodeIndex, res)
            nodeData_at_bottom = fetch(tmp3)
            res.data_at_each_nodeIndex_edgeBot[current_nodeIndex] = nodeData_at_bottom
            return(nodeData_at_bottom)
        else
            # It's the root, so just return the calc
            calc = res.data_at_each_nodeIndex_edgeTop[current_nodeIndex]
            return(calc)
        end
    elseif (sum(TF) == 0)
        # If no edges have this node as an ancestor, this is a tip node,
        # so spawn the calculation on its edge
        tmp3 = @spawn edgeOp(current_nodeIndex, res)
        nodeData_at_bottom = fetch(tmp3)
        res.data_at_each_nodeIndex_edgeBot[current_nodeIndex] = nodeData_at_bottom
        return(nodeData_at_bottom)
    end
end
# EdgeOp: the (slow) operation done along an edge
function edgeOp(current_nodeIndex, res)
    tmp_threadID = Threads.threadid()
    # The slow part; the result y is discarded, the call just burns time
    y = countloop(100000000)
    res.thread_for_each_edgeOp[current_nodeIndex] = tmp_threadID
    nodeData_at_top = res.data_at_each_nodeIndex_edgeTop[current_nodeIndex]
    nodeData_at_bottom = nodeData_at_top / 2.0
    return(nodeData_at_bottom)
end
# Set up the tree and inspect the starting state
res = construct_Res();
res.data_at_each_nodeIndex_edgeTop
res.data_at_each_nodeIndex_edgeBot
res.thread_for_each_nodeOp
res.thread_for_each_edgeOp
# Run the calculation from the root node, then inspect the results
current_nodeIndex = 7
calc = nodeOp(current_nodeIndex, res)
res.data_at_each_nodeIndex_edgeTop
res.data_at_each_nodeIndex_edgeBot
res.thread_for_each_nodeOp
res.thread_for_each_edgeOp
using BenchmarkTools # for @benchmark
res = construct_Res();
@benchmark calc = nodeOp(current_nodeIndex, res)
res.thread_for_each_nodeOp
res.thread_for_each_edgeOp
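As a side note, BenchmarkTools recommends interpolating non-constant globals with $; a variant that also rebuilds res before every sample (so each run starts from a fresh tree) would be the following, though the results below are from the plain call above:
# Benchmark with interpolated arguments and a fresh Res per sample
@benchmark nodeOp($current_nodeIndex, r) setup=(r = construct_Res()) evals=1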
#######################################################
# Results:
#######################################################
# Serial (1 thread only)
#
# julia> @benchmark calc = nodeOp(current_nodeIndex, res)
# BenchmarkTools.Trial:
# memory estimate: 53.64 GiB
# allocs estimate: 600000215
# --------------
# minimum time: 38.078 s (4.89% GC)
# median time: 38.078 s (4.89% GC)
# mean time: 38.078 s (4.89% GC)
# maximum time: 38.078 s (4.89% GC)
# --------------
# samples: 1
# evals/sample: 1
#
# julia> res.thread_for_each_nodeOp
# 7-element Array{Int64,1}:
# 1
# 1
# 1
# 1
# 1
# 1
# 1
#
# julia> res.thread_for_each_edgeOp
# 7-element Array{Int64,1}:
# 1
# 1
# 1
# 1
# 1
# 1
# 0
# Parallel (24 cores, though only a max of 4 would ever
# be used at once for this tree)
#
# julia> @benchmark calc = nodeOp(current_nodeIndex, res)
# BenchmarkTools.Trial:
# memory estimate: 53.64 GiB
# allocs estimate: 600000240
# --------------
# minimum time: 33.753 s (37.75% GC)
# median time: 33.753 s (37.75% GC)
# mean time: 33.753 s (37.75% GC)
# maximum time: 33.753 s (37.75% GC)
# --------------
# samples: 1
# evals/sample: 1
#
# julia> res.thread_for_each_nodeOp
# 7-element Array{Int64,1}:
# 10
# 20
# 2
# 15
# 11
# 2
# 1
#
# julia> res.thread_for_each_edgeOp
# 7-element Array{Int64,1}:
# 21
# 7
# 12
# 12
# 2
# 3
# 0