How to speed up tasks?

I have a function that I need to call tens of thousands of times, and each call only takes a few milliseconds to run. There is no I/O, and it doesn't return anything, so there is no communication; it is strictly compute. How do I best parallelize it?

I first tried threads. Evaluating a simple dummy function as many times as I have threads takes 5x longer than evaluating it just once:

julia> Threads.nthreads()
24

julia> foo(x) = rand(x,x,x)

julia> @time foo(100);   # showing here and below just the best of 3 runs
  0.006792 seconds (2 allocations: 7.629 MiB)

julia> @time Threads.@threads for _ in 1:Threads.nthreads()
           foo(100)
       end
  0.036396 seconds (29.44 k allocations: 184.669 MiB)

julia> 0.036396 / 0.006792   # ≈ 5.36

If I scale it up, there is still about 2.5x overhead:

julia> @time Threads.@threads for _ in 1:1000
           foo(100)
       end
  0.696078 seconds (31.39 k allocations: 7.452 GiB, 46.63% gc time)

julia> 0.696078 / 0.006792 / (1000/24)   # ≈ 2.46
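All of the ratios in this thread compute the same quantity: measured wall time divided by the ideal time if the n evaluations were spread perfectly over p threads or workers. A small helper makes that explicit. `overhead` is a hypothetical name, and the numbers plugged in are the timings from the 24-thread runs above; a result of 1.0 would mean perfect scaling.

```julia
# Overhead factor as used in the ratios in this thread: measured parallel wall
# time divided by the ideal time t_single * n / p (perfect scaling; this ignores
# that n/p is really rounded up to whole rounds when p does not divide n).
function overhead(t_parallel, t_single, n, p)
    ideal = t_single * n / p
    return t_parallel / ideal
end

# Timings from the 24-thread runs above.
println(round(overhead(0.036396, 0.006792, 24, 24); digits = 2))
println(round(overhead(0.696078, 0.006792, 1000, 24); digits = 2))
```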

I next tried processes, since ideally I'd do 20,000 evaluations simultaneously and no machine has that many cores.

julia> using Distributed

julia> addprocs();   # 24 hyperthreaded workers on my machine

julia> @everywhere foo(x) = rand(x,x,x)

Evaluating the same dummy function just once on a local worker process takes 2x as long:

julia> @time wait(@spawnat 2 foo(100));
  0.013471 seconds (175 allocations: 8.188 KiB)

julia> 0.013471 / 0.006792   # ≈ 1.98

Evaluating it as many times as I have workers takes 4x as long:

julia> @time @sync for p in workers()
           @spawnat p foo(100)
       end
  0.027544 seconds (3.55 k allocations: 168.047 KiB)

julia> 0.027544 / 0.006792   # ≈ 4.06

Using multithreaded tasks to coordinate the processes doesn't do any better; it is actually slower:

julia> @time begin
           ts = [Threads.@spawn remotecall_wait(foo, p, 100) for p in workers()]
           for t in ts; wait(t); end
       end
  0.079460 seconds (49.64 k allocations: 2.571 MiB)

julia> @time Threads.@threads for p in workers()
           remotecall_wait(foo, p, 100)
       end
  0.039949 seconds (19.57 k allocations: 1.013 MiB)

Ideally I'd like to spawn 100s of workers on my cluster:

julia> using ClusterManagers

julia> addprocs_lsf(100; ssh_cmd=`ssh login1`, throttle=10)

julia> @everywhere foo(x) = rand(x,x,x)

julia> nworkers()
124

Evaluating once on a remote process is nearly 2x faster than doing so on a local process:

julia> @time wait(@spawnat 125 foo(100));
  0.007510 seconds (176 allocations: 8.203 KiB)

julia> 0.013471 / 0.007510   # ≈ 1.79

Still, evaluating it as many times as I have workers takes 9x longer than evaluating it just once locally:

julia> @time @sync for p in workers()
           @spawnat p foo(100)
       end
  0.059993 seconds (17.84 k allocations: 837.359 KiB)

julia> 0.059993 / 0.006792   # ≈ 8.83

Scaling up to 1000 evaluations, there is still about 7x overhead:

julia> @time @sync for _ in 1:1000
           @spawnat :any foo(100)
       end
  0.394211 seconds (375.43 k allocations: 13.295 MiB)

julia> 0.394211 / 0.006792 / (1000/124)   # ≈ 7.20

It seems as if there is a lot of overhead in the tasks managing the threads and processes. Does anyone have any ideas on how to do this more efficiently? Thanks!


I don't understand: why do you need to run the function with the same input multiple times? For its side effects?

Nope, you want that nthreads() number to be <= 12 (presumably your physical core count, since your 24 are hyperthreads), because you're CPU bound and oversubscribing the cores will only make things slower.

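Also, you're mostly benchmarking rand(x,x,x), which allocates and fills a 100×100×100 Float64 array (the 7.629 MiB per call shown above) every time it runs.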

julia> Threads.nthreads()
julia> big_ = rand(100,100,20);

julia> foo(x) = sum(sin.(x).^3)

julia> @time foo(big_);
  0.002271 seconds (3 allocations: 1.526 MiB)

julia> @time Threads.@threads for _ in 1:1000
           foo(big_)
       end
  0.316662 seconds (21.09 k allocations: 1.491 GiB, 5.14% gc time)

julia> 1000 * 0.002271 / 0.316662   # ≈ 7.17x speedup over running serially
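Along the same lines, here is a minimal sketch of a dummy workload that allocates nothing at all, so a timing reflects arithmetic rather than allocation and GC. `busywork` and `run_threaded` are hypothetical names, and the per-iteration work (`sin(i)^3`) is arbitrary.

```julia
# CPU-bound dummy workload with no heap allocation (unlike rand(x, x, x)).
function busywork(n::Integer)
    acc = 0.0
    for i in 1:n
        acc += sin(i)^3      # arbitrary floating-point work per iteration
    end
    return acc               # returning the result keeps the loop from being optimized away
end

# Threaded driver: each iteration writes only its own slot, so there is no data race.
function run_threaded(nitems::Integer, n::Integer)
    results = Vector{Float64}(undef, nitems)
    Threads.@threads for k in 1:nitems
        results[k] = busywork(n)
    end
    return results
end

busywork(10); run_threaded(2, 10)       # warm up (compile) before timing
@time busywork(10_000)
@time run_threaded(1000, 10_000)
```

Dividing 1000 times the first timing by the second gives the speedup in the same way as the ratio above.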

There's always going to be some communication overhead; whether that is “a lot” depends on what you need to do in each loop iteration.
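On the Distributed side, one standard way to amortize per-message overhead is to send several calls per message. A sketch using `pmap`'s `batch_size` keyword, assuming two local workers and a hypothetical `work` function standing in for the real computation:

```julia
using Distributed

# Assumption for illustration: two local worker processes.
nprocs() == 1 && addprocs(2)

# Hypothetical compute-only work item, defined on every process.
@everywhere work(n) = sum(sin(i)^3 for i in 1:n)

inputs = fill(10_000, 1_000)

# Default batch_size = 1: one request/response round trip per call.
r1 = pmap(work, inputs)

# batch_size = 50: each request to a worker carries 50 calls.
r50 = pmap(work, inputs; batch_size = 50)
```

Timing the two `pmap` calls (after a warm-up run) shows how much of the per-call cost was messaging; the best batch size depends on how long each call takes.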

Also, I might have misunderstood, but shouldn't you care about the overall time it takes to run in parallel vs. in serial?

using BenchmarkTools

foo(x) = rand(x,x,x)
function f_parallel(n)
    Threads.@threads for _ in 1:Threads.nthreads()
        foo(n)
    end
end
function f_serial(n)
    for _ in 1:Threads.nthreads()
        foo(n)
    end
end
@btime f_parallel(100);
3.422 ms (57 allocations: 61.04 MiB)
@btime f_serial(100);
11.499 ms (16 allocations: 61.04 MiB)

@btime f_parallel(1);
8.213 μs (49 allocations: 6.27 KiB)
@btime f_serial(1);
369.678 ns (8 allocations: 768 bytes)

Whether the overhead is “a lot” or not depends on the specific problem. A problem that does a lot of computation in each loop iteration spends very little time communicating between threads/processes relative to the time each one spends in the loop, and therefore has “small” overhead costs. Vice versa, a problem with a few very simple instructions per loop iteration spends much more time on communication than on computation. In the benchmarks above, f_parallel(100) is about 3.4 times faster than f_serial(100) (3.422 ms vs. 11.499 ms), while f_parallel(1) is about 22 times slower than f_serial(1) (8.213 μs vs. 369.678 ns).
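To put a rough number on the communication side for threads, one approach is to time spawning and waiting on empty tasks. `spawn_overhead` is a hypothetical helper, and the figure it reports will vary with machine, thread count and Julia version.

```julia
# Rough average cost of Threads.@spawn + wait, measured on tasks that do nothing.
function spawn_overhead(ntasks::Integer)
    spawn_all() = foreach(wait, [Threads.@spawn(nothing) for _ in 1:ntasks])
    spawn_all()                               # warm-up run so compilation is not timed
    return @elapsed(spawn_all()) / ntasks     # seconds per spawned-and-waited task
end

ovh = spawn_overhead(10_000)
println("≈ ", round(ovh * 1e6; digits = 2), " μs per task")
```

As a rough guide (an assumption, not a measured threshold), spawning one task per item only pays off when each item costs many times this figure; otherwise, group items into larger chunks.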


It appears that your problem is “trivially parallelizable”. In this case, you probably want to split the tens of thousands of calculations into nthreads() groups and send each thread (tens of thousands)/nthreads() calculations. Scaling should be almost perfect with that.

Something like this:

julia> Threads.nthreads()

julia> using BenchmarkTools

julia> nset = collect(1:100_000_000);

julia> foo(i) = sqrt((i*(i-1))%(i+1)^2) # some function
foo (generic function with 1 method)

julia> function fparallel(nset) 
         # cld rounds up and min() clips the last chunk, so every element is
         # processed exactly once even if length(nset) % nthreads() != 0
         nset_perthread = cld(length(nset), Threads.nthreads())
         Threads.@threads for i in 1:Threads.nthreads()
            jfirst = nset_perthread*(i-1)+1
            jlast = min(jfirst + nset_perthread - 1, length(nset))
            for j in jfirst:jlast
               foo(nset[j])
            end
         end
       end
fparallel (generic function with 1 method)

julia> function fserial(nset) 
         for i in 1:length(nset)
            foo(nset[i])
         end
       end
fserial (generic function with 1 method)

julia> @btime fserial($nset)
  786.934 ms (0 allocations: 0 bytes)

julia> @btime fparallel($nset)
  245.395 ms (21 allocations: 2.98 KiB)
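A variant of the same chunking idea, sketched under the assumption that you also want to combine the per-item results (here by summing them): `Iterators.partition` does the index bookkeeping, one task is spawned per chunk, and the chunk sums are fetched and added. `chunked_sum` is a hypothetical name.

```julia
# Apply f to every element of xs using about nchunks tasks (one per contiguous
# chunk) and return the total of all results.
function chunked_sum(f, xs; nchunks::Integer = Threads.nthreads())
    chunksize = max(1, cld(length(xs), nchunks))   # never 0, even for empty xs
    tasks = map(Iterators.partition(xs, chunksize)) do chunk
        Threads.@spawn sum(f, chunk; init = 0.0)   # one task per chunk
    end
    return sum(fetch, tasks; init = 0.0)           # combine the per-chunk sums
end

chunked_sum(x -> x^2, 1:10)
```

With the definitions above, `chunked_sum(foo, nset)` would run `foo` over the whole set with one task per thread.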

rand accesses a global stream, no? It doesn't model a function that just takes data and processes it, so it seems like an unfortunate choice of “dummy” function.
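If a function (dummy or real) does need random numbers inside a parallel loop, one way to sidestep any shared-generator question is to give each chunk its own generator. This sketch assumes Julia 1.7 or later for `Xoshiro` (as far as I know, 1.7 is also where the default generator became task-local); `fill_random_chunks!` is a hypothetical name, and seeding with `seed + k` is just one simple scheme.

```julia
using Random

# Fill `out` in parallel; each chunk gets its own Xoshiro generator seeded from
# `seed` and the chunk index, so for a fixed nchunks the result does not depend
# on which thread ran which chunk.
function fill_random_chunks!(out::AbstractVector{Float64};
                             nchunks::Integer = Threads.nthreads(), seed::Integer = 42)
    chunksize = max(1, cld(length(out), nchunks))
    chunks = collect(Iterators.partition(eachindex(out), chunksize))
    Threads.@threads for k in eachindex(chunks)
        rng = Xoshiro(seed + k)          # separate generator per chunk
        for i in chunks[k]
            out[i] = rand(rng)
        end
    end
    return out
end

a = fill_random_chunks!(zeros(1000); nchunks = 4, seed = 1)
```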


OK, so in your example the function is a dummy. I get it now. But still: what is the result of the computation if the function does not return anything?

Thanks for all the suggestions, guys. I ended up distributing the spawning of jobs itself, so the workers spawn each other in a tree, to work around each computation taking about as long as a single spawn. Like this:

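@everywhere function divideconquer(r)
    # r is a vector of worker ids; this call runs on r[1]
    @sync begin
        if length(r)>2
            # hand each half of the remaining workers to a new subtree root
            m = div(length(r)-1, 2)
            @spawnat r[2] divideconquer(r[2:m+1])
            @spawnat r[m+2] divideconquer(r[m+2:end])
        elseif length(r)>1
            @spawnat r[2] foo(100)
        end
        foo(100)   # this worker's own evaluation, overlapping with the spawns
    end
end

@fetchfrom workers()[1] divideconquer(workers());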
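For comparison, the same tree-shaped spawning idea can be sketched with threads inside a single process. This is a swapped-in analogue, not the code from the post: `tree_spawn` is a hypothetical helper that splits an index range in half, hands one half to a new task and recurses on the other, so no single loop has to spawn everything.

```julia
# Apply f to every index in lo:hi, spawning tasks in a binary tree.
function tree_spawn(f, lo::Int, hi::Int)
    if hi <= lo
        lo == hi && f(lo)                         # one index (or none) left: just run it
        return nothing
    end
    mid = (lo + hi) >>> 1
    left = Threads.@spawn tree_spawn(f, lo, mid)  # left half on a new task
    tree_spawn(f, mid + 1, hi)                    # right half on the current task
    wait(left)
    return nothing
end

# Each index is written by exactly one task, so these updates do not race.
hits = zeros(Int, 1000)
tree_spawn(i -> (hits[i] += 1), 1, length(hits))
```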

I may be asking a question with an obvious answer (not to me, alas), but what do you wish to accomplish? If the foo function returns something, how exactly is that something combined from different processes to produce the final answer? Is this an embarrassingly parallel job?
