Why has simple threading using `threadid` become so complex in v1.12...?

One more option, for when you don't have control over whether the user will remember to use OncePer correctly (e.g. if this code is actually split across multiple functions and the dss is carried around in some composite data structure):

function main()
    N = 100
    Nthreads = Threads.nthreads()
    # one lock per scratch buffer; thread_locks[k] guards exactly dss[k]
    thread_locks = [ReentrantLock() for _ in 1:Nthreads]
    dss = [zeros(N) for _ in 1:Nthreads]
    res = zeros(N)

    Threads.@threads for i in 1:N
        tid = Threads.threadid()   # read once, only to pick a hopefully-uncontended lock/buffer pair
        tlock = thread_locks[tid]
        Base.@lock tlock begin
            ds = dss[tid]
            res[i] = computation!(ds, i)
        end
    end
    return res
end

Note: this is potentially not safe if the number of threads changes at run time.

Pretty sure that's just generally not safe either, and it has the same problems as the original kind of code shown in the blog post and in the OP here: there's no guarantee that the task won't be migrated to a different thread between any of the steps, so it might end up holding a logically invalid lock from a different thread, or hold one lock while updating a different index in dss, etc.

That can't happen, because tid is read only once, before we fetch the lock and acquire it. Yes, the task can migrate, but thread_locks[tid] and dss[tid] are always paired, so it's fine. (It's just that tid may no longer equal threadid() at each step, which is exactly why we don't call threadid() at each step.)


I think your take here is right: the resource being contested is dss[idx], which is protected by the lock thread_locks[idx]; you're choosing a hopefully-available idx from whatever threadid() you started on, and if you clash with another task it's fine because you hold the lock.

Perhaps the logic would be more clear with

using Base: Lockable # public on 1.11+
dss = [Lockable(zeros(N)) for _ in 1:Nthreads]

Then the loop would look like

    Threads.@threads for i in 1:N
        ds = dss[Threads.threadid()] # some hopefully-available resource
        Base.@lock ds begin
            # use the resource once we have exclusive access
            res[i] = computation!(ds[],i)
        end
    end

But maybe you could also avoid the threadid() code smell with idx = mod1(i, length(dss)) or something.
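
For instance, a sketch reusing the Lockable-wrapped dss and the computation! placeholder from above, with the slot chosen from the loop index instead of the thread id:

    Threads.@threads for i in 1:N
        idx = mod1(i, length(dss))   # slot picked from the iteration index, not threadid()
        ds = dss[idx]                # some hopefully-available resource
        Base.@lock ds begin
            res[i] = computation!(ds[], i)
        end
    end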

Or maybe even better, use a channel, take the resource when you need it, put it back when you’re done.
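
A minimal sketch of the Channel version, again assuming the N, res, and computation! placeholders from the earlier example; the buffered Channel is the pool, and take!/put! do all the synchronization:

buffers = Channel{Vector{Float64}}(Threads.nthreads())
foreach(_ -> put!(buffers, zeros(N)), 1:Threads.nthreads())  # pre-fill the pool

Threads.@threads for i in 1:N
    ds = take!(buffers)        # borrow a buffer; blocks if all are in use
    try
        res[i] = computation!(ds, i)
    finally
        put!(buffers, ds)      # always return it, even if computation! throws
    end
end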


Note that fussing around with locks and channels is fine when each iteration of your loop is slow, but if you're parallelizing a loop where each iteration is very fast, you'll totally kill performance by doing this.


@Mason my use case is very similar to @Datseris's:

model = fit(Model, data)
state = [deepcopy(model) for i in 1:Threads.nthreads()]
Threads.@threads for i in 1:1000
  m = state[Threads.threadid()]
  predict!(m, newdata)
end

The fit step is expensive. What is the best way to adjust the proposed solution in Why has simple threading using `threadid` become so complex in v1.12...? - #2 by Mason?

Fundamentally the issue is that threadid is only marginally better than asking “what cpu core am I currently running on”.

All the attempts to write a single loop, use @threads and then negotiate shared resources are deeply silly if you think about what that macro roughly does:

model = fit(Model, data)
state = [deepcopy(model) for i in 1:Threads.nthreads()]
Threads.@threads for i in 1:1000
  m = state[Threads.threadid()]
  predict!(m, newdata)
end

is equivalent to

model = fit(Model, data)
state = [deepcopy(model) for i in 1:Threads.nthreads()]
@sync for chunk in ChunkSplitters.index_chunks(1:1000; n = Threads.nthreads())
    @spawn for item in chunk
        m = state[Threads.threadid()]
        predict!(m, newdata)
    end
end

Look at that code. It is silly stupid: in the for item in chunk loop, the relevant threadid / m is constant. Or rather, according to your mental model it's supposed to be constant, and its non-constancy is what causes the bugs.

You already know what you need to do: The same kind of transformation that is the compiler’s bread and butter, i.e. loop invariant code motion:

model = fit(Model, data)
state = [deepcopy(model) for i in 1:Threads.nthreads()]
@sync for (chunk_idx, chunk) in enumerate(ChunkSplitters.index_chunks(1:1000; n = Threads.nthreads()))
    @spawn begin
        m = state[chunk_idx]   # the scarce resource is acquired once per chunk
        for item in chunk
            predict!(m, newdata)
        end
    end
end

Acquiring the scarce resource moves out of the inner loop.

None of you would ever write the silly stupid code that Threads.@threads expands to.

It's just that Threads.@threads does two nice things: it deals with chunking the input, and it deals with the @sync/@spawn; and because it conflates these two things, you don't see the obvious place to put your state (inside the outer loop, outside the inner loop).

I think that’s partly the fault of Base for not exporting a blindingly obvious equivalent of ChunkSplitters.index_chunks.

Long term, Threads.@threads should be deprecated. It teaches the wrong abstractions and is toxic to how people think. (can’t be deprecated now since Base can’t be arsed to export a proper way to split the input into chunks)


Do we really need ChunkSplitters.jl? Why not use Iterators.partition?

See No more 1st class support for Threads? [thread-local storage] - #34 by foobar_lv2

Also note that if one does not need to store the task-local buffer and does not need the indices, this can be simplified to:

model = fit(Model, data)
# split work in n=nthreads() chunks
@threads for chunk in ChunkSplitters.chunks(1:1000; n=Threads.nthreads())
    m = deepcopy(model)
    for item in chunk
        predict!(m, newdata)
    end
end

Which makes it pretty clear that only n copies of model are created, and that there is no race condition among their uses.


You don’t need to use chunksplitters. Indeed, using chunksplitters is kinda silly since it’s such a trivial package.

On the other hand, Iterators.partition doesn't do the job properly (Threads.@threads does!); and even if you are OK with that, you still need to wrangle it without making an off-by-one error.

You will write an obvious one-liner, have bugs, feel silly because it’s such obvious code, and eventually end up with a working obvious one-liner. And the next time a colleague asks you, you will be tempted to tell them “use Threads.@threads if you don’t need expensive state that lives over the entire chunk” because it’s too miserable to get right and too silly to use a dependency for this, and the problem perpetuates forever :frowning:
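
For concreteness, here's a sketch of the kind of helper you end up hand-rolling (mychunks is hypothetical, not a Base or package API): split 1:n into k contiguous chunks whose lengths differ by at most one.

function mychunks(n::Integer, k::Integer)
    base, rem = divrem(n, k)
    out = UnitRange{Int}[]
    start = 1
    for j in 1:k
        len = base + (j <= rem ? 1 : 0)   # the first `rem` chunks get one extra element
        push!(out, start:start+len-1)
        start += len
    end
    return out
end

# mychunks(10, 3) == [1:4, 5:7, 8:10]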


In defense of ChunkSplitters.jl :rofl:, although it is indeed trivial, it has some options on how to split the data (consecutive or round-robin and minimum chunk size) which can simplify load balancing and give the user some additional control over the splitting strategy. And OhMyThreads.jl leverages those and does much more.
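
Roughly like this (the keyword and type names below are from memory of the ChunkSplitters.jl docs and have changed between major versions, so treat this as a sketch rather than a reference):

using ChunkSplitters

index_chunks(1:10; n = 3)                        # three contiguous chunks of near-equal length
index_chunks(1:10; n = 3, split = RoundRobin())  # round-robin ("scatter") assignment
index_chunks(1:10; size = 4)                     # fixed chunk size instead of a fixed count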

(Also, the fact that @threads works with enumerate here is a feature of ChunkSplitters.jl, which is what allows using the chunk indices to index the chunk-local buffers; in general @threads does not work with enumerate - a minor inconvenience.)


Using @threads with chunksplitters is a complete anti-pattern. @threads has two jobs that are conflated: Do the chunk splitting, and do the @sync / @spawn. If you do the chunk-splitting outside, then you should also use @sync / @spawn.

I’m not attacking that package. It has a simple job, it does its simple job right, it has a good API, it has the right kind of options, it is well-maintained by people who are trusted and active in the community. It is excellent.

My only issues are that (1) chunksplitters is borderline too small to be an independent package (software supply chain hassle scales badly with number of independent packages. You wouldn’t start a task for each iteration in tmap for most applications, and you shouldn’t have a separate package for each tiny bit of functionality), and (2) this really is a thing that should be batteries-included. There should be one preferred officially encouraged pattern to do basic multi-threading, and that pattern must be convenient without uncurated extra dependencies.

This is especially poignant because the package registry is uncurated (as opposed to stdlib). This is not like your linux distro shipping a default install without some essential tool, it’s like the olden days of windows shipping without zipfile handling.

A curated subset of the registry would also go a long way. If openssh stops working on arch, then the buck stops at arch, not upstream; if some AUR package stops working on arch, then the buck stops at the user who decided to install that (but in egregious cases of e.g. malware, the team might graciously intervene, just like apple might or might not intervene and pull an app). The abdication of responsibility by microsoft, apple and google (and mozilla!) is breathtaking – every tiny volunteer-run linux distro manages to do better (the debian openssl apocalypse notwithstanding).


I don't disagree, but @threads splits the work into nthreads() chunks, which is not necessarily what one wants.

What I think ChunkSplitters doesn't do, and a better tool should, is allow adaptive chunks.

I think that's really complex. I know MLIR does it (Reactant.jl), but it's more of an AOT thing, I think.

Yeah, I thought about that as well. Would be nice!

The unfortunate fact is that adaptive chunks must have one lock cmpxchg per iteration, in order to check whether somebody else has stolen work from the chunk. This comes at ~20 cycles, and will completely mess up performance for very cheap per-item operations. On the other hand, for largish per-item operations (> 100k cycles), you can just spawn a task per op. And on the third hand, if you have reasonably many operations to do and the distribution of costs is not too wild (e.g. has finite and small standard deviation), then the law of large numbers ensures that a non-adaptive split into chunks is good enough.

Adaptive chunks are a nice feature, but they cannot be the default, and they have a relatively narrow band of usefulness – only for fat-tailed distributions of runtime where many of the tasks are too cheap for their own Task.

I'd nevertheless like to see them. Ideally with an API like java.util.stream (Java Platform SE 8). The standard APIs in the Julia world allow an init = ... keyword, but that doesn't cut it; one needs an initializer = ... keyword, such that one later computes op(initializer(), element) etc. More mathematically speaking, the right mental model is not "map items to a group, then reduce", but rather "you have a group of accumulators of items".
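
A sketch of the distinction in Julia (chunked_mapreduce is a hypothetical helper, not an existing API): with initializer, every chunk gets its own freshly created accumulator, so mutable accumulators are never shared across tasks, which a single shared init value cannot guarantee.

using Base.Threads: @spawn
using ChunkSplitters: index_chunks

function chunked_mapreduce(f, op, combine, itr; initializer, ntasks = Threads.nthreads())
    tasks = map(index_chunks(itr; n = ntasks)) do chunk
        @spawn begin
            acc = initializer()          # one accumulator per chunk, never shared
            for i in chunk
                acc = op(acc, f(itr[i]))
            end
            acc
        end
    end
    return reduce(combine, fetch.(tasks))
end

# e.g. accumulate into per-task vectors, then concatenate:
# chunked_mapreduce(x -> x^2, push!, vcat, 1:100; initializer = () -> Int[])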

I'd like to note that the Java stdlib doesn't properly support this use case of fat-tailed multi-threaded stream reduction either – they balk at the per-item cost of the lock cmpxchg as well, and have micro-batches that cannot be adaptively split further. This is a real performance annoyance at my day job (some of our streams have a mix of "10 ns, done" and "lol, grab a coffee" items; pray that the slow jobs don't end up in the same micro-batch).

If I had too much time / motivation, I’d probably try to submit something along these lines to openjdk, as a new stream option (a .fatTailed() like the .parallel()).

This is an unhelpful hyperbole. There’s no reason one must use @sync / @spawn together with chunk-splitters. If one is fine with nthreads() tasks being spawned, it’s functionally the same and a little more concise to use @threads.

Then stop calling it ‘trivial’ every time you get a chance and complaining about its low number of lines of code. If it’s easy to get ChunkSplitters.jl right on your own do it on your own, if not, then add it as a dep. This hand-wringing over it being small isn’t helpful to this thread.

Pretty much everyone agrees with this. The problem is that people don’t agree on what that officially encouraged pattern and API should be. There was one API and set of patterns pushed when the threading infrastructure was new, and what we found out with time was that those APIs and patterns were actually bad.

Now we’re trying to figure out what should replace it, and it’s looking like it should be something resembling ChunkSplitters + a curated selection of pieces from OhMyThreads.

What’s currently missing is someone willing to take on the task of curating and upstreaming that selection, and dealing with all the bikeshedding that’ll result.


I don't have the knowledge to answer this one: would it make sense to redistribute tasks after a fixed time, like 1 s? Is it conceivable to check for the possibility of redistribution in that way without compromising the performance of tight loops?

Not from a high-level perspective. The issue is that:

  1. For very tight loops, checking for redistribution tanks performance. These loops must be unrolled and simd-ified.
  2. For fat-tailed distributions, you must check for redistribution for every single iteration.

A sensible approach would be: we eventually want to "un-tighten" tight loops anyway, in order to insert GC safepoints (otherwise your tight loop never hits a GC safepoint until it is done, which can take a while, and all other threads are waiting until that happens). This insertion of GC safepoints needs performance analysis of loops, and then inserts a safepoint every whatever-many iterations.

If we eventually get this, we could tie in there: check for redistribution at safepoints. If a loop doesn't need a safepoint, then it cannot be running for long (otherwise it would need a safepoint!), so we don't need to check for redistribution either. On the other hand, if we have a safepoint anyway, then it cannot hurt too much to also check for redistribution there.

Suppose you have 10 billion jobs that take 3 ns each, plus 10 jobs in that list that take 5 seconds each, on your 16-core machine, and some twist of fate has placed the bad jobs directly adjacent to each other, and you have an 8-second budget. The cheap work is about 30 s of CPU time, roughly 2 s spread over 16 cores; the 10 bad jobs are 50 s of work, which is fine if they land on different cores, but a static chunking that packs several of them into one chunk blows the budget. This is the nightmare situation. (It's also a lazy-user situation – of course the user should do a complex API dance to split the cheap and expensive jobs.)

I’m saying: tying into a (hypothetical future) safepoint mechanism is “science fiction”, not “fantasy”. It is conceivable, but I don’t think this will ever happen. Not for us at julia, and not in JVM either (they at least have a working safepoint auto-insert mechanism!).

PS. But the atomics-based adaptive splitting can solve the situation where the cheap jobs take 20 ns without tanking performance. This would be awesome, currently we can only solve the issue if cheap jobs take 2000 ns (by doing task-per-job).
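
For illustration, a minimal sketch of that per-item atomic claiming (foreach_adaptive and do_item are placeholders, not an existing API): each iteration pays exactly one atomic increment, so a single slow item never drags a whole microbatch along with it.

using Base.Threads: Atomic, atomic_add!, @spawn

function foreach_adaptive(do_item, n; ntasks = Threads.nthreads())
    next = Atomic{Int}(1)
    @sync for _ in 1:ntasks
        @spawn while true
            i = atomic_add!(next, 1)   # atomic_add! returns the old value: the claimed index
            i > n && break
            do_item(i)
        end
    end
end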


An atomic per chunk should suffice?

using Base.Threads: Atomic, atomic_sub!, @spawn

N = Atomic{Int}(iterations)   # remaining iteration count
numthreads = 24

@sync for _ in 1:numthreads
    @spawn begin
        local chunksize = compchunksize(N, numthreads)   # some chunk-size heuristic
        # atomic_sub! returns the counter value *before* subtracting
        while (stop = atomic_sub!(N, chunksize)) > 0
            start = max(stop - chunksize + 1, 1)
            for i in start:stop
                # ... meat i ...
            end
            chunksize = compchunksize(N, numthreads)
        end
    end
end

This pattern is at least usable for iterating over vector-like things.

This means that start:stop is a microbatch that cannot be adaptively split if the job at start+3 takes a surprising amount of time.

If your distribution is fat-tailed and two bad jobs are in the same microbatch, then you’re fucked.

Imagine that most meat i only do a quick small computation, but some of them need to pull data over the network (from Australia, with a brand-new HTTPS connection, suffering from TCP slow start, and apparently transmitted via carrier pigeon – only you don't know that beforehand, because it looks like a pointer load that happens to hit an mmapped file on a network share).