What comes to my mind first is a producer/consumer style solution with a shared Channel
(see the manual section on Channels).
So I would propose a solution like:
function produce_from_iterator(it)
    # Return a closure suitable for the `Channel(func)` constructor: it
    # feeds every element of `it` into the channel, then closes it so
    # consumers iterating/`take!`-ing the channel know no more items come.
    function feed(ch)
        foreach(el -> put!(ch, el), it)
        close(ch)  # signal end-of-stream to all consumers
    end
    return feed
end
"""
    makeresult(n::Int, nworkers=Threads.nthreads())

Sum `f` over the elements of `makeiterator(n)` by feeding the elements
through a shared `Channel` that `nworkers` tasks drain concurrently.
Returns the combined `Float64` sum. Note the summation order differs from
a serial `mapreduce`, so the result may differ in the last digits.
"""
function makeresult(n::Int, nworkers=Threads.nthreads())
    # Producer task (created by the Channel constructor) feeds the iterator.
    channel = Channel(produce_from_iterator(makeiterator(n)))
    # Each worker drains the channel until it is closed. Iterating a
    # Channel terminates cleanly on close, so no try/catch is needed —
    # the original bare `catch` also silently swallowed real errors in `f`.
    worker() = sum(f(x) for x in channel; init=0.0)
    # Threads.@spawn creates non-sticky tasks that can run on other
    # threads; plain Task + schedule pins them to the current thread,
    # so no actual parallelism would happen.
    tasks = [Threads.@spawn worker() for _ in 1:nworkers]
    # Combine the per-worker partial sums.
    return sum(fetch, tasks)
end
Note that this will only be worth it if the work done on each element takes long enough to amortize the synchronization overhead introduced by the Channel. See the benchmark below:
julia> using BenchmarkTools
julia> f(x) = log(sin(x)^2*tan(x)^2)
julia> makeiterator(n) = (i for i in 1:n)
julia> @btime makeresult(1_000_000) # 4 threads
6.153 s (2999538 allocations: 45.77 MiB)
-5.544428536817374e6
julia> @btime mapreduce(f, +, makeiterator(1_000_000))
39.459 ms (0 allocations: 0 bytes)
-1.3861071342043434e6
Also note that the result is different — and by almost exactly a factor of 4 (the number of threads: 1.3861071342043434e6 × 4 ≈ 5.544428536817374e6), which looks like each element being counted once per worker rather than mere floating-point reordering; worth double-checking. (Summation order does perturb floating-point results, but only in the last digits, as the chunked runs below show.)
Perhaps it would be better to chunk the iterator more like so:
"""
    produce_from_iterator(it, chunksize, T=Any)

Return a closure for the `Channel(func)` constructor that feeds the
elements of `it` into the channel in `Vector{T}` chunks of length
`chunksize` (the final chunk may be shorter), then closes the channel.

Throws `ArgumentError` if `chunksize` is not positive.
"""
function produce_from_iterator(it, chunksize, T=Any)
    chunksize > 0 || throw(ArgumentError("chunksize must be positive, got $chunksize"))
    function _inner(channel)
        buffer = Vector{T}(undef, chunksize)
        filled = 0  # number of valid elements currently in `buffer`
        for i in it
            filled += 1
            buffer[filled] = i
            if filled == chunksize
                # copy: the consumer keeps the chunk while we reuse `buffer`
                put!(channel, copy(buffer))
                filled = 0
            end
        end
        # Emit the trailing partial chunk only when nonempty. The original
        # always sent one, producing a zero-length chunk whenever the
        # iterator length was an exact multiple of chunksize.
        filled > 0 && put!(channel, resize!(buffer, filled))
        close(channel)
    end
    return _inner
end
"""
    makeresult(n::Int, chunksize, nworkers=Threads.nthreads())

Sum `f` over the elements of `makeiterator(n)` by feeding chunks of
`chunksize` elements through a shared `Channel` that `nworkers` tasks
drain concurrently. Returns the combined `Float64` sum; the summation
order differs from a serial `mapreduce`, so the last digits may differ.
"""
function makeresult(n::Int, chunksize, nworkers=Threads.nthreads())
    # Producer task (created by the Channel constructor) feeds the chunks.
    channel = Channel(produce_from_iterator(makeiterator(n), chunksize, Int))
    # Each worker drains chunks until the channel is closed. `init` makes
    # mapreduce safe should an empty chunk ever arrive, and iterating the
    # channel replaces the bare try/catch that silently swallowed real
    # errors raised by `f`.
    worker() = sum(mapreduce(f, +, xs; init=0.0) for xs in channel; init=0.0)
    # Non-sticky tasks so the work can actually run on multiple threads;
    # plain Task + schedule would pin every task to the current thread.
    tasks = [Threads.@spawn worker() for _ in 1:nworkers]
    # Combine the per-worker partial sums.
    return sum(fetch, tasks)
end
Results:
julia> @btime makeresult(1_000_000,1) # same as above but worse
6.457 s (3999543 allocations: 106.81 MiB)
-5.544428536817374e6
julia> @btime makeresult(1_000_000,10)
698.351 ms (1299543 allocations: 32.04 MiB)
-5.544428536817317e6
julia> @btime makeresult(1_000_000,100)
121.202 ms (1029543 allocations: 24.11 MiB)
-5.544428536817373e6
EDIT: I realized that my complicated produce_from_iterator(it, chunksize)
could have been written much more simply using Iterators.partition
…