Sum result of Threads.foreach()

Hi!

First, I read several other posts about adjacent problems to this, but I don’t think any of them had a direct answer. If this post is redundant, I apologize; I’m not sure what key terms to search to get more specific results than I already have.

TL;DR – I’d like to loop over a Channel{Tuple{Int64,Int64}} until it’s exhausted, passing each Tuple into a function and then averaging that function’s results over all the Tuples. I’m using @atomic to do that now, but I’d like to add this to a package that might get used in environments with older versions of Julia, so I’m wondering whether there is a more backwards-compatible way to do this.

My current loop is

# Minimal atomic wrapper (@atomic fields require Julia 1.7+)
mutable struct Atomic{T}; @atomic x::T; end
sum_coherence = Atomic(0.0)
num_pairs = Atomic(0)
Threads.foreach(topic_word_pairs) do pair 
    confirmation = calculate_confirmation(pair, model.corp)
    @atomic sum_coherence.x += confirmation
    @atomic num_pairs.x += 1
end
coherence = sum_coherence.x / num_pairs.x

Where topic_word_pairs is the channel.

I know that I could just do this single threaded, but as calculate_confirmation() is quite expensive, I’d love to find a way to parallelize it.

Since I’m submitting this to someone else’s package, I’m trying to avoid adding dependencies and make my code work with as many versions of Julia as possible (I think @atomic is only in 1.7, yeah?). I’d appreciate your thoughts on the best approach!

Why not send the results down another channel and sum them there?

There is a section on the producer–consumer model in the docs:

https://docs.julialang.org/en/v1/manual/asynchronous-programming/

1 Like

I wasn’t totally sure how to do that.

Maybe like this?

mean_coherence = Channel{Float64}() do ch
    Threads.foreach(topic_word_pairs) do pair
        confirmation = calculate_confirmation(pair, model.corp)
        put!(ch, confirmation)
    end
end

sum_coherence = 0.0
num_pairs = 0
for r in mean_coherence
    sum_coherence += r
    num_pairs += 1
end

Missed the second half of your post. Reading that now. Thanks!

I saw your … replying as I was editing my comment to add the link

1 Like

Just to close the loop on this – I was able to get this working with the code I listed in this comment. I played around with the @async macro that is used in the example that @lawless-m linked to, but I didn’t have luck getting it to run, and since I have a solution, I’m going to move forward.

Thanks!

Your example is close to what I would imagine, but after creating the channel I would spawn a task that runs a function which just reads from the channel and accumulates the sum, then start shoving items into the channel, and finally, at the end, wait on the result of the spawned task.

Pseudocode

ch=Channel...
th = @spawn(sumoverchannel)
foreach do ...
res = wait(th)

You would need to have a special flag value you can send on the channel at the end to get the spawned task to terminate and return its value.
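A minimal runnable sketch of that pattern, assuming the names from the thread (`calculate_confirmation`, `topic_word_pairs`) and using `NaN` as one arbitrary choice of sentinel value:

```julia
using Base.Threads: @spawn

# Stand-ins for the thread's data and function, just to make the demo runnable:
calculate_confirmation(pair) = Float64(pair[1] + pair[2])
topic_word_pairs = Channel{Tuple{Int,Int}}() do ch
    for i in 1:100
        put!(ch, (i, i + 1))
    end
end

results = Channel{Float64}(Inf)  # buffered so producers rarely block

# Consumer task: accumulate until the sentinel (NaN) arrives,
# then return the mean as the task's value.
accumulator = @spawn begin
    s, n = 0.0, 0
    for x in results
        isnan(x) && break  # sentinel: stop accumulating
        s += x
        n += 1
    end
    s / n
end

# Producers: run in parallel, feeding the results channel.
Threads.foreach(topic_word_pairs) do pair
    put!(results, calculate_confirmation(pair))
end

put!(results, NaN)               # send the sentinel
mean_coherence = fetch(accumulator)  # 102.0 for the demo data
```

Because the accumulator task is already draining `results` while `Threads.foreach` is still producing, the summation overlaps with the expensive per-pair work instead of running after it.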

I’m super new to all this multithreading stuff, @dlakelan. Would you mind explaining the benefit of spawning another task instead of using the main thread to loop over the collection channel and add everything up?

The accumulator will run in parallel with the generators in the foreach, so the whole process will finish sooner. Otherwise you’re running the accumulator only after all the generation is done, since the docs for foreach say:

This function will wait for all internally spawned tasks to complete before returning.

Using atomics, locks, or channels is very likely not the right approach if you are using parallelism for speeding things up (unless you know what you are doing). I normally suggest using Folds.sum or FLoops.@floop for this. For more information, see: A quick introduction to data parallelism in Julia
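For reference, with Folds.jl (an external dependency) the whole computation collapses to a few lines; this sketch assumes `topic_word_pairs` can first be collected into an array, since Folds works over indexable collections rather than Channels:

```julia
using Folds

pairs = collect(topic_word_pairs)  # drain the Channel into an array
sum_coherence = Folds.sum(pair -> calculate_confirmation(pair, model.corp), pairs)
coherence = sum_coherence / length(pairs)
```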

That said, if “no package” is the hard requirement, you can write something like the following (untested):

topic_word_pairs::AbstractArray  # assumption

basesize = cld(length(topic_word_pairs), Threads.nthreads())
chunks = Iterators.partition(topic_word_pairs, basesize)
sums_coherence = zeros(length(chunks))
nums_pairs = zeros(Int, length(chunks))
@sync for (i, chunk) in enumerate(chunks)
    Threads.@spawn begin
        local sum_coherence = 0.0
        local num_pairs = 0
        for pair in chunk
            confirmation = calculate_confirmation(pair, model.corp)
            sum_coherence += confirmation
            num_pairs += 1
        end
        sums_coherence[i] = sum_coherence
        nums_pairs[i] = num_pairs
    end
end

sum_coherence = sum(sums_coherence)
num_pairs = sum(nums_pairs)

1 Like

Thank you for this example! It makes a lot of sense to me to do the chunking before going parallel, so you avoid the overhead of spawning a bunch of small tasks.

I’ve used Folds before and really like it. I’m just a new contributor, so I’m trying to keep my footprint as small as possible for now.