Strategies for parallelization of fast non-uniform tasks

Which are the recommended strategies/packages to deal with the parallelization of many (thousands of) tasks that are:

  1. Individually relatively fast (costs associated to spawning may not be negligible).
  2. Possibly somewhat heterogeneous.
  3. Possibly have to mutate a large array in arbitrary positions.

Probably the possibilities are different depending on what the output is. There are two classes of problems in my case:

  1. output is a scalar, and is conceivable to just compute (and even store) it ntasks times and reduce the result at the end (i. e. the total energy of a system of particles).

  2. output is a large array, which can be mutated in any position by any of the tasks. The array is large, such that it cannot be copied ntasks (thousands of) times (i. e. the forces acting on each particle of a system). In this case, I have always opted to create nthreads() copies of the output, and let the tasks on each thread mutate one of the copies. Here locks start to be possibility, but since the tasks may be fast, locking is prohibitive.

The alternatives I have considered so far are:

a) Run nthreads() tasks, and randomly distribute the work within these tasks.

  • Good: cost of spawning is low.
  • Bad: If one of the threads gets a larger workload, it ends up alone.

Basic syntax:

@threads for it in 1:nthreads()
    for itask in random_task_splitter(ntasks,nthreads)
        output[it] = ... work on one task
    end
end

This is what I’m doing now, but I notice that I have a performance tail associated with the heterogeneity of what each thread gets that is not negligible.

b) Run ntasks tasks with @threads:

  • Good: nothing really (because the tasks are assigned to each thread statically).
  • Bad: cost of spawning many tasks.
  • Bad: requires using threadid() to identify the output where the task must write, which is discouraged (by @tkf).

Basic syntax:

@threads for itask in 1:ntasks
    it = threadid()
    output[it] = ... work on one task
end

c) The above, but using ThreadPools.@qthreads:

  • Good: good distribution of the workload.
  • Bad: the cost of spawning is high, and if the tasks are fast, this becomes slow compared to (a).
  • Bad: also requires identifying the output by threadid() .

Basic syntax:

ThreadPools.@qthreads for itask in 1:ntasks
    it = threadid()
    output[it] = ... work on one task
end

I have some good results with this one if the tasks become more expensive, but for faster tasks it slows down everything relative to (a) as well.

Any insight is appreciated.

Have you looked at the @tkf -verse? FoldsThreads.jl implements work-stealing for load balancing, and overhead seems to be pretty negligible.

julia> using BenchmarkTools, FoldsThreads, FLoops

julia> function f(n, exec=WorkStealingEx())
           @floop exec for i in 1:n
               @reduce(s += rand(1:2))
           end
           s
       end
f (generic function with 2 methods)

julia> @belapsed f(10^9)
1.487454
1 Like

Thanks for that specific tip. I did look to his packages, and I am pretty sure that something there should be useful, but there are so many options that I got lost. I will try that one.

The problem I face (that I cannot see how the FLoops ecosystem can solve) is that in my case each task has to update an array in an unpredictable position (predicting that would require a lot of extra work). That is, it is fundamentally something like this:

function f_serial(in, out)
    for i in eachindex(out)
        j = rand(1:length(out)) # the mutation index is unknown
        @inbounds out[j] += in[i]
    end
    return out
end

That is why I have to copy the out vector nthreads() times, and only allow mutating each of the copies by each of the threads. Within that pattern, I must guarantee that no two threads write to the same copy of the out vector. The pattern I use is the (a), above:

function f_threads(in, out)
    out_threaded = [ copy(out) for i in 1:nthreads() ]
    @threads for it in 1:nthreads()
        for i in it:nthreads():length(out) # simple splitter
            j = unpredictable_index(i,length(out))
            out_threaded[it][j] += in[i]
        end
    end
    # reduce
    out .= out_threaded[1]
    for i in 2:nthreads()
        out .+= out_threaded[i]
    end
    return out
end

This actually works relatively well (even here where the work on each task is very small):

julia> f_serial(in,copy(out)) ≈ f_threads(in,copy(out))
true

julia> @btime f_serial($in,$(copy(out)));
  3.700 ms (0 allocations: 0 bytes)

julia> @btime f_threads($in,$(copy(out)));
  1.653 ms (58 allocations: 6.11 MiB)

But, in real scenarios, I am observing that spawning one large task per thread is causing bad balancing of the workload. To migrate a task from one workload to the other, the task must also change the index of the output vector in output_threaded that it will modify, such that two independent tasks working simultaneously do not write to the same copy of the output.

