Why has simple threading using `threadid` become so complex in v1.12...?

One more option, for when you don’t have control over whether the user will remember to use OncePer correctly (e.g. if this code is actually split across multiple functions and the dss is carried around in some composite data structure):

function main()
    N = 100
    Nthreads = Threads.nthreads()
    # lock i always guards buffer dss[i], so the two stay paired
    thread_locks = [ReentrantLock() for _ in 1:Nthreads]
    dss = [zeros(N) for _ in 1:Nthreads]
    res = zeros(N)

    Threads.@threads for i in 1:N
        # read threadid() exactly once per iteration
        tid = Threads.threadid()
        tlock = thread_locks[tid]
        Base.@lock tlock begin
            ds = dss[tid]
            res[i] = computation!(ds, i)
        end
    end
    return res
end

Note: this is potentially not safe against the number of threads changing at run time.

Pretty sure that’s just generally not safe either, and has the same problems as the original kind of code shown in the blog post and in the OP here: there’s no guarantee that the task won’t be migrated to a different thread between any of the steps, so it might end up holding a logically invalid lock from a different thread, or hold one lock while updating a different index of dss, etc.

That can’t happen, because tid is read only once, before we fetch the lock and try to acquire it. Yes, the task can migrate, but thread_locks[tid] and dss[tid] are always paired, so it’s fine. (It’s just that tid may no longer equal threadid() at each step, but that’s exactly why we don’t call threadid() at each step.)


I think your take here is right: the resource being contested is dss[idx], which is protected by the lock thread_locks[idx]; you’re choosing a hopefully-available idx via whatever threadid() you started on, and if you clash with another task it’s fine, because you hold the lock.

Perhaps the logic would be clearer with

using Base: Lockable # public on 1.11+
dss = [Lockable(zeros(N)) for _ in 1:Nthreads]

Then the loop would look like

    Threads.@threads for i in 1:N
        ds = dss[Threads.threadid()] # some hopefully-available resource
        Base.@lock ds begin
            # use the resource once we have exclusive access
            res[i] = computation!(ds[],i)
        end
    end

But maybe you could also avoid the threadid() code smell with idx = mod1(i, length(dss)) or something.
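For instance, reusing the Lockable setup above (just a sketch; any mapping from iteration to slot works, since the lock is what provides the safety):

Threads.@threads for i in 1:N
    idx = mod1(i, length(dss))   # choose a slot without consulting threadid()
    ds = dss[idx]
    Base.@lock ds begin
        res[i] = computation!(ds[], i)
    end
end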

Or maybe even better, use a channel: take the resource when you need it, put it back when you’re done.
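Roughly like this (a sketch, reusing N, Nthreads, res, and computation! from the earlier example):

pool = Channel{Vector{Float64}}(Nthreads)
for _ in 1:Nthreads
    put!(pool, zeros(N))        # pre-fill the pool with reusable buffers
end
Threads.@threads for i in 1:N
    ds = take!(pool)            # blocks until some buffer is free
    try
        res[i] = computation!(ds, i)
    finally
        put!(pool, ds)          # always return the buffer, even on error
    end
end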


Note that fussing around with locks and channels is fine when each iteration of your loop is slow, but if you’re parallelizing a loop where each iteration is very fast, you’ll totally kill performance by doing this.


@Mason my use case is very similar to @Datseris’s:

model = fit(Model, data)
state = [deepcopy(model) for i in 1:Threads.nthreads()]
Threads.@threads for i in 1:1000
  m = state[Threads.threadid()]
  predict!(m, newdata)
end

The fit step is expensive. How best to adjust the proposed solution from Why has simple threading using `threadid` become so complex in v1.12...? - #2 by Mason?

Fundamentally, the issue is that threadid is only marginally better than asking “which CPU core am I currently running on?”.

All the attempts to write a single loop, use @threads, and then negotiate shared resources are deeply silly once you think about what that macro roughly does:

model = fit(Model, data)
state = [deepcopy(model) for i in 1:Threads.nthreads()]
Threads.@threads for i in 1:1000
  m = state[Threads.threadid()]
  predict!(m, newdata)
end

is equivalent to

model = fit(Model, data)
state = [deepcopy(model) for i in 1:Threads.nthreads()]
@sync for chunk in ChunkSplitters.index_chunks(1:1000; n=Threads.nthreads())
  @spawn for item in chunk
      m = state[Threads.threadid()]
      predict!(m, newdata)
  end
end

Look at that code. It is silly stupid: in the for item in chunk loop, the relevant threadid / m is constant. Or rather, according to your mental model it’s supposed to be constant, and its non-constancy is what causes the bugs.

You already know what you need to do: the same kind of transformation that is the compiler’s bread and butter, i.e. loop-invariant code motion:

model = fit(Model, data)
state = [deepcopy(model) for i in 1:Threads.nthreads()]
@sync for (chunk_idx, chunk) in enumerate(ChunkSplitters.index_chunks(1:1000; n=Threads.nthreads()))
  @spawn begin
      m = state[chunk_idx]
      for item in chunk
          predict!(m, newdata)
      end
  end
end

Acquiring the scarce resource moves out of the inner loop.

None of you would ever write the silly stupid code that Threads.@threads expands to.

It’s just that Threads.@threads does two nice things: it deals with chunking the input, and it deals with the @sync/@spawn. Because it conflates these things, you don’t see the obvious place to put your state (inside the outer loop, outside the inner loop).

I think that’s partly the fault of Base for not exporting a blindingly obvious equivalent of ChunkSplitters.index_chunks.

Long term, Threads.@threads should be deprecated. It teaches the wrong abstractions and is toxic to how people think. (It can’t be deprecated now, since Base can’t be arsed to export a proper way to split the input into chunks.)


Do we really need ChunkSplitters.jl? Why not use Iterators.partition?

See No more 1st class support for Threads? [thread-local storage] - #34 by foobar_lv2

Also note that if one does not need to store the task-local buffer and does not need the indices, this can be simplified to:

model = fit(Model, data)
# split work in n=nthreads() chunks
@threads for chunk in ChunkSplitters.chunks(1:1000; n=Threads.nthreads())
    m = deepcopy(model)
    for item in chunk
        predict!(m, newdata)
    end
end

This makes it pretty clear that only n copies of model are created, and that there is no race condition among their uses.


You don’t need to use ChunkSplitters. Indeed, using ChunkSplitters is kinda silly, since it’s such a trivial package.

On the other hand, Iterators.partition doesn’t do the job properly (Threads.@threads does!); and even if you are OK with that, you still need to wrangle it without making an off-by-one error.

You will write an obvious one-liner, have bugs, feel silly because it’s such obvious code, and eventually end up with a working obvious one-liner. And the next time a colleague asks you, you will be tempted to tell them “use Threads.@threads if you don’t need expensive state that lives over the entire chunk” because it’s too miserable to get right and too silly to use a dependency for this, and the problem perpetuates forever :frowning:
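For reference, the careful version of that one-liner looks roughly like this (a sketch; process stands in for the per-item work). The classic off-by-one trap is using div instead of cld for the chunk size:

xs = 1:1000
chunksize = cld(length(xs), Threads.nthreads())   # ceiling division, not div
@sync for chunk in Iterators.partition(xs, chunksize)
    Threads.@spawn foreach(process, chunk)
end

With div you get nthreads()+1 chunks (and one extra task), and even with cld the chunk sizes are less even than what Threads.@threads produces: 1000 items on 16 threads gives fifteen chunks of 63 and one of 55, rather than sizes of 62 and 63.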


In defense of ChunkSplitters.jl :rofl:, although it is indeed trivial, it has some options for how to split the data (consecutive or round-robin, and a minimum chunk size), which can simplify load balancing and give the user some additional control over the splitting strategy. And OhMyThreads.jl leverages those and does much more.

(Also, the fact that @threads works with enumerate here is a feature of ChunkSplitters.jl, which is what allows using the chunk indices to index the chunk-local buffers; in general @threads does not work with enumerate - a minor inconvenience.)
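For illustration, assuming the ChunkSplitters 3.x API (index_chunks with n, split, and minsize keywords):

using ChunkSplitters: index_chunks, RoundRobin

collect(index_chunks(1:1000; n=4))                      # consecutive (default): 1:250, 251:500, ...
collect(index_chunks(1:1000; n=4, split=RoundRobin()))  # round-robin: 1:4:997, 2:4:998, ...
collect(index_chunks(1:10; n=4, minsize=5))             # minsize caps the chunk count: 2 chunks of 5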


Using @threads with ChunkSplitters is a complete anti-pattern. @threads has two jobs that are conflated: do the chunk splitting, and do the @sync / @spawn. If you do the chunk splitting outside, then you should also use @sync / @spawn.

I’m not attacking that package. It has a simple job, it does its simple job right, it has a good API, it has the right kind of options, it is well-maintained by people who are trusted and active in the community. It is excellent.

My only issues are that (1) ChunkSplitters is borderline too small to be an independent package (software supply-chain hassle scales badly with the number of independent packages: you wouldn’t start a task for each iteration in tmap for most applications, and you shouldn’t have a separate package for each tiny bit of functionality), and (2) this really is a thing that should be batteries-included. There should be one preferred, officially encouraged pattern for basic multi-threading, and that pattern must be convenient without uncurated extra dependencies.

This is especially poignant because the package registry is uncurated (as opposed to the stdlib). This is not like your Linux distro shipping a default install without some essential tool; it’s like the olden days of Windows shipping without zipfile handling.

A curated subset of the registry would also go a long way. If openssh stops working on Arch, then the buck stops at Arch, not upstream; if some AUR package stops working on Arch, then the buck stops at the user who decided to install it (though in egregious cases, e.g. malware, the team might graciously intervene, just like Apple might or might not intervene and pull an app). The abdication of responsibility by Microsoft, Apple and Google (and Mozilla!) is breathtaking - every tiny volunteer-run Linux distro manages to do better (the Debian OpenSSL apocalypse notwithstanding).

I don’t disagree, but @threads splits the work into nthreads() chunks, which is not necessarily what one wants.

What I think ChunkSplitters doesn’t do, and a better tool should, is allow adaptive chunks.
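A crude approximation of adaptivity, pending such a tool, is to oversubscribe: split into more, smaller chunks than there are threads, so that threads freed by fast chunks pick up the remaining ones (a sketch; process again stands in for the per-item work):

nchunks = 4 * Threads.nthreads()   # oversubscribe: more chunks than threads
@sync for chunk in ChunkSplitters.index_chunks(1:1000; n=nchunks)
    Threads.@spawn foreach(process, chunk)
end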