Huge performance fluctuations in parallel benchmark: insights?

When a task is spawned it’s executing the code for the particular value that taskid had at the particular point in the outer loop when the spawn occurred… that number, let’s call it 3, is just a constant number that it “owns” (because every other task was spawned with a different value of taskid, like 1,2). So it can be an identifier for where to put its private stuff (in a big array that you preallocate for example).

whereas, Threads.threadid() returns the number of the OS thread that the task is currently running on, and can change as the scheduler moves the task between threads, the “taskid” is just a number that never changes.

1 Like

I still do not see how that can work in that case (I don’t understand why you say that ntasks=nthreads(). This is a MWE of the pattern I’m using:

out = [ 0., 0. ] # potentially a very large array
out_threaded = [ deepcopy(out) for _ in 1:Threads.nthreads() ]

splitter(thread_id,n) = thread_id:Threads.nthreads():n

n = 1000 # number of tasks

Threads.@threads for thread_id in 1:Threads.nthreads()
    for task in splitter(thread_id,n)
        for i in eachindex(out_threaded[thread_id])
            out_threaded[thread_id][i] += 1.0 # expensive stuff
        end
    end
end

# reduction
out .= out_threaded[1]
for i in 2:Threads.nthreads()
    out .= out .+ out_threaded[i]
end

How do you suggest that to be performed using spawn?

I’m finding that really complicated to think about. I really like the channel based methods. why not do:

chin = Channel(nthreads)
chout = Channel(nthreads)

for i in 1:nthreads
    @spawn dowork(chin,chout)
end

for work in 1:nworks
   put!(chin, describe_some_work(i))
end

for work in 1:nworks
   results = take!(chout)
   workres[i] = results ## or have the results describe which "i" to use
end

close(chin);
close(chout);

along those lines. basically “dowork” runs a loop that reads some work to be done from chin, does the work, and pushes the result to chout, on an exception in take! it returns.

This is super easy to think about, and results in full speed even if some work takes a lot longer than other work.

1 Like

Thanks again. I will try to reason on that lines to see if it fits.

The critical point in my case is that I have nthreads() copies of a possibly large output array, and I want each thread to update one of the copies to avoid concurrency. At the end I reduce the copies of the arrays to the actual output array using some reduction function.

Thus, for instance, describe_some_work(1) could write to output_threaded[1], but describe_some_work(15) potentially has to, as well, write to output_threaded[1] . I do not see in that pattern how do you avoid these two calls to describe_some_work to be executed simultaneously.

I cannot pass a thread-safe output to each work, because these are potentially large arrays, and I can only support copying them a few times. Using locks I think would be much worse for performance.

I’ve updated the example above to make it slightly more similar to the real case.

It’s helpful to hear more about your problem. I really don’t understand the reason why you have multiple copies of an output array. In the case where you are doing something like my channels based method, the idea is that each thread gets its chunk of stuff to do… and then puts the results to the output channel. It’s the responsibility of the “main” thread to read those outputs from the output channel and put them in their right place.

imagine dowork is like:


function dowork(chin, chout)
   while true
      try 
          (thingid,nextthing) = take!(chin)
      catch e ## when the input channel is closed this will occur, there's no more work
           return nothing
      end
      result = calc_some_stuff(nextthing)
      put!(chout, (thingid, result))
   end
end

So now, instead of the “spawned” task using a big array to put its output into, it just puts it into the channel.

when it comes to what to do with that output, the main task needs to “do the right thing” but it’s the only task that handles the output array, so there’s no concurrency there.

I cannot know in advance from the task number, to which position of the output array that work will have to write to. For instance, if the calc_some_stuff function was:

function calc_some_stuff(output_array) 
    i = rand(1:length(output_array))
    output_array[i] += 1
end

How can that be thread safe relative to the output_array without using locks or copying the array for each thread?

Ok, this is more understandable I guess. But again, the channels method will avoid both having multiple copies, and simultaneous access. The output is put to the output channel, and the “collector thread” decides where to put the results in the single copy of the output.

1 Like

I will study that possibility. But I’m not confident that I can use that approach. The “output” of the function is the modification of the output array (I’m computing the forces, for example, in a particle simulation). To separate the “computation” from the updating of the array, I would need to store some intermediate array for the computation, to accumulate the forces computed by that task (which does not compute a single component of the force, but many), to then add them to the final array. That would use less memory, but would require a more complicated program structure.

(also the function to be computed in my case is a generic function which is provided by the user in the form of a closure, such that I don’t really control what the function does. I offer an interface that allows the user to provide different types of inputs and output variables, and define custom reduction functions if that is required. The “copy everything” approach allows that flexibility somewhat easily).

Ok, I see your point here. how about something like this:

cache = [deepcopy(forces) for i in 1:ntask]
chins = [Channel(1) for i in 1:ntask]
whoneedswork = Channel(ntask)

function dowork(myid, cache,whoneeds,myinput)
   while true
      put!(whoneeds,myid)
      nextwork = take!(myinput)
      calcresult!(cache,nextwork)
   end
end

for i in 1:ntask
    @spawn dowork(i,cache[i],whoneedswork,chins[i])
end

while true
    nextworker = take!(whoneedswork)
    mergeresults!(cache[nextworker],answer)
    put!(chins[nextworker],get_next_work())
end


EDIT: adjusted code, need to put a request for work before trying to take! work

1 Like

You could just use JuliaFolds :slight_smile:

2 Likes

Absolutely, @tkf, using something someone smarter than me implemented is my goal. Currently I have not done that because I could not exactly see how to implement the flexibility I need with one of these packages. But that is because I didn’t really had the time to dig into it. If you can point at least which package should I look into, I would be mostly thankful.

You can of course use @threads and/or @spawn directly for maximizing the flexibility. But it comes with the cost of understanding what they really are.

Another cons of directly using @threads and @spawn is that you are explicitly describing the scheduling policy. But the fluctuations observed in the OP could be due to some unfortunate interactions with the scheduler and your algorithm. If so, abstractions like JuliaFolds help since you can try other schedulers from FoldsThreads.jl.

Probably FLoops.jl fits the requirement here. I’ve been trying to add smooth tutorials like Efficient and safe approaches to mutation in data parallelism.

2 Likes

I already played with Floops (which for other situations was always very good). In this particular case I didn’t adopt it because I need to allow the user to provide its custom reduction function as a parameter of my function, something that at the time I didn’t figure out if was possible. But I will look again. Thanks!

FLoops.jl is a syntax sugar for parallel reduce. Since most of the data parallel operations can be lowered to a reduce, I don’t think there are major data parallel computations that cannot be written in FLoops.jl. (Some could be tedious or inefficient, of course.)

You can maybe just call the user-defined function inside the @reduce syntax.

1 Like

As far as I understand, I have 16 physical cores (32 virtual). I’m running with julia -t 8. This is what lspcu says […]:

Hi. Not an advice, rather a question. I am wondering, if there is 16 physical cores (8 cores per socket and 2 sockets as presented in lspcu output), shouldn’t julia be better run with -t 16 when one want to utilize only physical cores? I am also wondering what is the %CPUs value in case of -t 8 of Linux top command? I would expect it to be at about 25% so only half of physical cores being utilized. Am I right or is it totally different in case of dual socket configurations? Really curious as most of the machines I am currently using are dual sockets.

1 Like

Just to be clear, I’m not using -t 8 with the intention of using all physical cores. My goal in this benchmark was to use half of them (and I do get 800% CPU for a maximum of 3200% that this machine can display, so all as you say).

Thanks for the clarification. I was not sure if I was not missing something or by any chance not oversubscribing. I am relatively new to the topics of parallelization. Interesting thread.

@leandromartinez98

If someone has any will or reason to inspect this further, to reproduce the example, do: […]

I tried to replicate your test.

I run BenchmarkTools (@btime) 11 times each calculation for N=10_000:N=10_000_000 with GC.gc() and 31 times @time for N=2_000_000 also with GC.gc() on 1.7.0-beta3.0, libopenblas64_.so, BenchmarkTools v1.1.4 and CellListMap v0.5.18 with julia set as -t 112, -t 56, -t 28, -t 16 and JULIA_EXCLUSIVE=1 julia -t 56, -t 28, -t 16.

I just took a very quick look and it seems that there is some volatility and there are some differences (not sure if of that scale as presented by you) that may potentially make me change my understanding of the best way in which I should run julia in the future (tbc). However, I’d like to underline that it was just a quick look. Also noticed that JULIA_EXCLUSIVE=1 is changing default libopenblas64_.so settings.

Should it be to your interest, I am providing a link to zipped txt files [https://ibmb2.s3.eu-de.cloud-object-storage.appdomain.cloud/CellListMap.zip]. If you have any remarks wrt the “methodology”, please let me know. Thx for the package.

1 Like

Thanks! I still didn’t find a solution (or the cause), I am probably leaning to change the parallelization strategy at some point.

I had tested it in some other machine, but I don’t have access to that kind of number of processors. I will look carefully to your results. Let me know if you discover or suspect of anything.

Thanks!

Thanks! The package is very interesting.

[…] I am probably leaning to change the parallelization strategy at some point.

Are you planning to support GPUs? if so only CUDA or other manufacturers as well?

I had tested it in some other machine, but I don’t have access to that kind of number of processors.

I run it on a 2 sockets machine with quite similar proportions between the number of CPUs as presented in your example. I thought that it might be useful.

[…] I still didn’t find a solution (or the cause), […]

I am enclosing the code that I used to prepare the results. I did some reading on BenchmarkTools.jl, TimerOutputs.jl and @profile. I am wondering if it be useful to utilize more advanced options of those tools? Or maybe you have any suggestion on how to transfer @btime and @time outputs into a dataframe in order to prepare the charts? I will be happy to try to adjust the code or if you find it suitable and you decide to provide update with examples, to run the tests again.

I will look carefully to your results. Let me know if you discover or suspect of anything.

As for the software side, I do not have enough knowledge to analize it in detail, even less about compilers. On a more general level, one thing that came to my mind was OpenBLAS. When I was using other packages, particularly AlphaZero.jl, I spotted that sometimes the results are sensitive to particular BLAS library, the way julia is run with this library or the output depends on the “particular load” (tbc).

As for the hardware, I guess I can try to elaborate on the CPU design, L1, L2, L3 caches and some peculiarities of memory access and scheduling, however, I will risk it … :slight_smile: … maybe @tkf would be interested and willing to provide some insights? Seriously, I guess that it would be very interesting.

Again, I am very happy to do some additional tests and to prepare results within the limits of my knowledge should there be such a need.

Links:
https://ibmb2.s3.eu-de.cloud-object-storage.appdomain.cloud/CellListMap_t2.jl
https://ibmb2.s3.eu-de.cloud-object-storage.appdomain.cloud/CellListMap_launch_local_t2.sh

1 Like