@sync for taskid in 1:ntasks
Threads.@spawn f(taskid)
end
behaves much more nicely than threadid because it ensures each task has a unique taskid. It’s just a plain local variable inside the task. So it won’t change and is not shared with other tasks.
Also, I think determining performance fluctuation would be very hard when there’s any allocation (even if it’s a very tiny one). Calling GC.gc before benchmark doesn’t help because when the GC tries to stop world with respect to other task is non-deterministic.
not mentioning, but this is shared with other users;
“Dedicated vCPU Instances: Maximum performance with dedicated vCPUs. Ideal for CPU intensive applications like highly frequented web servers, video encoding, machine learning or research calculations.”
2.)
My second idea is the NUMA increased synchronization costs.
I see, but probably there is a missunderstanding. In my case I have an output which has nthreads() copies of the data, not ntasks copies, for threading.
This is a simplified version of your suggestion above:
output_threaded = [ deepcopy(output) for _ 1:nthreads() ]
@sync for thread_id in 1:nthreads()
Threads.@spawn for i in 1:ntasks
output_threaded[thread_id] = inner_loop!(...)
end
end
This is wrong, isn’t it? I can have two threads writing to the same output_threaded position in this case, wright?
It seems to me that I would need something like:
output_threaded = [ deepcopy(output) for _ 1:nthreads() ]
@sync for i in 1:ntasks
Threads.@spawn begin
thread_id = threadid()
output_threaded[thread_id] = inner_loop!(...)
end
end
but I know that is is not a good pattern for the reasons you pointed above.
(@tkf the parallel code does not allocate. But I will separate it to make things clearer)
Thanks @ImreSamu and @johnh. Those are good advices. It is a private cluster, so I will try these suggestions.
Depending on the details of the machine, just setting JULIA_EXCLUSIVE=1 might do the job (i.e. pin julia threads to physical cpus). Otherwise, numactl and likwid-pin could be options, see this discussion for more information.
output_threaded = [ deepcopy(output) for _ 1:nthreads() ]
@sync for thread_id in 1:nthreads()
for i in 1:ntasks
Threads.@spawn
output_threaded[i] = inner_loop!(give_me_work_for_task(i))
end
end
julia> topology()
Machine (31.97 GB)
Package L#0 P#0 (15.97 GB)
NUMANode (15.97 GB)
L3 (20.0 MB)
L2 (256.0 kB) + L1 (32.0 kB) + Core L#0 P#0
PU L#0 P#0
PU L#1 P#16
L2 (256.0 kB) + L1 (32.0 kB) + Core L#1 P#1
PU L#2 P#1
PU L#3 P#17
L2 (256.0 kB) + L1 (32.0 kB) + Core L#2 P#2
PU L#4 P#2
PU L#5 P#18
L2 (256.0 kB) + L1 (32.0 kB) + Core L#3 P#3
PU L#6 P#3
PU L#7 P#19
L2 (256.0 kB) + L1 (32.0 kB) + Core L#4 P#4
PU L#8 P#4
PU L#9 P#20
L2 (256.0 kB) + L1 (32.0 kB) + Core L#5 P#5
PU L#10 P#5
PU L#11 P#21
L2 (256.0 kB) + L1 (32.0 kB) + Core L#6 P#6
PU L#12 P#6
PU L#13 P#22
L2 (256.0 kB) + L1 (32.0 kB) + Core L#7 P#7
PU L#14 P#7
PU L#15 P#23
Package L#1 P#1 (16.0 GB)
NUMANode (16.0 GB)
L3 (20.0 MB)
L2 (256.0 kB) + L1 (32.0 kB) + Core L#8 P#0
PU L#16 P#8
PU L#17 P#24
L2 (256.0 kB) + L1 (32.0 kB) + Core L#9 P#1
PU L#18 P#9
PU L#19 P#25
L2 (256.0 kB) + L1 (32.0 kB) + Core L#10 P#2
PU L#20 P#10
PU L#21 P#26
L2 (256.0 kB) + L1 (32.0 kB) + Core L#11 P#3
PU L#22 P#11
PU L#23 P#27
L2 (256.0 kB) + L1 (32.0 kB) + Core L#12 P#4
PU L#24 P#12
PU L#25 P#28
L2 (256.0 kB) + L1 (32.0 kB) + Core L#13 P#5
PU L#26 P#13
PU L#27 P#29
L2 (256.0 kB) + L1 (32.0 kB) + Core L#14 P#6
PU L#28 P#14
PU L#29 P#30
L2 (256.0 kB) + L1 (32.0 kB) + Core L#15 P#7
PU L#30 P#15
PU L#31 P#31
It seems that in this case it would be reasonable to choose threadas from 0 to 7, which would all share the same memory (I guess this is good in this case, since the threads have to access shared arrays of data). Makes sense?
The point of this pattern is to @spawn off one task per available thread, but to give the task a STATIC identifier that doesn’t change as the scheduler moves the task from thread to thread, thereby giving it a fixed place in memory to store its private computational info.
Yes, I would think so. As you can see, and as is typically the case, the P numbering of the logical processing units (PU) is so that the first 0-N are in different cores and N+1:2N are the “hyperthread units”. So, as I said above, often you should get away with just setting JULIA_EXCLUSIVE=1 which, IIRC, will pin N Julia threads to the the first N cores / processing units.
(Note that I’m writing this down from the top of my head, so it might be good to check again )
Uhm… admittedly I don’t understand how those patterns are delivering that, and how that can avoid two tasks to write over the same output concurrently. I’ll have to think about that more deeply…
When a task is spawned it’s executing the code for the particular value that taskid had at the particular point in the outer loop when the spawn occurred… that number, let’s call it 3, is just a constant number that it “owns” (because every other task was spawned with a different value of taskid, like 1,2). So it can be an identifier for where to put its private stuff (in a big array that you preallocate for example).
whereas, Threads.threadid() returns the number of the OS thread that the task is currently running on, and can change as the scheduler moves the task between threads, the “taskid” is just a number that never changes.
I still do not see how that can work in that case (I don’t understand why you say that ntasks=nthreads(). This is a MWE of the pattern I’m using:
out = [ 0., 0. ] # potentially a very large array
out_threaded = [ deepcopy(out) for _ in 1:Threads.nthreads() ]
splitter(thread_id,n) = thread_id:Threads.nthreads():n
n = 1000 # number of tasks
Threads.@threads for thread_id in 1:Threads.nthreads()
for task in splitter(thread_id,n)
for i in eachindex(out_threaded[thread_id])
out_threaded[thread_id][i] += 1.0 # expensive stuff
end
end
end
# reduction
out .= out_threaded[1]
for i in 2:Threads.nthreads()
out .= out .+ out_threaded[i]
end
How do you suggest that to be performed using spawn?
I’m finding that really complicated to think about. I really like the channel based methods. why not do:
chin = Channel(nthreads)
chout = Channel(nthreads)
for i in 1:nthreads
@spawn dowork(chin,chout)
end
for work in 1:nworks
put!(chin, describe_some_work(i))
end
for work in 1:nworks
results = take!(chout)
workres[i] = results ## or have the results describe which "i" to use
end
close(chin);
close(chout);
along those lines. basically “dowork” runs a loop that reads some work to be done from chin, does the work, and pushes the result to chout, on an exception in take! it returns.
This is super easy to think about, and results in full speed even if some work takes a lot longer than other work.
Thanks again. I will try to reason on that lines to see if it fits.
The critical point in my case is that I have nthreads() copies of a possibly large output array, and I want each thread to update one of the copies to avoid concurrency. At the end I reduce the copies of the arrays to the actual output array using some reduction function.
Thus, for instance, describe_some_work(1) could write to output_threaded[1], but describe_some_work(15) potentially has to, as well, write to output_threaded[1] . I do not see in that pattern how do you avoid these two calls to describe_some_work to be executed simultaneously.
I cannot pass a thread-safe output to each work, because these are potentially large arrays, and I can only support copying them a few times. Using locks I think would be much worse for performance.
I’ve updated the example above to make it slightly more similar to the real case.
It’s helpful to hear more about your problem. I really don’t understand the reason why you have multiple copies of an output array. In the case where you are doing something like my channels based method, the idea is that each thread gets its chunk of stuff to do… and then puts the results to the output channel. It’s the responsibility of the “main” thread to read those outputs from the output channel and put them in their right place.
imagine dowork is like:
function dowork(chin, chout)
while true
try
(thingid,nextthing) = take!(chin)
catch e ## when the input channel is closed this will occur, there's no more work
return nothing
end
result = calc_some_stuff(nextthing)
put!(chout, (thingid, result))
end
end
So now, instead of the “spawned” task using a big array to put its output into, it just puts it into the channel.
when it comes to what to do with that output, the main task needs to “do the right thing” but it’s the only task that handles the output array, so there’s no concurrency there.
I cannot know in advance from the task number, to which position of the output array that work will have to write to. For instance, if the calc_some_stuff function was:
function calc_some_stuff(output_array)
i = rand(1:length(output_array))
output_array[i] += 1
end
How can that be thread safe relative to the output_array without using locks or copying the array for each thread?
Ok, this is more understandable I guess. But again, the channels method will avoid both having multiple copies, and simultaneous access. The output is put to the output channel, and the “collector thread” decides where to put the results in the single copy of the output.
I will study that possibility. But I’m not confident that I can use that approach. The “output” of the function is the modification of the output array (I’m computing the forces, for example, in a particle simulation). To separate the “computation” from the updating of the array, I would need to store some intermediate array for the computation, to accumulate the forces computed by that task (which does not compute a single component of the force, but many), to then add them to the final array. That would use less memory, but would require a more complicated program structure.
(also the function to be computed in my case is a generic function which is provided by the user in the form of a closure, such that I don’t really control what the function does. I offer an interface that allows the user to provide different types of inputs and output variables, and define custom reduction functions if that is required. The “copy everything” approach allows that flexibility somewhat easily).