Manage tasks from Threads.@spawn without using map()

Hi,
I need to analyze a long vector and count the occurrences of certain letters. Since the vector will be very long, it should be better to use parallel processing for this.

The thing is, I can do this with map(), storing the tasks it returns. But I would like to compare the efficiency against just using a for loop for this process.
To create a reproducible example, let's count the occurrences of the numbers 1 and 2 in a vector.

using ChunkSplitters

array1 = repeat([1,2],1111)
chk = chunks(1:length(array1); n=Threads.nthreads(), split=:batch) 
counts_total = Dict(ii  => 0 for ii in [1,2])

tasks = map(chk) do inds
    Threads.@spawn begin
        sub_counts = Dict(ii  => 0 for ii in [1,2])
        for val in array1[inds]
            sub_counts[val] += 1
        end
        sub_counts
    end
end
thread_sums = fetch.(tasks)
for sub in thread_sums
   merge!(+,counts_total,sub)
end
counts_total

In this case the overall counts come out correctly:

Dict{Int64, Int64} with 2 entries:
  2 => 1111
  1 => 1111

Trying to do the same thing with just a loop, I don't know how to save the tasks, so I have to update the total counts with merge! inside the loop:

using ChunkSplitters

array1 = repeat([1,2],1111)
chk = chunks(1:length(array1); n=Threads.nthreads(), split=:batch) 
counts_total = Dict(ii  => 0 for ii in [1,2])

for inds in chk
    Threads.@spawn begin
        sub_counts = Dict(ii  => 0 for ii in [1,2])
        for val in array1[inds]
            sub_counts[val] += 1
        end
        # as I can't store the tasks, I have to merge! into counts_total from here
        merge!(+,counts_total,sub_counts)
    end
end
counts_total

As a result, the total counts come out wrong:

Dict{Int64, Int64} with 2 entries:
  2 => 667
  1 => 667

Is there any way to do this with just a for loop, without using map?

Thank you very much,
my best

In your loop version nothing ever waits for the spawned tasks before counts_total is read, and several tasks may call merge! into counts_total concurrently, which is a data race; that's why the totals come out wrong. If you want to save the tasks from the for loop, you need to allocate an array and push the tasks into it, e.g.:

using ChunkSplitters
array1 = repeat([1,2],1111)

chk = chunks(1:length(array1); n=Threads.nthreads(), split=:batch) 
counts_total = Dict(ii  => 0 for ii in [1,2])

tasks = Task[]
for inds in chk
    task = Threads.@spawn begin
        sub_counts = Dict(ii  => 0 for ii in [1,2])
        for val in array1[inds]
            sub_counts[val] += 1
        end
        return sub_counts
    end
    push!(tasks, task)
end

thread_sums = fetch.(tasks)
for sub in thread_sums
   merge!(+,counts_total,sub)
end
counts_total

I would generally recommend using OhMyThreads.jl, though.


Another general tip: a Dict has constant-time lookup, but with a rather large constant factor. If you want to count symbols and there is a limited (known) set of them, it will likely be much faster to store the counts in a Vector{Int} that you index by the element somehow.
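
For instance, with the 1/2 toy example from above, the idea looks like this (a minimal single-threaded sketch; here the element itself can serve directly as the index):

counts = zeros(Int, 2)      # counts[1] holds the number of 1s, counts[2] the number of 2s
for val in repeat([1,2], 1111)
    counts[val] += 1        # direct indexing instead of hashing into a Dict
end
counts                      # [1111, 1111]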

@Satvik thank you for your attention and support. The code worked really well.
@abraemer Thanks for the tip. From what I saw there's no way to use names as vector indices, right? I'm going to think of some way to index numerically, with a fixed mapping like:
A → 1, B → 2, C → 3 and so on…

Here is the benchmark:

values = repeat([1,2],1111000000);

@benchmark count_loop(values)

BenchmarkTools.Trial: 2 samples with 1 evaluation.
 Range (min … max):  3.610 s … 5.533 s  ┊ GC (min … max):  5.77% … 34.39%
 Time  (median):     4.571 s            ┊ GC (median):    23.09%
 Time  (mean ± σ):   4.571 s ± 1.360 s  ┊ GC (mean ± σ):  23.09% ± 20.24%

  █                                                     █  
  █▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁█ ▁
  3.61 s        Histogram: frequency by time       5.53 s <

 Memory estimate: 16.56 GiB, allocs estimate: 578.

@benchmark count_map(values)

BenchmarkTools.Trial: 2 samples with 1 evaluation.
 Range (min … max):  3.477 s … 5.985 s  ┊ GC (min … max):  0.08% … 31.25%
 Time  (median):     4.731 s            ┊ GC (median):    19.79%
 Time  (mean ± σ):   4.731 s ± 1.773 s  ┊ GC (mean ± σ):  19.79% ± 22.04%

  █                                                     █  
  █▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁█ ▁
  3.48 s        Histogram: frequency by time       5.98 s <

 Memory estimate: 16.56 GiB, allocs estimate: 579.

Is there a better way to free up memory after the process? I'm using GC.gc(), but it doesn't seem to return memory usage completely to the previous state.

Thank you very much,
my best

Just to be sure: you don't mean to have a Dict that does this mapping, right? Because that would defeat the purpose, of course. You need to do that yourself somehow. If you have an ASCII string, you could simply take the char value to index a Vector of size 256, for example. Or, if you know which range of chars is allowed, you can shift and shrink that range.
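
To illustrate both variants (a sketch; the example string and the 'A':'Z' restriction are just assumptions):

s = "ABACABAD"

# Variant 1: index by the raw char value; 256 slots cover all 8-bit codes,
# +1 because Julia arrays are 1-based.
counts = zeros(Int, 256)
for c in s
    counts[Int(c) + 1] += 1
end
counts[Int('A') + 1]  # 4

# Variant 2: if only 'A'-'Z' can occur, shift and shrink the range to 26 slots.
counts_az = zeros(Int, 26)
for c in s
    counts_az[Int(c) - Int('A') + 1] += 1
end
counts_az[1]  # count of 'A', again 4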

The benchmarks you posted show quite a lot of allocations. I wonder why that is; could you include the code of the functions as well?
Generally, you don't need to "clean up" explicitly. Julia will do that for you automatically.

The code uses array1[inds], which makes a copy. If you add a @view here, the size of the allocations is vastly reduced:
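
Concretely, the only change inside the spawned block is (a sketch):

# before: array1[inds] copies the chunk into a fresh array
for val in array1[inds]
    sub_counts[val] += 1
end

# after: @view iterates the chunk without copying
for val in @view array1[inds]
    sub_counts[val] += 1
end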

julia> array1 = repeat([1,2],1111000);

julia> @benchmark count_loop($array1)
BenchmarkTools.Trial: 525 samples with 1 evaluation.
 Range (min … max):  7.491 ms … 270.054 ms  ┊ GC (min … max): 0.00% … 95.98%
 Time  (median):     8.342 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   9.514 ms ±  11.939 ms  ┊ GC (mean ± σ):  8.18% ±  8.11%

  ▇█▁
  ███▇▆▄▆▅▆▆▅▄▄▄▄▃▃▃▃▄▄▃▃▃▃▃▂▃▃▃▃▂▂▃▁▂▂▃▂▁▂▁▂▁▂▁▁▂▁▁▁▁▁▁▁▁▁▁▂ ▃
  7.49 ms         Histogram: frequency by time        14.9 ms <

 Memory estimate: 16.96 MiB, allocs estimate: 127.

julia> @benchmark count_loop_view($array1)  # just added @view in front of array1[inds]
BenchmarkTools.Trial: 558 samples with 1 evaluation.
 Range (min … max):  6.615 ms … 15.631 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     8.470 ms              ┊ GC (median):    0.00%
 Time  (mean ± σ):   8.954 ms ±  1.973 ms  ┊ GC (mean ± σ):  0.00% ± 0.00%

  ▂▂█▅▅▂▁▁▁▁
  ███████████▃██▇▆█▆▆▆▅▅▅▅▆▇▅▄█▅▃▆▄▆▅▄▃▄▂▃▃▃▃▃▂▃▃▃▁▃▃▂▁▂▁▂▃▃ ▄
  6.62 ms        Histogram: frequency by time        14.7 ms <

 Memory estimate: 11.23 KiB, allocs estimate: 111.
Some minor adjustments to reduce allocations a bit further

You could also slightly reduce the number of allocations by changing

tasks = Task[]
for inds in chk
    ...
    push!(tasks, task)  # will have to reallocate when exceeding the currently allocated capacity
end

to

tasks = Vector{Task}(undef, length(chk))
for (task_idx, inds) in enumerate(chk)
    tasks[task_idx] = Threads.@spawn ...
end

and

thread_sums = fetch.(tasks)  # allocates a Vector{Dict{...}}
for sub in thread_sums
   merge!(+,counts_total,sub)
end

to

for task in tasks
   merge!(+, counts_total, fetch(task))
end
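
Put together with the @view change, the adjusted function could look like this (a sketch of what is benchmarked as count_loop_view_2 below; the exact code may differ slightly):

function count_loop_view_2(array1)
    chk = chunks(1:length(array1); n=Threads.nthreads(), split=:batch)
    counts_total = Dict(ii => 0 for ii in [1,2])

    # preallocated task vector instead of push!
    tasks = Vector{Task}(undef, length(chk))
    for (task_idx, inds) in enumerate(chk)
        tasks[task_idx] = Threads.@spawn begin
            sub_counts = Dict(ii => 0 for ii in [1,2])
            for val in @view array1[inds]
                sub_counts[val] += 1
            end
            return sub_counts
        end
    end

    # merge each result as it is fetched, skipping the intermediate vector
    for task in tasks
        merge!(+, counts_total, fetch(task))
    end
    return counts_total
end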

But as nthreads() is presumably relatively small, this won’t make too much of a difference. E.g. for nthreads() == 8:

julia> @benchmark count_loop_view_2($array1)
BenchmarkTools.Trial: 644 samples with 1 evaluation.
 Range (min … max):  6.624 ms … 13.889 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     7.319 ms              ┊ GC (median):    0.00%
 Time  (mean ± σ):   7.755 ms ±  1.187 ms  ┊ GC (mean ± σ):  0.00% ± 0.00%

  ▂▃▃▄█▃
  ███████▇▇▅▇▅▆▆▇▅▆▅▃▃▄▃▃▃▂▂▃▃▄▃▃▂▃▃▃▃▃▂▃▂▁▃▂▂▂▂▃▂▂▂▂▃▁▁▂▁▂▃ ▃
  6.62 ms        Histogram: frequency by time        11.8 ms <

 Memory estimate: 10.59 KiB, allocs estimate: 94. 

You need to do that yourself somehow. If you have an ASCII string, you could simply take the char value to index a Vector of size 256, for example

Yes, that was my idea for avoiding the Dict… I will implement the vector indexing and show how it turned out here.
But I didn't understand what you suggested next; could you clarify this idea:

Or, if you know which range of chars is allowed, you can shift and shrink that range.

For now, here are the functions I used:

function count_map(values)
    array1 = values
    chk = chunks(1:length(array1); n=Threads.nthreads(), split=:batch) 
    counts_total = Dict(ii  => 0 for ii in [1,2])

    tasks = map(chk) do inds
        Threads.@spawn begin
            sub_counts = Dict(ii  => 0 for ii in [1,2])
            for val in array1[inds]
                sub_counts[val] += 1
            end
            sub_counts
        end
    end
    thread_sums = fetch.(tasks)
    for sub in thread_sums
       merge!(+,counts_total,sub)
    end
    return counts_total
end

function count_loop(values)
    array1 = values
    chk = chunks(1:length(array1); n=Threads.nthreads(), split=:batch) 
    counts_total = Dict(ii  => 0 for ii in [1,2])

    tasks = Task[]
    for inds in chk
        task = Threads.@spawn begin
            sub_counts = Dict(ii  => 0 for ii in [1,2])
            for val in array1[inds]
                sub_counts[val] += 1
            end
            return sub_counts
        end
        push!(tasks, task)
    end

    thread_sums = fetch.(tasks)
    for sub in thread_sums
       merge!(+,counts_total,sub)
    end
    counts_total
end

thanks

Dear @eldee,
Thank you very much for the tips and implementations. I will be able to test this soon on a machine where I can use around 50 threads, and I will post the results here.
thank you
my best

@abraemer I made the changes you suggested.
@eldee I implemented your adjustments along with the suggestions above.

Performance is now much better after your suggestions. Thank you very much!
Here is the updated code and benchmark.

function count_loop2(values, vec_reference)
    chk = chunks(1:length(values); n=Threads.nthreads(), split=:batch) 
    counts_total = repeat([0],length(vec_reference))
    
    #indexing references values
    vec_reference_ids = findall(x->x==x, vec_reference)
    
    tasks = Vector{Task}(undef, length(chk))
    for (task_idx, inds) in enumerate(chk)
        tasks[task_idx] = Threads.@spawn begin
            sub_counts = repeat([0],length(vec_reference))
            for val in values[inds]
                cur_id = findall(x->x==val, vec_reference) #find value in reference index
                sub_counts[cur_id[1]] += 1
            end
            return sub_counts
        end
    end

    for task in tasks
       counts_total = counts_total + fetch(task)
    end

    return counts_total
end


array_test = repeat(["A","D"],1111111);
vec_ref = ["A", "B", "C", "D"];

@benchmark count_loop2(array_test, vec_ref)


BenchmarkTools.Trial: 26 samples with 1 evaluation.
 Range (min … max):   42.066 ms …    1.150 s  ┊ GC (min … max):  0.00% … 95.84%
 Time  (median):      44.384 ms               ┊ GC (median):     0.00%
 Time  (mean ± σ):   192.449 ms ± 357.426 ms  ┊ GC (mean ± σ):  76.82% ± 35.04%

  █                                                              
  █▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▅▁▁▅▁▁▁▁▁▁▅▁▁▁▅ ▁
  42.1 ms       Histogram: log(frequency) by time        1.15 s <

 Memory estimate: 356.08 MiB, allocs estimate: 6667172.

My best regards

Some comments:

counts_total = repeat([0], length(vec_reference)) (and similarly for sub_counts below) should be zeros(Int, length(vec_reference)).

vec_reference_ids = findall(x->x==x, vec_reference): I don't think this does what you intend? Since x==x is always true (except for NaN, I think), this is equivalent to collect(1:length(vec_reference)). You don't need it.

In the inner loop, you should use findfirst(x->x==val, vec_reference) instead of findall to get just the first index (which is also the only index).

values[inds] makes a copy of the slice. Use a view:
for val in view(values, inds)

The final reduction can be made a bit more efficient using in-place operations (+ creates a new vector each time):

    for task in tasks
       counts_total .= counts_total .+ fetch(task)
    end

FWIW, if you wanted to write this with OhMyThreads.jl, this would be one (simple but not optimal) way to do it:

using OhMyThreads
using StaticArrays

function count_loop_omt(values, vec_reference)
    @tasks for v in values
        @set reducer = +
        @local subcounts = @MVector zeros(Int, length(vec_reference))

        subcounts .= 0
        subcounts[findfirst(==(v), vec_reference)] += 1
        subcounts
    end
end

And here are the timings on my laptop with 6 threads:

julia> @benchmark count_loop2($array_test, $vec_ref) samples=10 evals=3
BenchmarkTools.Trial: 9 samples with 3 evaluations.
 Range (min … max):  152.436 ms … 303.452 ms  ┊ GC (min … max):  0.00% … 67.66%
 Time  (median):     171.811 ms               ┊ GC (median):    52.73%
 Time  (mean ± σ):   188.230 ms ±  46.100 ms  ┊ GC (mean ± σ):  49.81% ± 18.78%

  ▁▁   ▁ █  ▁  ▁       ▁                                      ▁  
  ██▁▁▁█▁█▁▁█▁▁█▁▁▁▁▁▁▁█▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁█ ▁
  152 ms           Histogram: frequency by time          303 ms <

 Memory estimate: 356.04 MiB, allocs estimate: 6666732.

julia> @benchmark count_loop_omt($array_test, $vec_ref) samples=10 evals=3
BenchmarkTools.Trial: 10 samples with 3 evaluations.
 Range (min … max):  17.061 ms … 101.318 ms  ┊ GC (min … max):  0.00% … 82.55%
 Time  (median):     39.521 ms               ┊ GC (median):    52.99%
 Time  (mean ± σ):   43.200 ms ±  23.551 ms  ┊ GC (mean ± σ):  57.78% ± 26.70%

  █             █▁▁▁▁         ▁                              ▁  
  █▁▁▁▁▁▁▁▁▁▁▁▁▁█████▁▁▁▁▁▁▁▁▁█▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁█ ▁
  17.1 ms         Histogram: frequency by time          101 ms <

 Memory estimate: 101.74 MiB, allocs estimate: 2222464.

So about a 4-5x speedup over your version.

Or better yet, keep vec_reference sorted, and use searchsortedfirst.
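
For the sequential kernel inside each task, that could look like this (a sketch; count_sorted is a hypothetical name, and it assumes every value actually occurs in vec_reference):

function count_sorted(values, vec_reference)
    @assert issorted(vec_reference)
    counts = zeros(Int, length(vec_reference))
    for val in values
        # binary search, O(log n) per lookup instead of a linear scan
        counts[searchsortedfirst(vec_reference, val)] += 1
    end
    return counts
end

count_sorted(["A", "D", "A"], ["A", "B", "C", "D"])  # [2, 0, 0, 1]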


A (faster) manual version would be:

function count_parallel(values, vec_reference)
    tasks = map(chunks(values; n=Threads.nthreads())) do idcs
        Threads.@spawn begin
            subcounts = zeros(Int, length(vec_reference))
            for val in @view(values[idcs])
                subcounts[findfirst(==(val), vec_reference)] += 1
            end
            return subcounts
        end
    end
    return sum(fetch, tasks)
end

Here, we’ve dramatically cut down the allocations, resulting in better performance (and less variance):

BenchmarkTools.Trial: 10 samples with 3 evaluations.
 Range (min … max):   9.515 ms …  10.331 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     10.070 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):    9.985 ms ± 277.191 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

  ▁            ▁    ▁▁                    █  ▁       ▁     ▁ ▁  
  █▁▁▁▁▁▁▁▁▁▁▁▁█▁▁▁▁██▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁█▁▁█▁▁▁▁▁▁▁█▁▁▁▁▁█▁█ ▁
  9.52 ms         Histogram: frequency by time         10.3 ms <

 Memory estimate: 3.84 KiB, allocs estimate: 42.

That’s ~15x faster than your version and 2-4x faster than the OMT version above.

Note that whether you use map or for doesn’t matter here. You can also define

function count_parallel_for(values, vec_reference)
    tasks = Vector{Task}(undef, Threads.nthreads())
    for (t, idcs) in enumerate(chunks(values; n=Threads.nthreads()))
        tasks[t] = Threads.@spawn begin
            subcounts = zeros(Int, length(vec_reference))
            for val in @view(values[idcs])
                subcounts[findfirst(==(val), vec_reference)] += 1
            end
            return subcounts
        end
    end
    return sum(fetch, tasks)
end

which is, arguably, more verbose but has virtually identical performance:

BenchmarkTools.Trial: 10 samples with 3 evaluations.
 Range (min … max):   9.656 ms …  10.938 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     10.225 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   10.303 ms ± 351.046 μs  ┊ GC (mean ± σ):  0.00% ± 0.00%

  █                  █   █ ███     █        █  █             █  
  █▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁█▁▁▁█▁███▁▁▁▁▁█▁▁▁▁▁▁▁▁█▁▁█▁▁▁▁▁▁▁▁▁▁▁▁▁█ ▁
  9.66 ms         Histogram: frequency by time         10.9 ms <

 Memory estimate: 3.84 KiB, allocs estimate: 42.

You are right in your observations, thank you!

Hi @carstenbauer, thanks for the clear explanations and implementations.
It was very interesting to see how it turned out with OhMyThreads.jl.
Thanks for the examples you created; they were very clear.

Unfortunately, we ran into problems with the for version when the values vector is smaller than the reference. However, this error does not occur in the map version. See:

values = ["A","B","B"] 
vec_reference = ["A", "B", "C", "D"]
count_parallel_for(values, vec_reference)

UndefRefError: access to undefined reference

Stacktrace:
 [1] getindex
   @ .\essentials.jl:13 [inlined]
 [2] _mapreduce(f::typeof(fetch), op::typeof(Base.add_sum), ::IndexLinear, A::Vector{Task})
   @ Base .\reduce.jl:442
 [3] _mapreduce_dim
   @ .\reducedim.jl:365 [inlined]
 [4] mapreduce
   @ .\reducedim.jl:357 [inlined]
 [5] _sum
   @ .\reducedim.jl:1015 [inlined]
 [6] sum
   @ .\reducedim.jl:1011 [inlined]
 [7] count_parallel_for(values::Vector{String}, vec_reference::Vector{String})
   @ Main .\In[7]:12
 [8] top-level scope
   @ In[59]:4

Sure, but that’s trivial to fix: the length of the tasks vector should be the length of chunks(values; n=Threads.nthreads()).

However, consider this one of several reasons why I would use the map variant.

(Why are you trying to avoid map so much?)

Understood.
No special reason; I'm just learning Julia, so I use cases like this to dig deeper, understand the differences, and compare benchmarks.
Your help was very enlightening, thank you very much!