Parallelizing a histogram

I am trying to parallelize a simple histogram:

n_bins =  20000000
n_elem = 100000000
hist = zeros(Int64, n_bins)

for i = 1:n_elem
   bin = rand(UInt64)%n_bins + 1
   hist[bin] += 1
end

In Fortran&OpenMP I would just use a reduction on hist, or I could use locks, but I am new to Julia and I don’t quite understand how to do parallelism in Julia, and the documentation doesn’t have many examples.

In addition, the calculation time for the bins can vary strongly. Is there an equivalent of OpenMP’s schedule(dynamic) or schedule(guided)?

How would you best parallelize this code?

Your MWE is a bit tricky since rand is not thread safe. One approach, suggested in https://docs.julialang.org/en/v1/manual/parallel-computing/, is to use Future.randjump to produce non-overlapping random number sequences for different threads. There are a couple of ways to parallelize this loop: one is to use locks or atomics; another is to have each thread build its own histogram and then add them all together at the end. Here’s the first, with atomics, which should be faster than with locks:

using Random, Base.Threads, Future

function parHistogram()
    n_bins = 20000000
    n_elem = 100000000
    hist = [Atomic{Int64}(0) for k = 1:n_bins]
    # one RNG per thread, with non-overlapping streams via randjump
    r = let m = MersenneTwister(1)
        [m; accumulate(Future.randjump, fill(big(10)^20, nthreads()-1), init=m)]
    end
    @threads for i = 1:n_elem
        bin = rand(r[threadid()], UInt64) % n_bins + 1
        atomic_add!(hist[bin], 1)
    end
    hist
end

The trouble is, this is much slower than the sequential version, because there’s lots of contention for the updates and not very much work on each iteration. Here’s the second version:

using Random, Base.Threads, Future

function parHistogram2()
    n_bins = 20000000
    n_elem = 100000000
    # one private histogram per thread, merged at the end
    hist_t = [zeros(Int64, n_bins) for k = 1:nthreads()]
    r = let m = MersenneTwister(1)
        [m; accumulate(Future.randjump, fill(big(10)^20, nthreads()-1), init=m)]
    end
    @threads for i = 1:n_elem
        bin = rand(r[threadid()], UInt64) % n_bins + 1
        hist_t[threadid()][bin] += 1
    end
    hist = zeros(Int64, n_bins)
    @threads for i = 1:n_bins
        for j = 1:nthreads()
            hist[i] += hist_t[j][i]
        end
    end
    hist
end

This version will get speedup, particularly as you increase the number of threads.

As for your second question, @threads uses a static blocked schedule. To get a dynamic schedule you will want to create tasks that can be allocated dynamically by a scheduler. You can do this using Base.Threads.@spawn, by turning your loop into a recursive function. I plan to post an example doing this within the next week or so.
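In the meantime, here is a rough sketch of the divide-and-conquer idea (function name and cutoff value are my own, purely illustrative): recursively split the index range, @spawn one half as a task that the scheduler can place on any free thread, and merge the task-local histograms on the way back up. Because tasks are handed out as threads become free, this behaves much like schedule(dynamic).

```julia
using Base.Threads: @spawn

# Sketch only: recursive splitting turns the loop into tasks that Julia's
# scheduler assigns to threads dynamically.
function spawn_hist(data::Vector{UInt64}, n_bins, lo, hi; cutoff = 100_000)
    if hi - lo + 1 <= cutoff
        hist = zeros(Int64, n_bins)          # task-local histogram
        for i in lo:hi
            hist[data[i] % n_bins + 1] += 1
        end
        return hist
    end
    mid = (lo + hi) >>> 1
    task = @spawn spawn_hist(data, n_bins, lo, mid; cutoff = cutoff)
    right = spawn_hist(data, n_bins, mid + 1, hi; cutoff = cutoff)
    return fetch(task) .+ right              # merge child histograms
end

data = rand(UInt64, 1_000_000)
h = spawn_hist(data, 1_000, 1, length(data))
```

Note that this version pre-generates the data, sidestepping the RNG thread-safety issue entirely.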


There is a parallelized histogram in SortingLab.jl. It’s used in the radixsort computation.

If you want to parallelize computation using multiple cores, then you have two options - either use the Distributed Processing and spawn worker processes, or use the Multi-threading.

In this particular example, you have a high number of iterations but the work per iteration is minuscule. You won’t gain much performance by parallelizing such small tasks; in fact, just scheduling them will probably make it slower.

So, I hope this is not your real use case. But if it is, then you would be better off calling rand(UInt64, 100_000_000) to generate a block of random numbers instead of picking off one random number at a time. If you are constrained by memory then choose a block size and handle one block at a time.
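Something like this block-wise sketch, say (the block size here is an arbitrary choice on my part):

```julia
using Random  # for rand!

# Sketch: generate random numbers one block at a time to bound memory use,
# then bin each block with a plain inner loop.
function blocked_hist(n_elem, n_bins; block = 1_000_000)
    hist = zeros(Int64, n_bins)
    buf = Vector{UInt64}(undef, block)
    remaining = n_elem
    while remaining > 0
        m = min(block, remaining)
        rand!(view(buf, 1:m))                # refill (part of) the buffer
        for k in 1:m
            hist[buf[k] % n_bins + 1] += 1
        end
        remaining -= m
    end
    return hist
end

h = blocked_hist(2_500_000, 1_000)
```

Generating in bulk amortizes the per-call overhead of rand and keeps the inner loop tight.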

I don’t think this is still true. Isn’t Julia automatically using thread-local RNGs when one isn’t explicitly passed?


If we had a definition for atomic_add on array elements, that might give us very efficient multi-threaded histogram filling.

For the RNG, it’s possible to use a counter-based RNG (Random123.jl: https://github.com/JuliaRandom/Random123.jl), which allows for fully parallel random number generation.

You mean like this?
Multi-Threading · The Julia Language!

EDIT:
Ah I see it doesn’t work on arrays yet:

Exactly - we do have Atomic, but we’ll need something like atomic_add!(A::Array, x, idxs)

Vote for it here: https://github.com/JuliaLang/julia/issues/32455 🙂

Is using locks or atomics really the best practice in OpenMP/Fortran? I’ve tried atomic add on array elements with llvmcall, but the performance was horrible. In general, a lock-based approach doesn’t seem to be a good fit if you want performance in data-parallel computations. I’d use a divide-and-conquer approach instead.


hear hear


OK, so I uploaded the code I used for trying out atomic increments on array elements: https://github.com/tkf/ParallelIncrements.jl

(Comment added: I just realized that the name of the package is completely misleading since it was about single thread performance. Initially, I was thinking about trying it with multiple threads.)

This package contains a function ref = atomicref(::Array, indices...) which can be used for atomic operations like Threads.atomic_add!(ref, x).

As you can see in the README, the overhead is 10x in my laptop. It can go up to 100x and possibly more depending on the kind of optimization the compiler can do. It might be possible to get a lower overhead using other memory orderings but I haven’t tried it yet.

I’m not a big fan of this direction, but I’m interested in whatever is beneficial for HPC in Julia. So I’m sharing it in case some people want to try it out (and maybe find some performance bugs I made in the sample code or something).


You can make N histograms and add the bins together at the end.


I feel this got a little off track. I used the RNG for my minimal example; in my real application I don’t need an RNG. I just wanted to see how to parallelize this.

But the parallelization schemes are very useful. Thanks!

Yes, that’s true. I wanted to set the seed so that the sequence of pseudo random numbers was reproducible.

Sorry for bumping an old thread. But since it’s pretty narrow (histogram) I didn’t want to open a new one.

Thanks for the benchmark pkg; I read through https://github.com/JuliaLang/julia/issues/32455. I want to present a concrete case for a parallel histogram. The main loop looks like this:

hists = ...
for E in lazy_object_too_large
   a = do_work(E) # takes a little time
   begin
       unsafe_push!(hists[1], a[1])
       unsafe_push!(hists[2], a[2]) #can go up to O(100)
       ...
    end
end

While each push!() is fast, the cost grows linearly with the number of histograms, so it quickly becomes comparable to do_work(). Using a lock-based approach and marking the begin-block with @threads only made things worse; keeping the unsafe push with @async didn’t help either.

My guess is atomic array operation would help here but I might be wrong.

I recommend a data-parallel approach, which does not require any locks or atomics. If you can keep around nthreads() copies of histogram, it’s simple and reasonably efficient. For example:

using FLoops
using MicroCollections: SingletonDict

function simple_hist(input, bins; ex = nothing)
    @floop ex for x in input
        x < first(bins) && continue
        x > last(bins) && continue
        i = searchsortedfirst(bins, x)
        obs = SingletonDict(i => 1)
        @reduce() do (hist = fill!(similar(bins, Int), 0); obs)
            for (i, v) in pairs(obs)
                @inbounds hist[i] += v
            end
        end
    end
    return hist
end

A quick benchmark:

julia> xs = randn(2^20);

julia> @btime simple_hist(xs, -3:0.1:3; ex = SequentialEx());  # single-thread
  38.063 ms (2 allocations: 640 bytes)

julia> @btime simple_hist(xs, -3:0.1:3);  # multi-thread
  4.808 ms (75 allocations: 9.77 KiB)

julia> Threads.nthreads()
8

See Efficient and safe approaches to mutation in data parallelism for more techniques like this.

Of course, there are a lot of optimizations you can do if you are willing to eat the complexity of concurrent programming. My point here is that data-parallel is likely the best first step, not atomics- or lock-based approaches.


We’re aware of this: [WIP] add threading by aminnj · Pull Request #21 · Moelf/FHist.jl. But as I have tried to hint:

stops us from doing easy data parallelism. Basically, the real problem is that the underlying file is huge and in some kind of segmented streamer format. If we’re using an HDD and/or some network-mounted FS, reading from 20 different byte locations just doesn’t work very well.

FoldsThreads.NondeterministicEx can be used for stream-style input and also for limiting the input data loaded in memory.
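For instance, assuming the same @floop reduction as simple_hist above, switching to stream-friendly execution is just a change of executor (untested sketch; requires the FoldsThreads and MicroCollections packages):

```julia
using FLoops
using FoldsThreads: NondeterministicEx
using MicroCollections: SingletonDict

# Sketch: NondeterministicEx pulls items from the (possibly non-indexable)
# iterator sequentially but runs loop bodies on multiple threads, so it
# works for stream-style input that cannot be split up front.
function stream_hist(input, bins)
    @floop NondeterministicEx() for x in input
        x < first(bins) && continue
        x > last(bins) && continue
        i = searchsortedfirst(bins, x)
        obs = SingletonDict(i => 1)
        @reduce() do (hist = fill!(similar(bins, Int), 0); obs)
            for (i, v) in pairs(obs)
                @inbounds hist[i] += v
            end
        end
    end
    return hist
end
```

The reduction body is unchanged; only the executor decides how the iteration is distributed.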
