The @threads macro executes the loop body in an unspecified order and potentially concurrently. It does not specify the exact assignments of the tasks and the worker threads. The assignments can be different for each execution. The loop body code (including any code transitively called from it) must not make any assumptions about the distribution of iterations to tasks or the worker thread in which they are executed. The loop body for each iteration must be able to make forward progress independent of other iterations and be free from data races. As such, invalid synchronizations across iterations may deadlock while unsynchronized memory accesses may result in undefined behavior.
What I wonder is: can one assume that when a function is called within a loop body, the entire function body will be executed by a single thread?
Good points, everyone. To make my concern concrete: Consider the function
function f(a)
    th = Base.Threads.threadid()
    # ... some code
    a[th] = 1
    # ... some code
    th = Base.Threads.threadid()
    return a[th] == 1 ? 5 * a[th] : 0
end
This function takes an argument that is supposed to be thread-private
storage. Then use the function inside this threaded loop:
a = fill(0, Base.Threads.nthreads())
Base.Threads.@threads for th in 1:Base.Threads.nthreads()
    b = f(a)
end
Please never mind that the code doesn’t make much sense.
My question is, is it possible that the th in the line a[th] = 1 is not the same th as in the return line?
Because if that is possible, then some other copy of this executing function
could refer to the “same” th, thereby breaking the private storage.
It’s pretty simple: With default options, neither @threads nor @spawn creates (thread-)sticky tasks. Hence, there are no guarantees whatsoever about the task-thread mapping. In particular, you must assume that tasks can migrate between threads at any point in time. If you want guarantees (i.e. sticky tasks), use @threads :static or @tspawnat from e.g. ThreadPools.jl or ThreadPinning.jl.
Note that, at this time, tasks might not migrate much in practice. But they can and you should assume that they do if you want to write stable code.
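To make the difference concrete, here is a minimal sketch (the function name threadid_is_stable is made up for illustration). It reads threadid() before and after a yield point inside a @threads :static loop, where tasks are sticky. Without :static the check may well pass in practice too; the point is only that nothing guarantees it.

```julia
using Base.Threads

# Hypothetical helper: checks that threadid() does not change across a
# yield point when the loop uses the :static (sticky) schedule.
function threadid_is_stable(; n=nthreads())
    stable = fill(false, n)
    @threads :static for i in 1:n
        before = threadid()
        yield()   # a point where a non-sticky task could, in principle, migrate
        stable[i] = before == threadid()
    end
    return all(stable)
end

threadid_is_stable()
```

Note that @threads :static must be called from the main thread and cannot be nested inside another @threads loop, so this guarantee comes at the cost of composability.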
Quite generally, threadid() is a footgun. Why? Because, as I indicated in my previous post, the default macros create non-sticky tasks, so the return value of threadid() can vary over the course of executing a piece of code. Fundamentally, this is because Julia offers task-based multithreading. The central concept is a task, not a thread.
(In this context, @threads is somewhat of a misnomer IMO. I think @vchuravy has voiced the same at some point. Something like @parallel or @multithreaded would have probably been a better name.)
For the reasons mentioned above, unless you have a good reason (which you might or might not have), you should try to think in terms of task-local storage rather than thread-local storage. For this, things like Base.task_local_storage and ChunkSplitters.jl might be helpful.
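As a rough sketch of what task-local storage looks like (the key :scratch and the array results are illustrative names, not from any library): each spawned task stores its own buffer under the same key, and no task can see another task's buffer, regardless of which thread it happens to run on.

```julia
using Base.Threads: @spawn

results = zeros(Int, 8)
@sync for k in 1:8
    @spawn begin
        # Each task gets its own entry for :scratch.
        task_local_storage(:scratch, Int[])
        buf = task_local_storage(:scratch)
        push!(buf, k)
        yield()            # the task may be rescheduled here; buf stays private
        push!(buf, k)
        results[k] = sum(buf)   # each task writes only its own slot
    end
end
```

Each task ends up with a two-element buffer containing only its own k, so results[k] is 2k.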
A hypothetical taskid() would have more or less the same potential pitfalls as the current threadid() (other than presumably being constant per task); that is, making an array of a fixed size and indexing it with taskid() can have hidden data races if you accidentally (or on purpose) access an entry that isn’t “for” your task. Storing things that other tasks shouldn’t mess with in task-local storage, on the other hand, prevents this kind of data race semantically and forces explicit sharing of state across tasks, leading to a less error-prone architecture by design.
What you say makes sense. But I do not have the option of copying things into task-local storage: they are too big. I have to work with a global array. The way I understand it, once I give a task some data to process, no other task will ever interfere: meaning when I ask for a task id, it will always be the same. Isn’t that right?
Only insofar as that hypothetical task id may stay the same. However, other tasks can be spawned as part of your algorithm, either internally or explicitly, and they increase the highest task id. So there’s no equivalent to nthreads to use for sizing that global array you’re referencing. Besides, if you’ve already given the task all the data it needs to compute its result, what would it need its own task id for? That number is not necessarily in 1:nthreads() after all.
I do not want to unnecessarily duplicate the data. It consumes memory, and a large chunk of it.
Each task should work by accessing only a little bit of the array, not the whole thing. In order for it to stay within the range it is supposed to see, I can use the chunk number. If I spawn as many tasks as there are chunks, I can identify the chunk by the task. Does that make sense?
Yes, it makes sense. This is more or less the way ChunkSplitters works and, as far as I’m aware, this approach is safe even when tasks can migrate.
In the example given in the ChunkSplitters README, Threads.@threads will spawn as many tasks as there are chunks, and ichunk identifies a chunk. (Maybe this is nitpicking, but ichunk should not really be considered a task identifier, because many other tasks may exist that were not spawned by this Threads.@threads invocation.)
using ChunkSplitters  # provides chunks
function sum_parallel(f, x; nchunks=Threads.nthreads())
    s = fill(zero(eltype(x)), nchunks)
    Threads.@threads for ichunk in 1:nchunks
        for i in chunks(x, ichunk, nchunks)
            s[ichunk] += f(x[i])
        end
    end
    return sum(s)
end
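For comparison, the same idea can be sketched without any package, using only Base. This is not the ChunkSplitters implementation; sum_chunked is a made-up name, and the sketch assumes f returns values of the same type as the elements of x. Each task receives its index range when it is spawned and returns its partial sum through fetch, so there is no shared accumulator and no task or thread id is needed.

```julia
using Base.Threads: @spawn, nthreads

# Hypothetical dependency-free variant: one task per chunk of indices.
function sum_chunked(f, x; nchunks=nthreads())
    chunk_len = max(1, cld(length(x), nchunks))
    tasks = map(Iterators.partition(eachindex(x), chunk_len)) do idxs
        # view avoids copying the chunk out of the global array
        @spawn sum(f, view(x, idxs); init=zero(eltype(x)))
    end
    return sum(fetch, tasks; init=zero(eltype(x)))
end

sum_chunked(abs2, collect(1:100))
```

Because the data is only viewed, not copied, the memory overhead is one partial result per chunk.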
I would second @PetrKryslUCSD: when working with large problems, copying all the data into task-local storage appears to be prohibitive. Instead, one would partition the index sets and avoid write conflicts by scheduling only non-conflicting accesses together (e.g. by coloring the partition graph and running one multithreaded loop per color).
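A toy sketch of the coloring idea, under a made-up setup: a 1D "mesh" in which element e contributes to nodes e and e + 1 of a shared array. Odd-numbered elements never share a node with each other, and neither do even-numbered ones, so two colors suffice. The name assemble_colored is illustrative only.

```julia
using Base.Threads

# Hypothetical example: scatter-add into a shared array without locks.
function assemble_colored(nelem)
    y = zeros(Int, nelem + 1)
    # Color 1: odd elements, color 2: even elements.
    for color in (1:2:nelem, 2:2:nelem)
        # Within one color, no two elements touch the same node,
        # so the writes below are free of data races.
        @threads for e in color
            y[e] += 1
            y[e + 1] += 1
        end
        # The @threads loop has finished before the next color starts.
    end
    return y
end

assemble_colored(10)
```

No data is duplicated here: all tasks write directly into the one global array, and correctness comes from the schedule rather than from private storage.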
I am still puzzled by threadid() as a footgun. How and why is this different from omp_get_thread_num()? My mental model for multithreading comes from OpenMP (admittedly also from a couple of years in the past).
I plan to dive deeper into this, but will probably only find time in a couple of weeks.
I’m sorry if I am not making a crystal clear argument. ChunkSplitters is great, but I want to use a function which has been designed as serial, and it calls into another function (which is supposed to work on a part of a global array). I don’t want to pass another argument to either one of them (i.e. the chunk number) when it is run in a parallel task. So somehow I need to propagate that chunk number through the task from the calling environment where I spawned the task to an interior function two layers down.
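One possible way to do this, sketched under assumptions (the names outer, inner, run_chunks and the key :chunk are all hypothetical): store only the chunk's index range in task-local storage when the task is spawned, and let the innermost function look it up. The big array stays global and is never copied; only a small range travels with the task.

```julia
using Base.Threads: @spawn

# Innermost function: finds out which part of `a` it owns from the task.
function inner(a)
    r = task_local_storage(:chunk)   # set by whoever spawned this task
    a[r] .+= 1
    return nothing
end

# Serial-style function that knows nothing about chunks.
outer(a) = inner(a)

function run_chunks(a, ranges)
    @sync for r in ranges
        @spawn begin
            task_local_storage(:chunk, r)   # attach the range to this task
            outer(a)
        end
    end
    return a
end

a = zeros(Int, 10)
run_chunks(a, [1:3, 4:7, 8:10])
```

One caveat with this sketch: task-local storage is not inherited, so if inner itself spawned further tasks, they would not see :chunk. Calling inner outside such a task throws a KeyError, which at least makes misuse visible.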