Building on @greg_plowman’s response, the version below is closer to a dynamically scheduled threaded reduction operator:
using Base.Threads

function tmapreduce(f::Function, op, v0, itr)
    @assert length(itr) > 0
    mutex = Mutex() # Threads.Mutex; ReentrantLock() also works on newer Julia
    output = deepcopy(v0) # make a deepcopy of the starting value
    poppable_itr = Vector(deepcopy(itr)) # convert to a Vector so inputs can be pop!-ed
    # per-thread slots holding the input argument passed to f
    inputs = Vector{eltype(itr)}(undef, nthreads())
    @threads for i in eachindex(itr)
        # grab the next remaining input under the lock
        lock(mutex)
        inputs[threadid()] = pop!(poppable_itr)
        unlock(mutex)
        # apply f outside the lock so the threads can run in parallel
        loop_output = f(inputs[threadid()])
        # fold the result into the shared accumulator under the lock
        lock(mutex)
        output = op(output, loop_output)
        unlock(mutex)
    end
    return output
end
Each iteration pops the next input argument from the shared poppable vector and passes it to f. A Mutex guards both the pop! and the reduction into output, to maintain thread safety.
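As a quick correctness check (my own example, not from the timings below), the result should match a serial mapreduce whenever op is commutative, since the threads fold their results in an arbitrary order:

    tmapreduce(x -> x^2, +, 0, 1:100) # == 338350 == sum(abs2, 1:100)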
For 4 threads, and the following test functions:
function f(i::Int)
    # slow inputs are clustered at the front: i = 1:16 for 4 threads
    i <= nthreads()^2 && Libc.systemsleep(1.0)
    return 1
end

function g(i::Int)
    # slow inputs are spread evenly: every 4th i for 4 threads
    i % nthreads() == 0 && Libc.systemsleep(1.0)
    return 1
end
To check the logic, count how many inputs trigger the one-second sleep:
julia> sum([i<=nthreads()^2 for i in 1:nthreads()^3])
16
julia> sum([i%nthreads()==0 for i in 1:nthreads()^3])
16
so both f and g trigger 16 one-second sleeps over 1:nthreads()^3. A serial mapreduce would therefore take about 16 seconds, and a perfectly balanced run on 4 threads about 4 seconds.
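Or, stating the same arithmetic explicitly (my own sanity check, same numbers as above):

    total_sleep = sum(i <= nthreads()^2 for i in 1:nthreads()^3) # 16 seconds of sleep
    total_sleep / nthreads() # 4.0, the ideal wall time on 4 threads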
A cursory glance at the timings shows:
julia> @time tmapreduce(f, +, 0, 1:nthreads()^3)
4.012927 seconds (86 allocations: 4.016 KiB)
64
julia> @time tmapreduce(g, +, 0, 1:nthreads()^3)
5.023034 seconds (86 allocations: 4.016 KiB)
64
julia> @time threaded_mapreduce(f, +, 0, 1:nthreads()^3)
16.037183 seconds (7 allocations: 336 bytes)
64
julia> @time threaded_mapreduce(g, +, 0, 1:nthreads()^3)
4.007422 seconds (7 allocations: 336 bytes)
64
The static threaded_mapreduce hits 16 seconds with f presumably because contiguous chunking hands thread 1 all sixteen slow inputs (i = 1:16), which it then sleeps through sequentially, while g's slow inputs split evenly, four per thread. But sometimes tmapreduce doesn't work as intended either:
julia> @time tmapreduce(f, +, 0, 1:nthreads()^3)
16.030326 seconds (86 allocations: 4.016 KiB)
64
Presumably this is because @threads still assigns each thread a fixed number of iterations: the pop! only shuffles which inputs a thread receives, so one thread can be left to grind through the remaining slow inputs alone once the others have used up their iteration quota. tmapreduce isn't perfect by any means, but it sometimes avoids the worst-case scenario.
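For what it's worth, here is a minimal sketch (my own, not from this thread; dynamic_tmapreduce is a hypothetical name) of a fully dynamically scheduled variant for newer Julia (1.3+ with Threads.@spawn), where worker tasks pull inputs from a Channel instead of being bound to a fixed share of the iterations:

    using Base.Threads

    # Workers pull inputs from a shared Channel until it is drained, so a slow
    # input always goes to whichever thread becomes free next.
    function dynamic_tmapreduce(f::Function, op, v0, itr)
        work = Channel{eltype(itr)}(length(itr))
        foreach(x -> put!(work, x), itr)
        close(work) # iteration over the Channel stops once it is drained
        lk = ReentrantLock()
        output = deepcopy(v0)
        @sync for _ in 1:nthreads()
            Threads.@spawn for x in work
                y = f(x) # compute outside the lock, in parallel
                lock(lk) do
                    output = op(output, y) # fold under the lock
                end
            end
        end
        return output
    end

With this scheme no task has an iteration quota, so for both f and g the sixteen one-second sleeps should spread across the 4 threads and finish in roughly 4 seconds.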