The pattern above has the additional cost of creating nthreads() copies of the output array. While on CPUs this is acceptable (because nthreads() is relatively small, i. e. 2-64), I think that I cannot use the same strategy when porting this to a GPU.

The code of the example above, using a fake unpredictable index generator, is here:

Code
using Test
import Base.Threads: @threads, nthreads, threadid

# Generate an "unpredictable" index to mutate within 1:n, but
# deterministically, for testing
unpredictable_index(i,n) = max(1, floor(Int,n*(1+sin(2000*i))/2))

function f_serial(in, out)
    for i in eachindex(out)
        j = unpredictable_index(i,length(out))
        @inbounds out[j] += in[i]
    end
    return out
end

function f_threads(in, out)
    out_threaded = [ copy(out) for i in 1:nthreads() ]
    @threads for it in 1:nthreads()
        for i in it:nthreads():length(out) # simple splitter
            j = unpredictable_index(i,length(out))
            out_threaded[it][j] += in[i]
        end
    end
    # reduce
    out .= out_threaded[1]
    for i in 2:nthreads()
        out .+= out_threaded[i]
    end
    return out
end

in = rand(100_000);
out = zeros(100_000);

@test f_serial(in,copy(out))  ≈ f_threads(in,copy(out))


edit: I think that this problem might be another problem, which may require some other ideas. I will post it separately.

Hi Leandro,

I’ve played a bit with that MWE. First observation: f_threads scales better on my 6-core machine for larger problem sizes. For 100_000_000 I see

  21.058 s (0 allocations: 0 bytes)
  5.139 s (45 allocations: 4.47 GiB)

Secondly, I tried to use atomics like

function f_threads2(in, out)
    out_atomic = [Threads.Atomic{typeof(o)}(o) for o in out]
    @threads for it in 1:nthreads()
        for i in it:nthreads():length(out) # simple splitter
            j = unpredictable_index(i,length(out_atomic))
            Threads.atomic_add!(out_atomic[j], in[i])
        end
    end
    return [o[] for o in out_atomic]
end

which unfortunately on my machine resulted in

7.861 s (100000036 allocations: 2.98 GiB)

I could imagine this to be an alternative approach depending on problem size and number of cores?

Edit: due to different results when double-checking…

1 Like

Thanks for the input. The scaling of this strategy is not bad if the tasks result to be uniform. The problem is that there we are launching only nthreads() threads, thus if one of them gets a higher workload, the total time will be bounded by that the work on that thread.

The strategy of using atomics could help if the the tasks were, each, long. When they are fast the cost of locking/unlocking the access to the variables does not pay off (the MWE is a lower limit on how fast they can be, but what you observed there I see in my examples as well).

