Huge performance fluctuations in parallel benchmark: insights?

Yes, the pattern

@sync for taskid in 1:ntasks
    Threads.@spawn f(taskid)
end

behaves much more nicely than using threadid because it ensures each task has a unique taskid. The taskid is just a plain local variable inside the task, so it won’t change and it isn’t shared with other tasks.

Also, I think avoiding performance fluctuations would be very hard when there is any allocation (even a very tiny one). Calling GC.gc() before the benchmark doesn’t help, because when the GC decides to stop the world with respect to the other tasks is non-deterministic.
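If you want to double-check that, a minimal way (using the f from the snippet above as a stand-in for the task body) is something like:

    f(1)                      # run once so compilation is not measured
    @show @allocated f(1)     # should be 0 if the task body does not allocate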


1.)
Is this a cloud machine or a “private server”?

Some cloud providers sell shared vCPUs. For example, Hetzner ( https://www.hetzner.com/cloud ) offers:

  • “default” vCPU instances
    • not stated explicitly, but these are shared with other users;
  • Dedicated vCPU instances: “Maximum performance with dedicated vCPUs. Ideal for CPU intensive applications like highly frequented web servers, video encoding, machine learning or research calculations.”

2.)
My second idea is increased synchronization costs due to NUMA.

NUMA node0 CPU(s):     0-7,16-23
NUMA node1 CPU(s):     8-15,24-31

lscpu is a good tool to show your CPU type and cores.
Even better, use hwloc / lstopo:

https://www.open-mpi.org/projects/hwloc/lstopo/
On Linux: yum install hwloc or apt-get install hwloc

If this is a physical machine to which you have access, have you tried disabling hyperthreading in the BIOS?

This article is good too

I see, but there is probably a misunderstanding. In my case I have an output that has nthreads() copies of the data for threading, not ntasks copies.

This is a simplified version of your suggestion above:

    output_threaded = [ deepcopy(output) for _ in 1:nthreads() ]
    @sync for thread_id in 1:nthreads() 
        Threads.@spawn for i in 1:ntasks
            output_threaded[thread_id] = inner_loop!(...) 
        end
    end 

This is wrong, isn’t it? I can have two threads writing to the same output_threaded position in this case, right?

It seems to me that I would need something like:

    output_threaded = [ deepcopy(output) for _ in 1:nthreads() ]
    @sync for i in 1:ntasks
        Threads.@spawn begin
            thread_id = Threads.threadid()
            output_threaded[thread_id] = inner_loop!(...) 
        end
    end 

but I know that this is not a good pattern, for the reasons you pointed out above.

(@tkf the parallel code does not allocate. But I will separate it to make things clearer)

Thanks @ImreSamu and @johnh. That is good advice. It is a private cluster, so I will try these suggestions.

Note that we have Hwloc.jl, which also downloads the library via Hwloc_jll.jl.

Depending on the details of the machine, just setting JULIA_EXCLUSIVE=1 might do the job (i.e. pin julia threads to physical cpus). Otherwise, numactl and likwid-pin could be options, see this discussion for more information.
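For instance (script.jl is just a placeholder, and 8 is the number of Julia threads):

    JULIA_EXCLUSIVE=1 julia -t 8 script.jl
    # or restrict Julia to NUMA node 0 with numactl:
    numactl --cpunodebind=0 --membind=0 julia -t 8 script.jl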


No, that’s not what he said. It’s more like:

    output_threaded = [ deepcopy(output) for _ in 1:nthreads() ]
    @sync for i in 1:ntasks   # here ntasks == nthreads()
        Threads.@spawn begin
            output_threaded[i] = inner_loop!(give_me_work_for_task(i))
        end
    end

Yes, I got that. But that does not work in my case, because output_threaded is a vector of length nthreads(), not ntasks.

Interesting, this is what I get from topology():

julia> topology()
Machine (31.97 GB)
    Package L#0 P#0 (15.97 GB)
        NUMANode (15.97 GB)
        L3 (20.0 MB)
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#0 P#0
                PU L#0 P#0
                PU L#1 P#16
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#1 P#1
                PU L#2 P#1
                PU L#3 P#17
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#2 P#2
                PU L#4 P#2
                PU L#5 P#18
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#3 P#3
                PU L#6 P#3
                PU L#7 P#19
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#4 P#4
                PU L#8 P#4
                PU L#9 P#20
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#5 P#5
                PU L#10 P#5
                PU L#11 P#21
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#6 P#6
                PU L#12 P#6
                PU L#13 P#22
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#7 P#7
                PU L#14 P#7
                PU L#15 P#23
    Package L#1 P#1 (16.0 GB)
        NUMANode (16.0 GB)
        L3 (20.0 MB)
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#8 P#0
                PU L#16 P#8
                PU L#17 P#24
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#9 P#1
                PU L#18 P#9
                PU L#19 P#25
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#10 P#2
                PU L#20 P#10
                PU L#21 P#26
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#11 P#3
                PU L#22 P#11
                PU L#23 P#27
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#12 P#4
                PU L#24 P#12
                PU L#25 P#28
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#13 P#5
                PU L#26 P#13
                PU L#27 P#29
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#14 P#6
                PU L#28 P#14
                PU L#29 P#30
            L2 (256.0 kB) + L1 (32.0 kB) + Core L#15 P#7
                PU L#30 P#15
                PU L#31 P#31

It seems that in this case it would be reasonable to choose threads 0 to 7, which all share the same memory (I guess this is good here, since the threads have to access shared arrays of data). Does that make sense?

but ntasks = nthreads

The point of this pattern is to @spawn one task per available thread, but to give each task a STATIC identifier that doesn’t change as the scheduler moves the task from thread to thread, thereby giving it a fixed place in memory to store its private computational state.
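To make this concrete, a rough sketch of what I mean (with ntasks == nthreads(), and with output, inner_loop! and give_me_work_for_task as placeholders, as above):

    ntasks = Threads.nthreads()
    output_threaded = [ deepcopy(output) for _ in 1:ntasks ]

    @sync for taskid in 1:ntasks
        Threads.@spawn begin
            # taskid is captured when the task is created and never changes,
            # even if the scheduler moves this task to another OS thread
            output_threaded[taskid] = inner_loop!(give_me_work_for_task(taskid))
        end
    end

    # single-threaded elementwise reduction of the per-task copies afterwards
    out = reduce(+, output_threaded)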

Yes, I would think so. As you can see, and as is typically the case, the P numbering of the logical processing units (PU) is such that the first 0-N are on different cores and N+1:2N are the “hyperthread units”. So, as I said above, you can often get away with just setting JULIA_EXCLUSIVE=1 which, IIRC, will pin N Julia threads to the first N cores / processing units.

(Note that I’m writing this down off the top of my head, so it might be good to double-check :grinning_face_with_smiling_eyes:)


Uhm… admittedly I don’t understand how those patterns deliver that, or how they avoid two tasks writing to the same output concurrently. I’ll have to think about it more deeply…

When a task is spawned, it executes the code for the particular value that taskid had at the point in the outer loop when the spawn occurred… that number, let’s call it 3, is just a constant that the task “owns” (because every other task was spawned with a different value of taskid, like 1 or 2). So it can serve as an identifier for where to put the task’s private stuff (in a big array that you preallocate, for example).

Whereas Threads.threadid() returns the number of the OS thread that the task is currently running on, which can change as the scheduler moves the task between threads, the taskid is just a number that never changes.
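A tiny way to see the difference (just a sketch; on Julia versions where spawned tasks can migrate between threads, the two threadid() calls may really differ):

    @sync for taskid in 1:Threads.nthreads()
        Threads.@spawn begin
            before = Threads.threadid()
            yield()                         # give the scheduler a chance to move the task
            after = Threads.threadid()      # may differ from `before`
            @show (taskid, before, after)   # taskid is still the value captured at spawn
        end
    end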


I still do not see how that can work in my case (and I don’t understand why you say that ntasks = nthreads()). This is a MWE of the pattern I’m using:

out = [ 0., 0. ] # potentially a very large array
out_threaded = [ deepcopy(out) for _ in 1:Threads.nthreads() ]

splitter(thread_id,n) = thread_id:Threads.nthreads():n

n = 1000 # number of tasks

Threads.@threads for thread_id in 1:Threads.nthreads()
    for task in splitter(thread_id,n)
        for i in eachindex(out_threaded[thread_id])
            out_threaded[thread_id][i] += 1.0 # expensive stuff
        end
    end
end

# reduction
out .= out_threaded[1]
for i in 2:Threads.nthreads()
    out .= out .+ out_threaded[i]
end

How do you suggest that to be performed using spawn?

I’m finding that really complicated to think about. I really like channel-based methods. Why not do something like:

using Base.Threads: @spawn, nthreads

chin = Channel(nthreads())
chout = Channel(nthreads())

for _ in 1:nthreads()
    @spawn dowork(chin, chout)
end

for i in 1:nworks
    put!(chin, describe_some_work(i))
end

for _ in 1:nworks
    (i, result) = take!(chout)  ## or have the result describe which "i" to use
    workres[i] = result
end

close(chin);
close(chout);

along those lines. Basically, dowork runs a loop that reads some work to be done from chin, does the work, and pushes the result to chout; when take! throws (because chin has been closed), it returns.

This is super easy to think about, and results in full speed even if some work takes a lot longer than other work.


Thanks again. I will try to reason along those lines to see if it fits.

The critical point in my case is that I have nthreads() copies of a possibly large output array, and I want each thread to update one of the copies to avoid concurrent writes. At the end I reduce the copies into the actual output array using some reduction function.

Thus, for instance, describe_some_work(1) could write to output_threaded[1], but describe_some_work(15) might also have to write to output_threaded[1]. I do not see how that pattern prevents those two calls to describe_some_work from being executed simultaneously.

I cannot pass a thread-safe output to each unit of work, because these are potentially large arrays and I can only afford to copy them a few times. Using locks would, I think, be much worse for performance.

I’ve updated the example above to make it slightly more similar to the real case.

It’s helpful to hear more about your problem. I really don’t understand why you have multiple copies of the output array. With something like my channel-based method, the idea is that each thread gets its chunk of stuff to do… and then puts the results on the output channel. It’s the responsibility of the “main” thread to read those outputs from the output channel and put them in the right place.

imagine dowork is like:


function dowork(chin, chout)
    while true
        local thingid, nextthing   # declared here so they survive the try block's scope
        try
            (thingid, nextthing) = take!(chin)
        catch e  ## when the input channel is closed this will occur: there's no more work
            return nothing
        end
        result = calc_some_stuff(nextthing)
        put!(chout, (thingid, result))
    end
end

So now, instead of the “spawned” task using a big array to put its output into, it just puts it into the channel.

When it comes to what to do with that output, the main task needs to “do the right thing”, but it’s the only task that handles the output array, so there’s no concurrency there.
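So the collection step in the main task could look roughly like this (a sketch; chout, thingid and nworks are from above, and output_array stands for the single output array):

    for _ in 1:nworks
        (thingid, result) = take!(chout)
        # only the main task touches the single output array, so no locking is needed
        output_array[thingid] = result
    end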

I cannot know in advance, from the task number, which position of the output array that work will have to write to. For instance, if the calc_some_stuff function were:

function calc_some_stuff(output_array) 
    i = rand(1:length(output_array))
    output_array[i] += 1
end

How can that be made thread-safe with respect to output_array without using locks or copying the array for each thread?

Ok, this is more understandable, I guess. But again, the channels method avoids both having multiple copies and simultaneous access. The output is put on the output channel, and the “collector thread” decides where to put the results in the single copy of the output.


I will study that possibility, but I’m not confident that I can use that approach. The “output” of the function is the modification of the output array (I’m computing forces, for example, in a particle simulation). To separate the “computation” from the updating of the array, I would need some intermediate array to accumulate the forces computed by each task (which does not compute a single component of the force, but many), and then add them to the final array. That would use less memory, but would require a more complicated program structure.
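(For concreteness, the kind of restructuring I mean would be roughly the following, where compute_forces_chunk! and the index chunks are hypothetical placeholders:)

    function dowork(chin, chout)
        while true
            local range
            try
                range = take!(chin)                # a chunk of particle indices
            catch
                return nothing                     # channel closed: no more work
            end
            forces_chunk = zeros(3, length(range)) # intermediate per-task accumulator
            compute_forces_chunk!(forces_chunk, range)  # hypothetical kernel
            put!(chout, (range, forces_chunk))     # main task adds this into the final array
        end
    end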

(Also, the function to be computed in my case is a generic function provided by the user in the form of a closure, so I don’t really control what it does. I offer an interface that allows the user to provide different types of input and output variables, and to define custom reduction functions if required. The “copy everything” approach allows that flexibility fairly easily.)