Behavior of threads

The manual states:

The @threads macro executes the loop body in an unspecified order and potentially concurrently. It does not specify the exact assignments of the tasks and the worker threads. The assignments can be different for each execution. The loop body code (including any code transitively called from it) must not make any assumptions about the distribution of iterations to tasks or the worker thread in which they are executed. The loop body for each iteration must be able to make forward progress independent of other iterations and be free from data races. As such, invalid synchronizations across iterations may deadlock while unsynchronized memory accesses may result in undefined behavior.

What I wonder is: can one assume that when a function is called within the loop body, the entire function body will be executed by a single thread?

1 Like

No.

For the function in question, if you do not use @threads on the loop that calls it, can you know that the entire function body will be executed by a single thread?

The function could use @threads internally, not to mention that due to task preemption, the task executing the loop could move between threads.
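
For example (just a small sketch; the sleep is only a stand-in for any yield point), a non-sticky task may report different threadid() values before and after yielding:

using Base.Threads

# Non-sticky task (as created by @spawn or, by default, by @threads on Julia >= 1.8):
# the thread it runs on may change at any yield point, so threadid() is not constant.
t = @spawn begin
    before = threadid()
    sleep(0.01)            # yield point; the scheduler may move the task to another thread
    after = threadid()
    (before, after)        # not guaranteed to be equal
end
@show fetch(t)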

Good points, everyone. To make my concern concrete: Consider the function

function f(a)
    th = Base.Threads.threadid()
    # ... some code ...
    a[th] = 1
    # ... some code ...
    th = Base.Threads.threadid()
    return a[th] == 1 ? 5 * a[th] : 0
end

This function takes an argument which is supposed to serve as thread-private
storage. The function is then used inside this threaded loop:

a = fill(0, Base.Threads.nthreads())
Base.Threads.@threads for th in 1:Base.Threads.nthreads()
    b = f(a)
end

Please never mind that the code doesn't make much sense.
My question is: is it possible that the th in the line a[th] = 1 is not the same th as in the return line?
Because if that is possible, then another concurrently executing instance of this function
could refer to the "same" th, thereby breaking the private storage.

I think I chanced upon the solution: the argument :static for the @threads macro
should guarantee what I want.

a = fill(0, Base.Threads.nthreads())
Base.Threads.@threads :static for th in 1:Base.Threads.nthreads()
    b = f(a)
end
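
If I understand :static correctly (please correct me if not), it creates one sticky task per thread, and with exactly nthreads() iterations it assigns iteration i to thread i, so a check along these lines should pass:

using Base.Threads

# Hedged check: with :static and exactly nthreads() iterations, iteration th
# is expected to run entirely on thread th.
ok = fill(false, nthreads())
@threads :static for th in 1:nthreads()
    ok[th] = (threadid() == th)
end
@show all(ok)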

Also relevant: Behavior of `Threads.@threads for` loop
With solution by @carstenbauer

Note that the solution in this other thread isn't quite accurate anymore, because the default scheduler of @threads has changed from :static to :dynamic in Julia 1.8.

It's pretty simple: With default options, neither @threads nor @spawn creates (thread-)sticky tasks. Hence, there are no guarantees whatsoever about the task-thread mapping. In particular, you must assume that tasks can migrate between threads at any point in time. If you want guarantees (i.e. sticky tasks), use @threads :static or @tspawnat from e.g. ThreadPools.jl or ThreadPinning.jl.

Note that, at this time, tasks might not migrate much in practice. But they can and you should assume that they do if you want to write stable code.

2 Likes

Quite generally, threadid() is a footgun. Why? Because, as I indicated in my previous post, the default macros create non-sticky tasks, so the return value of threadid() can vary over the course of executing a piece of code. Fundamentally, this is because Julia offers task-based multithreading. The central concept is a task, not a thread.

(In this context, @threads is somewhat of a misnomer IMO. I think @vchuravy has voiced the same at some point. Something like @parallel or @multithreaded would have probably been a better name.)

For the reasons mentioned above, unless you have a good reason (which you might or might not have), you should try to think in terms of task-local storage rather than thread-local storage. For this, things like Base.task_local_storage and ChunkSplitters.jl might be helpful.
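
For instance, here is a rough sketch (function and key names made up) of a per-task scratch buffer via task_local_storage, instead of nthreads() buffers indexed with threadid():

using Base.Threads

# Each task lazily creates its own scratch buffer in its task-local storage.
# Nothing is indexed by threadid(), so task migration is irrelevant.
function needs_scratch!(x)
    buf = get!(task_local_storage(), :scratch) do
        similar(x)          # created once per task, reused on later calls by the same task
    end
    buf .= 2 .* x           # intermediate work that must not race with other tasks
    return sum(buf)
end

results = zeros(8)
@sync for k in 1:8
    @spawn (results[k] = needs_scratch!(rand(100)))   # results indexed by k, not threadid()
end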

4 Likes

Thank you very much for this insight!

Maybe this is a dumb question, but would it make sense to have a taskid()?

It would have more or less the same potential pitfalls as the current threadid() (other than being presumably constant per task); that is, making an array of a fixed size and indexing it with taskid() can have hidden data races if you accidentally (or on purpose) access an entry that isn't "for" your task. Storing things that other tasks shouldn't mess with in task-local storage, on the other hand, prevents this kind of data race semantically and forces explicit sharing of state across tasks, leading to a less error-prone architecture by design.

What you say makes sense. But I do not have the option of copying things into task-local storage: they are too big. I have to work with a global array. The way I understand it, once I give a task some data to process, no other task will ever interfere: meaning that when I ask for a task id, it will always be the same. Isn't that right?

I'm not sure I follow - as far as I'm aware, there is no size limit on things you can put into task_local_storage:

julia> task_local_storage(:test, rand(10_000_000));

julia> task_local_storage(:test2, rand(10_000_000));

julia> task_local_storage(:test3, rand(1_000_000_000));

Only insofar as that hypothetical task id would stay the same. However, there can be other tasks spawned as part of your algorithm, either internally or explicitly, that increase the highest task id. So there's no equivalent of nthreads() to use for sizing that global array you're referencing. Internally, if you've already given the task all the data it needs to compute its result, what would it need its own task id for? That number is not necessarily in 1:nthreads(), after all.

I do not want to unnecessarily duplicate the data. It consumes memory, and a large chunk of it.

Each task should work by accessing only a little bit of the array, not the whole thing. In order for it to stay within the range it is supposed to see, I can use the chunk number. If I spawn as many tasks as there are chunks, I can identify the chunk by the task. Does that make sense?
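
Roughly something like this (a sketch; the splitting is done by hand here just to illustrate the idea):

using Base.Threads

# One task per chunk; the chunk index, not threadid(), decides which part of
# the shared array a task may touch, so task migration does not matter.
function process_in_chunks!(a, nchunks)
    bounds = round.(Int, range(0, length(a), length = nchunks + 1))
    @sync for ichunk in 1:nchunks
        r = bounds[ichunk]+1 : bounds[ichunk+1]
        @spawn for i in r
            a[i] = 2 * a[i]      # writes stay strictly inside this chunk's range
        end
    end
    return a
end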

That's exactly what ChunkSplitters.jl is doing, and threads don't need copies; they use shared memory.

Yes, it makes sense. This is more or less the way ChunkSplitters works and, as far as I'm aware, this approach is safe even when tasks can migrate.

In the example given in the ChunkSplitters README, Threads.@threads will spawn as many tasks as there are chunks, and ichunk identifies a chunk (maybe this is nitpicking, but ichunk may not really be considered a task identifier, because there might exist many other tasks that were not spawned by this Threads.@threads invocation):

using ChunkSplitters: chunks   # provides `chunks` used below

function sum_parallel(f, x; nchunks=Threads.nthreads())
    s = fill(zero(eltype(x)), nchunks)
    Threads.@threads for ichunk in 1:nchunks
        for i in chunks(x, ichunk, nchunks)
            s[ichunk] += f(x[i])
        end
    end
    return sum(s)
end

I would second @PetrKryslUCSD: when working with large problems it appears to be prohibitive to copy all the data into task-local storage. Instead, one would partition the index sets and take care to avoid write conflicts by properly scheduling non-conflicting accesses (e.g. by coloring the partition graph and running a separate multithreaded loop over each of the colors), as sketched below.
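
A rough sketch of the coloring idea (all names made up):

using Base.Threads

# Elements of the same color touch pairwise disjoint entries of `out`, so they
# may run concurrently; different colors are processed in separate threaded loops.
function colored_assemble!(out, element_nodes, element_values, colors)
    for color in unique(colors)
        members = findall(==(color), colors)
        @threads for k in members
            for node in element_nodes[k]
                out[node] += element_values[k]   # no two same-color elements share a node
            end
        end
    end
    return out
end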

I am still puzzled by threadid() as a footgun. How and why is this different from omp_get_thread_num()? My mental model for multithreading comes from OpenMP (admittedly also from a couple of years in the past).

I plan to dive deeper into this, but will probably find time only in a couple of weeks.

I'm sorry if I am not making a crystal clear argument. ChunkSplitters is great, but I want to use a function which has been designed as serial, and which calls into another function (which is supposed to work on a part of a global array). I don't want to pass another argument (i.e. the chunk number) to either one of them when it is run in a parallel task. So somehow I need to propagate that chunk number through the task, from the calling environment where I spawned the task down to an interior function two layers deep.
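
Something like this is what I had in mind (a sketch with made-up names): set the chunk range in the spawning code and read it from task-local storage deep inside the existing call chain, without changing any signatures:

using Base.Threads

# The innermost function reads its allowed range from task-local storage
# instead of receiving it as an argument.
function inner!(a)
    r = task_local_storage(:chunk_range)
    for i in r
        a[i] += 1.0
    end
end

outer!(a) = inner!(a)   # stands in for the existing serial call chain

a = zeros(100)
ranges = [1:50, 51:100]
@sync for r in ranges
    @spawn begin
        task_local_storage(:chunk_range, r)   # visible to everything this task calls
        outer!(a)
    end
end

(One caveat, as far as I understand: task_local_storage is not inherited by tasks spawned further down the call chain, so this only works if the inner functions do not spawn tasks of their own.)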

Maybe I'm not understanding the setting, but it seems that just using Threads.@threads on the loop calling the function on the individual parts of the array should work (without using threadid()).

1 Like