What is worse, the non-uniformity of the workload (or, really, of the time to finish each workload) is not related only to the real work that each thread must perform (I am just realizing that now). For instance, if one of the threads is assigned to a virtual core, it may take longer. There are workarounds for this (like setting JULIA_EXCLUSIVE=1 and trying to bind each thread to a real core. This makes the computing resources available to each thread more uniform and the problem is further reduced.

But, ideally, one would like the threading scheme to just use the available resources as much as possible, if other threads have finished. That is what I’m trying to pursue. But, apparently, the strategies to balance the workload introduce a cost that does not pay off if the individual tasks are fast and if the the problem is not too heterogeneous.

Just for completeness sake. The distributed version using a shared array

function f_threads3(in, out)
    out_shared = SharedVector{Float64}(out)
    @sync @distributed for i in eachindex(out)
        j = unpredictable_index(i,length(out_shared))
        out_shared[j] += in[i]
    end
    return sdata(out_shared)
end

shows

  7.170 s (3244 allocations: 143.14 KiB)
1 Like

A useful threading pattern is to spawn N tasks and then send them work to do over a channel. Have them return the results over another channel. If the tasks are extremely short lived you are likely splitting the work at the wrong granularity.

In pseudo code because I’m not at a keyboard

workch = Channel()
reach = Channel()
for I in 1:nn
dowork(workch,resch)
end

coll = @spawn collector(resch)

for work in worklist
put(workch,work)
end

wait(coll)
2 Likes

I am trying to use channels, but I’m struggling with the limitation that I can only create a limited number of copies of the output array.

There are two limiting levels of granularity in the problem. One is the number of works (the “natural” loop to be threaded). These works are somewhat fast and I agree that spawning one different task for each of these works is not ideal.

The other limit is the number of copies of the output array. I cannot reliability make much more copies than the number of threads, risking overflowing the memory.

The pattern there that launches nthreads tasks works on the lowest limit of granularity, and has the workload issues. (Pattern (a)).

To increase the granularity and spawn more tasks, I either have to increase the number of copies of the output array to avoid writing concurrency, or keep track and lock the concurrent access of the tasks on the same output arrays. This last one is what I cannot find a way to do with an acceptable cost.

  1. Don’t spawn a task for each unit of work. Spawn a few worker tasks. Each reads work from the channel. It has a nice side effect of load balancing.
  2. Each worker writes the result (and its location) to the resch. The collector updates the output array.

An advantage of this approach is that no locking is required.

To be sincere, I cannot figure out how to structure that from the manual of channels. My attempt for now is something like:

function f_channel(input, output)
    channels = [ Channel{typeof(output)}(1) for _ in 1:nthreads() ]
    ntasks = length(output)
    @sync for it in 1:nthreads()
        put!(channels[it],deepcopy(output))
        for i in it:nthreads():ntasks
            @async begin
                j = unpredictable_index(i,length(output))
                out = take!(channels[it])
                @inbounds out[j] += input[i]
                put!(channels[it],out)
            end
        end
    end
    # reduce
    output .= take!(channels[1])
    for it in 2:nthreads()
        output .+= take!(channels[it])
    end
    return output
end

but clearly this is not what you all are suggesting (and it is slow).

If anyone has some didactic example somewhere of that pattern, I will really appreciate it.

The following is a little toy I mocked up once. In your case, the consumer would write to the output array. There may be more idiomatic or concise ways to code this.

nthread = Threads.nthreads()  # Set via env var JULIA_NUM_THREADS.
nworkers = nthread-1  # 1 for producer and consumer and main thread to share.

println("main thread ",Threads.threadid())

# Producer, spawn=true consumes a thread, false to share current thread.
c1 = Channel{Int}(2*(nworkers);spawn=false) do ch
	println("producer on thread ",Threads.threadid())
	for i=1:30
		println("sending ",i)
		put!(ch,i)
	end
	println("done sending")
end


# Consumer, spawn=true consumes a thread, false to share current thread.
consumertaskref = Ref{Task}()
c2 = Channel{Float64}(nworkers;taskref=consumertaskref,spawn=false) do ch
	println("consumer on thread ",Threads.threadid())
	for v in ch
		println("got ",v)
	end
	println("got all")
end

# CPU bound
function cpuworker(i,pch,cch)
	println("worker ",i," working on thread ",Threads.threadid())
	println("wid ",i," ",typeof(pch)," ",typeof(cch))
	for w in pch
		r = sum([sin(w*sin(j)) for j in 1:50_000_000])
	end
	println("cpuworker ",i," complete")
end

# Non-cpu bound
function sleepworker(i,pch,cch)
	println("worker ",i," working on thread ",Threads.threadid())
	println("wid ",i," ",typeof(pch)," ",typeof(cch))
	for w in pch
		r = w
		sleep(1)
		put!(cch,r)
	end
	println("sleepworker ",i," complete")
end

@sync for i=1:nworkers
	# Use Threads.@spawn for parallelism of CPU-bound work
	Threads.@spawn cpuworker(i,c1,c2)

	# Schedule tasks for non-CPU-bound work.
	# @async sleepworker(i,c1,c2)
end
close(c2) # close consumer's channel when all workers finish

# Make sure consumer has consumed everything
wait(consumertaskref.x)
println("all complete")
2 Likes

I’d try to “fuse” multiple tasks together such that you’d do more than (say) a couple of hundreds of microseconds to milliseconds computation for each @spawn. I’d look for alternative solutions only if that’s impossible (e.g., there are only a few “intensive enough” work items within thousands of items and they tend to be clustered into small regions). Note that this automatically happens in JuliaFolds API and also you can specify basesize to control the granularity of load-balancing.

But, if this is unavoidable, maybe you can use (say) the producer-consumer pattern with DualLinkedConcurrentRingQueue or LinkedConcurrentRingQueue in GitHub - JuliaConcurrent/ConcurrentCollections.jl: Concurrent data structures for Julia. It’s much faster than Channel at the cost of a larger memory footprint and weaker set of API. Since you are going to explicitly spawn consumer tasks (typically nthreads tasks), you can manually bound allocated output arrays. See also: Concurrency patterns for controlled parallelisms

For combining output arrays in the consumer tasks without waiting for all tasks to finish, it may be helpful to use SyncBarriers.reduce_barrier.

1 Like