Why has simple threading using `threadid` become so complex in v1.12...?

Hi, several of my repos have had an issue raised for the upcoming v1.12 and its changes to multithreading, notifying of erroneous usage of `threadid`. E.g., Likely erroneous use of `Threads.nthreads` and `Threads.threadid` · Issue #169 · JuliaDynamics/RecurrenceAnalysis.jl · GitHub

I didn’t really understand why our usage was wrong, but in any case we tried to find the simplest way to address the issue. What I am rather unhappy with is that in v1.12 threading code has to be so much more complex. One of the big strengths of Julia was how simple and easy it was to parallelize existing code, but in v1.12 it is significantly more complex, with lots of book-keeping required manually from the user. This gives vibes more like C than like Julia.

This is how we were using threadid, and how we now have to use it in v1.12:

# setup
ds = mutable_datastructure()
dss = [deepcopy(ds) for _ in 1:Threads.nthreads()]
outputs = zeros(length(some_iterable))

# pre v1.12 way:
Threads.@threads for j in some_iterable
    i = Threads.threadid()
    ds = dss[i]
    outputs[j] = computation!(ds, j)
end

# post v1.12 way:
threadchannel = Channel{Int}(Threads.nthreads())
for i in 1:Threads.nthreads()
    put!(threadchannel, i)
end

Threads.@threads for j in some_iterable
    i = take!(threadchannel)
    ds = dss[i]
    outputs[j] = computation!(ds, j)
    put!(threadchannel, i)
end
close(threadchannel)

I guess my question is why couldn’t we make the first version work in v1.12 just out of the box? Or, is there a simpler and more elegant way to make version 1 work in v1.12 that is not as verbose and book-keeping heavy as the v1.12 version?

I would write this as

using OhMyThreads
outputs = @tasks for j in some_iterable
    @set collect=true # makes this into a `map`-like operation
    @local ds = mutable_datastructure() # Creates 1-instance of your mutable data structure per task
    computation!(ds, j)
end

For what it’s worth, what you were doing was incorrect long before v1.12; it just broke even more in 1.12. There’s a blogpost that explains why uses of `threadid` like this cause race conditions: PSA: Thread-local state is no longer recommended. It should really be updated to mention OhMyThreads.jl, though.


Instead of using threadid, just create your own “taskid” as below:

using Base.Threads

numofdata = 100
@show rawdata = rand(numofdata)
# prepare the processed_data
processed_data = zeros(numofdata)

NumOfThreads = Threads.nthreads()
# Manually set the number of threads to 4
# NumOfThreads = 4

# Sanity check
if NumOfThreads > length(rawdata) 
    NumOfThreads = length(rawdata)
end

startpos = 1
endpos = numofdata

function CalcChunk(NumOfThreads,i,startpos,endpos)
    numofdata = endpos - (startpos - 1)
    chunksize = numofdata ÷ NumOfThreads
    firstpos = startpos + chunksize * (i - 1)
    lastpos  = startpos + chunksize * i - 1
    if i == NumOfThreads
        # The last thread takes up any remaining elements
        lastpos = endpos
    end
    return (firstpos,lastpos)
end
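As a quick sanity check (a hypothetical usage example, not from the original post), the chunks produced by `CalcChunk` should tile the full range with no gaps or overlaps, with the last chunk absorbing the remainder:

```julia
# `CalcChunk` repeated here so the snippet runs standalone
function CalcChunk(NumOfThreads, i, startpos, endpos)
    numofdata = endpos - (startpos - 1)
    chunksize = numofdata ÷ NumOfThreads
    firstpos = startpos + chunksize * (i - 1)
    lastpos  = startpos + chunksize * i - 1
    if i == NumOfThreads
        lastpos = endpos  # the last chunk absorbs any remainder
    end
    return (firstpos, lastpos)
end

# 4 chunks over 1:10 have sizes 2, 2, 2, 4 and tile the range exactly
covered = Int[]
for i in 1:4
    f, l = CalcChunk(4, i, 1, 10)
    append!(covered, f:l)
end
@assert covered == collect(1:10)
```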

Threads.@threads for taskid = 1:NumOfThreads
    global NumOfThreads, startpos, endpos
    for pos = range( CalcChunk(NumOfThreads,taskid,startpos,endpos)... )
        println("Task $(taskid), Working with rawdata[$pos] = $(rawdata[pos])")
        individualdata = rawdata[pos]
        processed_data[pos] = individualdata * individualdata
    end
end


println("Done")

Actually, the idea behind the change (which happened already in Julia 1.7, not 1.12) is to make it simpler, not more complex. It’s unfortunate that so much of idiomatic Julia code used threadid, when it was never a good fit for Julia’s threading model.

A core idea behind Julia’s threading model is that the user is supposed to think in terms of tasks, and not threads. Threads are a resource provided by the operating system, which is handled transparently by the runtime. It is analogous to how a user should think of memory: Of course we can think about how much we consume, but we don’t have any control of where in memory an object is allocated or in which order on the heap a collection of variables are placed. Similar with threads: We can think about how well our program makes use of N threads provided by the operating system, but we should not reason about which task runs on which thread - and ideally, we shouldn’t even write code that depends on N threads being available.

Generally, this simplifies Julia code, but there are some cases where the user is forced to think about threads explicitly. E.g. if you call into some C code where each thread needs to be initialized independently. That’s a rare case though.

In your case, it IS especially tricky because you need to instantiate as few copies of ds as possible, while still guaranteeing that each concurrent task operates on a distinct copy (that’s how I take it, anyway). For this, I would do the same as @Mason suggests above.
Note that the underlying implementation in OhMyThreads still doesn’t express its abstraction in terms of threads, AFAICT, but instead expresses it as a number of iterations shared across a smaller number of tasks.
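That idea can be sketched in base Julia (a hypothetical helper, not OhMyThreads’ actual implementation): split the iteration space into `ntasks` contiguous slices and spawn one task per slice, never asking which thread runs what:

```julia
using Base.Threads: @spawn, nthreads

# Hypothetical helper: run f(i) for i in 1:n, with the iterations
# shared across `ntasks` tasks (independent of the thread count)
function tforeach_chunked(f, n; ntasks = nthreads())
    splits = round.(Int, range(0, n; length = ntasks + 1))
    @sync for t in 1:ntasks
        lo, hi = splits[t] + 1, splits[t + 1]
        @spawn for i in lo:hi
            f(i)
        end
    end
end

out = zeros(Int, 10)
tforeach_chunked(i -> out[i] = i^2, 10; ntasks = 3)
@assert out == [i^2 for i in 1:10]
```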


The documentation for both OhMyThreads.jl and ChunkSplitters.jl does a great job of explaining how “simple” multithreading can work.

As an example, using ChunkSplitters.jl, you could do:

# setup
using ChunkSplitters # provides `chunks`
n = Threads.nthreads()
ds = mutable_datastructure()
dss = [deepcopy(ds) for _ in 1:n]
outputs = zeros(length(some_iterable))

Threads.@threads for (i, c) in enumerate(chunks(some_iterable; n=n))
    local ds = dss[i]
    for j in c
        outputs[j] = computation!(ds, j)
    end
end

@Mason thank you, to clarify: does the above code also work if instead of ds = xxx() I do ds = vector[i]? That is, let’s assume that creating the mutable data structure is expensive and I want to avoid re-doing it often. Or, does this code automatically create only one instance of ds per thread?

Sorry @Datseris I’m having a lot of trouble understanding the way you’ve phrased parts of your question, so maybe this isn’t what you’re asking, but I think the core of your question comes down to the last sentence:

No, it creates one instance of ds per task. You may choose to have only one task per thread, but you can also have 10000 tasks if you want (set with ntasks=10000). The default in OhMyThreads is for it to create a number of tasks that’s equal to the number of threads, but you can do more or less.

What @local does is create a TaskLocalValue (from TaskLocalValues.jl) before the loop, and then lets you access it in the loop. i.e. the code is equivalent to (but faster than)

using OhMyThreads: tmap, TaskLocalValue
const ds = TaskLocalValue(() -> mutable_datastructure())
outputs = map(some_iterable) do j
    computation!(ds[], j)
end

Each time ds is accessed on a new Task, the function () -> mutable_datastructure() is executed to create a new task-local-value to use in the computation.
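For intuition, the task-local mechanism can be sketched with Base’s `task_local_storage` (a simplified stand-in; `TaskLocalValue` is a typed, faster abstraction over the same idea):

```julia
using Base.Threads: @spawn

# Each task lazily creates its own buffer on first access;
# repeated access from the same task returns the same object.
get_ds() = get!(task_local_storage(), :ds) do
    zeros(3)  # stand-in for an expensive mutable_datastructure()
end

a = get_ds()
@assert get_ds() === a   # same task: same instance
t = @spawn get_ds()
@assert fetch(t) !== a   # different task: fresh instance
```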


Thanks, yes this answers it. Sorry, I am still trying to wrap my head around everything. One task per thread is what I want, so I will leave the snippet you pasted as is, and incorporate it into all the libraries where the automated issue was opened warning about the old behavior’s problems in 1.12 (and before).


For a simple for loop where data races are not a problem such as

using Base.Threads
function foo(x)
    xsq = Vector{Float64}(undef, length(x))
    @threads for i in 1:length(x)
        xsq[i] = x[i]^2
    end
    return xsq
end

foo(0:10_000)

Is this still the recommended method or would OhMyThreads be preferred?

this is fine

The problem with the pattern

Threads.@threads for j in some_iterable
    i = Threads.threadid()
    ds = dss[i]
    outputs[j] = computation!(ds, j)
end

is that the data in dss is shared between tasks/threads and therefore access needs to be synchronized. Relying on threadid can only ever work if each loop body runs to completion before any other task can reuse the data, i.e., it breaks if the code yields somewhere and another task – on the same thread – takes over.

Instead, channels can be used to coordinate access to the shared resources – as you already did in your # post v1.12 way example. This pattern can be abstracted a bit with some helper functions, e.g.,

function make_shared_resources(n, constructor)
    res = [constructor() for _ in 1:n]
    ch = Channel{eltype(res)}(n)
    for x in res
        put!(ch, x)
    end
    ch
end

function with_resource(f, resource)
    x = take!(resource) # obtain shared resource
    try
        f(x) # do your work
    finally
        put!(resource, x) # put resource back
    end
end

# Your example
dss = make_shared_resources(Threads.nthreads(), mutable_datastructure)
Threads.@threads for j in some_iterable
    with_resource(dss) do ds
        outputs[j] = computation!(ds, j)
    end
end

If I prefer this one, is it still good practice?

function main()
    N = 100
    dss = OncePerThread{Vector{Float64}}() do
        zeros(N)
    end
    res = zeros(N)
    Threads.@threads for i in 1:N
        ds = dss()
        res[i] = computation!(ds, i)
    end
    return res
end

even though it’s thinking about threads again

No, because task migration means that multiple tasks can end up accessing the same `ds` at the same time and overwriting each other’s work.


Oh ok, so what’s the point of the `OncePer` family?

You should use OncePerTask in that setup. The problem is that @threads is a misnomer, it should have been @tasks, but the name was coined a long time ago when tasks did not migrate between threads.


Oh ok so this is fine

function main()
    N = 100
    dss = OncePerThread{Vector{Float64}}() do
        zeros(N)
    end
    res = zeros(N)
    @sync for i in 1:N
        @spawn begin
            ds = dss()
            res[i] = computation!(ds, i)
        end
    end
    return res
end

or the `:static` case should also work

It’s ok, but inefficient, since each @spawn creates a new task, so dss() is called just once per task anyway. But your former example with @threads would work fine with OncePerTask. The @threads macro creates a number of tasks, each with a for loop inside.
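That claim is easy to check empirically (a small sketch, not from the original post): count the distinct tasks observed inside an `@threads` loop; the count is bounded by the number of threads, not the number of iterations:

```julia
using Base.Threads

seen = Set{Task}()
lk = ReentrantLock()
Threads.@threads for i in 1:1000
    lock(lk) do
        push!(seen, current_task())
    end
end
# @threads (default :dynamic) partitions the 1000 iterations among
# at most nthreads() tasks, so any per-task setup runs only that often
@assert 1 <= length(seen) <= Threads.nthreads()
```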


Ok, that’s pretty much my use case 90% of the time, so I just replace the constructor with OncePerTask and the inner indexing by id with the call. This is far from ugly actually, I just don’t like the whole Channel closure thing.
Thank you!


This has the same problem.

Yeah I see, I really need to think in terms of tasks

function main()
    N = 100
    dss = OncePerTask{Vector{Float64}}() do
        zeros(N)
    end
    res = zeros(N)
    Threads.@threads for i in 1:N
        ds = dss()
        res[i] = computation!(ds, i)
    end
    return res
end