I didn’t really understand why our usage was wrong, but in any case we tried to find the simplest way to address the issue. What I am rather unhappy with is that in v1.12 threading code has to be so much more complex. One of the big strengths of Julia was how simple and easy it was to parallelize existing code, but in v1.12 it is significantly more complex, with lots of book-keeping that the user has to do manually. This gives off vibes more like C than like Julia.
This is how we were using threadid, and how we now have to use it in v1.12:
# setup
ds = mutable_datastructure()
dss = [deepcopy(ds) for _ in 1:Threads.nthreads()]
outputs = zeros(length(some_iterable))

# pre v1.12 way:
Threads.@threads for j in some_iterable
    i = Threads.threadid()
    ds = dss[i]
    outputs[j] = computation!(ds, j)
end

# post v1.12 way:
nbuffers = Threads.nthreads()
threadchannel = Channel{Int}(nbuffers)
for i in 1:nbuffers
    put!(threadchannel, i)
end
Threads.@threads for j in some_iterable
    i = take!(threadchannel)
    ds = dss[i]
    outputs[j] = computation!(ds, j)
    put!(threadchannel, i)
end
close(threadchannel)
I guess my question is why couldn’t we make the first version work in v1.12 just out of the box? Or, is there a simpler and more elegant way to make version 1 work in v1.12 that is not as verbose and book-keeping heavy as the v1.12 version?
using OhMyThreads

outputs = @tasks for j in some_iterable
    @set collect = true                  # makes this into a `map`-like operation
    @local ds = mutable_datastructure()  # creates one instance of your mutable data structure per task
    computation!(ds, j)
end
For what it’s worth, what you were doing was incorrect long before v1.12; it just broke even more visibly in 1.12. There’s a blog post that explains why uses of threadid like this cause race conditions: PSA: Thread-local state is no longer recommended. It should really be updated to mention OhMyThreads.jl, though.
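To make the failure mode concrete, here is a minimal sketch (the bufs array and the sleep are made up for illustration) of how a yield point inside the loop body breaks the threadid pattern:

using Base.Threads
bufs = [Int[] for _ in 1:nthreads()]  # one buffer per thread -- or so we hope
@threads for j in 1:100
    buf = bufs[threadid()]  # two tasks can end up holding the same buf ...
    push!(buf, j)
    sleep(0.001)            # ... because the body yields here and tasks may migrate,
    pop!(buf)               # so buf may have been mutated by a different iteration
end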
Instead of using threadid, just create your own “taskid”, as below:
using Base.Threads

numofdata = 100
@show rawdata = rand(numofdata)

# prepare the processed_data
processed_data = zeros(numofdata)

NumOfThreads = Threads.nthreads()
# Manually set the number of threads to 4
# NumOfThreads = 4

# Sanity check
if NumOfThreads > length(rawdata)
    NumOfThreads = length(rawdata)
end

startpos = 1
endpos = numofdata

function CalcChunk(NumOfThreads, i, startpos, endpos)
    numofdata = endpos - (startpos - 1)
    chunksize = numofdata ÷ NumOfThreads
    firstpos = startpos + chunksize * (i - 1)
    lastpos = startpos + chunksize * i - 1
    if i == NumOfThreads
        # The last thread takes up any remaining elements
        lastpos = endpos
    end
    return (firstpos, lastpos)
end

Threads.@threads for taskid = 1:NumOfThreads
    global NumOfThreads, startpos, endpos
    for pos = range(CalcChunk(NumOfThreads, taskid, startpos, endpos)...)
        println("Task $(taskid), working with rawdata[$pos] = $(rawdata[pos])")
        individualdata = rawdata[pos]
        processed_data[pos] = individualdata * individualdata
    end
end
println("Done")
Actually, the idea behind the change (which happened already in Julia 1.7, not 1.12) is to make things simpler, not more complex. It’s unfortunate that so much idiomatic Julia code used threadid when it was never a good fit for Julia’s threading model.
A core idea behind Julia’s threading model is that the user is supposed to think in terms of tasks, not threads. Threads are a resource provided by the operating system, which is handled transparently by the runtime. It is analogous to how a user should think of memory: of course we can think about how much we consume, but we have no control over where in memory an object is allocated or in which order a collection of variables is placed on the heap. Similarly with threads: we can think about how well our program makes use of the N threads provided by the operating system, but we should not reason about which task runs on which thread; ideally, we shouldn’t even write code that depends on N threads being available.
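As a sketch of that task-first mindset (with a placeholder workload), one spawns a task per work item and lets the scheduler map tasks onto whatever threads exist:

using Base.Threads
results = Vector{Float64}(undef, 16)
@sync for j in eachindex(results)
    @spawn begin
        # each work item is its own task; the runtime decides which thread runs it
        results[j] = sum(abs2, rand(1_000))
    end
end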
Generally, this simplifies Julia code, but there are some cases where the user is forced to think about threads explicitly, e.g. if you call into some C code where each thread needs to be initialized independently. That’s a rare case, though.
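For that rare case, the OncePerThread construct that comes up later in this thread is a good fit; a hypothetical sketch, assuming some C library libfoo with a per-thread init function:

# libfoo and foo_thread_init are stand-ins for a real C library
const foo_handle = OncePerThread{Ptr{Cvoid}}() do
    @ccall "libfoo".foo_thread_init()::Ptr{Cvoid}  # runs at most once per OS thread
end
# inside any task, foo_handle() returns the handle initialized for the current thread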
In your case, it IS especially tricky because you need to instantiate as few copies of ds as possible, while still guaranteeing that each concurrent task operates on a distinct copy (that’s how I take it, anyway). For this, I would do the same as @Mason suggests above.
Note that the underlying implementation in OhMyThreads still doesn’t express its abstraction in terms of threads, AFAICT, but instead expresses it as a number of iterations shared across a smaller number of tasks.
# setup
using ChunkSplitters: chunks  # assuming ChunkSplitters.jl for `chunks`

n = Threads.nthreads()
ds = mutable_datastructure()
dss = [deepcopy(ds) for _ in 1:n]
outputs = zeros(length(some_iterable))

Threads.@threads for (i, c) in enumerate(chunks(some_iterable; n=n))
    local ds = dss[i]
    for j in c
        outputs[j] = computation!(ds, j)
    end
end
@Mason thank you, to clarify: does the above code also work if instead of ds = xxx() I do ds = vector[i]? That is, let’s assume that creating the mutable data structure is expensive and I want to avoid re-doing it often. Or, does this code automatically create only one instance of ds per thread?
Sorry @Datseris I’m having a lot of trouble understanding the way you’ve phrased parts of your question, so maybe this isn’t what you’re asking, but I think the core of your question comes down to the last sentence:
No, it creates one instance of ds per task. You may choose to have only one task per thread, but you can also have 10000 tasks if you want (set with ntasks=10000). The default in OhMyThreads is for it to create a number of tasks that’s equal to the number of threads, but you can do more or less.
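For instance (a sketch reusing mutable_datastructure and computation! from above), the task count is just another knob on the same loop:

using OhMyThreads
outputs = @tasks for j in some_iterable
    @set collect = true
    @set ntasks = 10_000  # many more tasks than threads; each task still gets its own ds
    @local ds = mutable_datastructure()
    computation!(ds, j)
end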
What @local does is create a TaskLocalValue (from TaskLocalValues.jl) before the loop, and then let you access it in the loop. That is, the code is equivalent to (but faster than):
using OhMyThreads: tmap, TaskLocalValue

const ds = TaskLocalValue(() -> mutable_datastructure())
outputs = tmap(some_iterable) do j
    computation!(ds[], j)
end
Each time ds is accessed on a new Task, the function () -> mutable_datastructure() is executed to create a new task-local-value to use in the computation.
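A tiny self-contained demonstration of that semantics (using the documented TaskLocalValue constructor with an explicit type parameter):

using TaskLocalValues: TaskLocalValue
tlv = TaskLocalValue{Vector{Int}}(() -> Int[])  # initializer runs once per task
t1 = Threads.@spawn (push!(tlv[], 1); tlv[])
t2 = Threads.@spawn (push!(tlv[], 2); tlv[])
fetch(t1), fetch(t2)  # ([1], [2]): each task saw its own fresh vector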
Thanks, yes this answers it, sorry I am still trying to wrap my head around everything. One task per thread is what I want, so I will leave the snippet you pasted as is and incorporate it into all the libraries where I’ve gotten the automated issue opened that warns about the old behavior’s problems in 1.12 (and before).
For a simple for loop where data races are not a problem, such as
using Base.Threads

function foo(x)
    xsq = Vector{Float64}(undef, length(x))
    @threads for i in 1:length(x)
        xsq[i] = x[i]^2
    end
    return xsq
end

foo(0:10_000)
Is this still the recommended method or would OhMyThreads be preferred?
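(For comparison, a sketch of the same computation with OhMyThreads’ tmap; foo_omt is just an illustrative name:)

using OhMyThreads: tmap
foo_omt(x) = tmap(xi -> xi^2, x)  # task-based map; no manual indexing needed
foo_omt(0:10_000)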
The problem with

Threads.@threads for j in some_iterable
    i = Threads.threadid()
    ds = dss[i]
    outputs[j] = computation!(ds, j)
end
is that the data in dss is shared between tasks/threads and therefore access needs to be synchronized. Relying on threadid can only ever work if each loop body runs to completion before any other task could reuse the data, i.e., it breaks if the code yields somewhere and another task, on the same thread, takes over.
Instead, channels can be used to coordinate access to the shared resources – as you already did in your # post v1.12 way example. This pattern can be abstracted a bit with some helper functions, e.g.,
function make_shared_resources(n, constructor)
    res = [constructor() for _ in 1:n]
    ch = Channel{eltype(res)}(n)
    for x in res
        put!(ch, x)
    end
    ch
end

function with_resource(f, resource)
    x = take!(resource)  # obtain shared resource
    try
        f(x)  # do your work
    finally
        put!(resource, x)  # put resource back
    end
end

# Your example
dss = make_shared_resources(Threads.nthreads(), mutable_datastructure)
Threads.@threads for j in some_iterable
    with_resource(dss) do ds
        outputs[j] = computation!(ds, j)
    end
end
function main()
    N = 100
    dss = OncePerThread{Vector{Float64}}() do
        zeros(N)
    end
    res = zeros(N)
    Threads.@threads for i in 1:N
        ds = dss()
        res[i] = computation!(ds, i)
    end
    return res
end
You should use OncePerTask in that setup. The problem is that @threads is a misnomer; it should have been @tasks, but the name was coined a long time ago, when tasks did not migrate between threads.
using Base.Threads: @spawn

function main()
    N = 100
    dss = OncePerTask{Vector{Float64}}() do
        zeros(N)
    end
    res = zeros(N)
    @sync for i in 1:N
        @spawn begin
            ds = dss()
            res[i] = computation!(ds, i)
        end
    end
    return res
end
It’s OK, but inefficient, since each @spawn creates a new task, so dss() is called just once per task anyway. But your former example with @threads would work fine with OncePerTask. The @threads macro creates a number of tasks, each with a for loop inside.
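A sketch that makes the task count visible (the atomic counter just counts constructor calls; assuming Julia 1.12’s OncePerTask):

using Base.Threads
ctor_calls = Atomic{Int}(0)
dss = OncePerTask{Vector{Float64}}() do
    atomic_add!(ctor_calls, 1)
    zeros(4)
end
@threads for i in 1:1000
    dss()  # the same buffer is reused for every iteration handled by one task
end
@show ctor_calls[]  # roughly nthreads(), not 1000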
OK, that’s pretty much my use case 90% of the time, so I just replace the constructor with OncePerTask and, in the inner loop, use the call instead of indexing by id. This is far from ugly, actually; I just don’t like the whole Channel closure thing.
Thank you!
function main()
    N = 100
    dss = OncePerTask{Vector{Float64}}() do
        zeros(N)
    end
    res = zeros(N)
    Threads.@threads for i in 1:N
        ds = dss()
        res[i] = computation!(ds, i)
    end
    return res
end