I want to use Base.OncePerThread, but its intended usage is not clear to me.
Is the following a correct, safe, and intended way to use Base.OncePerThread?
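```julia
finalset = Set()
tempsets = OncePerThread() do
    return Set()
end
Threads.@threads for i in 1:100
    push!(tempsets(), rand("qwe", 3) |> String)
end
# Pick out the initialized per-thread values; this relies on
# OncePerThread's internal (non-public) fields `xs` and `ss`.
usedtempsets = @view tempsets.xs[tempsets.ss .== 0x01]
union!(finalset, usedtempsets...)
finalset
```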
If this is the intended usage, why is it so awkward to get the per-thread values back out?
Plain tempsets.xs[tempsets.ss .== 0x01] (without the @view) throws an error, and OncePerThread doesn’t support iterate. Am I simply not supposed to access that memory after the threads are done with it?
Contrary to what its name suggests, @threads does not launch OS threads (the unit OncePerThread refers to) but tasks (the unit OncePerTask refers to). I believe OncePerThread is mostly intended for per-OS-thread initialization of third-party C libraries that assume OS threads as the means of concurrency.
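To illustrate the distinction, here is a minimal sketch of OncePerTask, assuming Julia 1.12 or later (where Base.OncePerTask exists). Each task that calls the object lazily gets its own value, and repeated calls from the same task return the same object. The name scratch is purely illustrative. The values live in each task’s task-local storage, and as far as I know there is no public way to enumerate them afterwards, so OncePerTask is not a mechanism for collecting results either.

```julia
# Assumes Julia 1.12+, where Base.OncePerTask is available.
# `scratch` is an illustrative name: calling it returns a Set{String} that is
# created lazily on first use and is private to the calling task.
const scratch = Base.OncePerTask{Set{String}}() do
    Set{String}()
end

s1 = scratch()
push!(s1, "abc")
s2 = scratch()                            # same task, so the same Set as s1
other = fetch(Threads.@spawn scratch())   # a new task gets its own, empty Set
```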
I think most of the pain here stems from the fact that Julia’s parallelism is built around tasks, not threads.
That said, with OhMyThreads.jl you can write:
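```julia
using OhMyThreads: tmap, chunks

# Split 1:100 into one chunk per thread; each chunk builds its own Set,
# so no Set is ever shared between tasks.
tmpsets = tmap(chunks(1:100; n=Threads.nthreads())) do chunk
    tmpset = Set{String}()
    for i in chunk
        push!(tmpset, rand("qwe", 3) |> String)
    end
    return tmpset
end
# Serial reduction, after all tasks have finished.
reduce(union!, tmpsets)
```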
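For comparison, the same idea can be sketched with only Base, assuming Threads.@spawn and Iterators.partition are acceptable stand-ins for tmap and chunks. The function name collect_strings is hypothetical. Each spawned task builds and returns its own Set, and the union! reduction runs serially after fetch has waited for every task, so neither locks nor a thread-safe Set are needed.

```julia
# Hypothetical Base-only helper: one task per chunk, each returning its own Set.
function collect_strings(n::Int; ntasks::Int = Threads.nthreads())
    chunksize = max(1, cld(n, ntasks))    # partition requires a size >= 1
    tasks = map(Iterators.partition(1:n, chunksize)) do chunk
        Threads.@spawn begin
            local_set = Set{String}()     # owned by this task only
            for _ in chunk
                push!(local_set, String(rand("qwe", 3)))
            end
            local_set                     # becomes the task's result
        end
    end
    partial_sets = fetch.(tasks)          # waits for every task to finish
    # The non-thread-safe union! now runs serially on the calling task.
    return reduce(union!, partial_sets; init = Set{String}())
end

result = collect_strings(100)
```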
Indeed, I’m using OncePerThread in the example because Set is not thread safe, and I don’t want to use locks when I don’t need them.
So all OncePerThread is meant to do in the example is allocate memory that each thread “owns”, and then, once the threads are done, let me clean up by collecting all of their work.
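A minimal Base-only sketch of this “each worker owns a slot, collect afterwards” pattern, without OncePerThread: preallocate one Set per chunk and index it by chunk number rather than by Threads.threadid(). The function name collect_with_slots is hypothetical, and using one chunk per thread is an assumption.

```julia
# Hypothetical helper: slots[k] is written only by the iteration handling chunk k.
function collect_with_slots(n::Int; nchunks::Int = Threads.nthreads())
    chunksize = max(1, cld(n, nchunks))
    parts = collect(Iterators.partition(1:n, chunksize))
    slots = [Set{String}() for _ in eachindex(parts)]   # one owner per slot
    Threads.@threads for k in eachindex(parts)
        for _ in parts[k]
            push!(slots[k], String(rand("qwe", 3)))
        end
    end
    # @threads returns only after every iteration has finished,
    # so reading `slots` here cannot race with the workers.
    return reduce(union!, slots; init = Set{String}())
end

result = collect_with_slots(100)
```

Keying the slot by chunk rather than by thread id matters because a task can migrate between OS threads at yield points, so Threads.threadid() is not a stable owner key.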
This is a pattern I use often, and I’m interested in whether OhMyThreads provides anything for it. I couldn’t find anything. Perhaps @Mason knows?
Note the specific issues here:
- The reduction operation is not thread safe, but OhMyThreads’ @set reduce seems to assume thread safety.
- The reduction must happen after all the sets have been computed in parallel, not at each iteration. Does OhMyThreads provide functionality for this?
- We need task-local state (one Set per task).
Here is one solution. It works, but it’s not especially efficient:
- trytake! should really be in Base, and could use Channel internals to be faster.
- Here, I only create one set per concurrently running task. This reuse may not be needed; perhaps creating one set per task overall would be fine.
- There’s a fair amount of overhead from contention on the channel, which might be an issue.
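```julia
using OhMyThreads: @tasks, @set

function foo(n::Int, ntasks::Int=Threads.nthreads())
    ntasks < 1 && throw(ArgumentError("Must use at least one task"))
    # Pool of Sets: each Set is either in the channel or held by exactly one task.
    ch = Channel{Set{String}}(Inf)
    @tasks for i in 1:n
        @set ntasks = ntasks
        # Reuse a pooled Set if one is available, otherwise allocate a fresh one.
        set = @something trytake!(ch) Set{String}()
        push!(set, String(rand("abcdefghijklmnopqrstuvwxyz", 3)))
        put!(ch, set)  # return the Set to the pool
    end
    close(ch)
    # Serially drain the closed channel and merge all pooled Sets.
    @lock ch reduce(union!, ch, init=Set{String}())
end

# Non-blocking take!: returns Some(item) if the channel holds one, else nothing.
function trytake!(ch::Channel)
    isempty(ch) && return nothing  # fast path without taking the lock
    @lock ch begin
        # Re-check under the lock: another task may have emptied the channel.
        isempty(ch) ? nothing : Some(take!(ch))
    end
end
```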
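The pooling idea itself does not depend on OhMyThreads. As a hedged sketch, here is the same approach with Base’s Threads.@threads in place of @tasks; the function name pooled_sets is hypothetical, and trytake! is repeated so the block is self-contained.

```julia
# Non-blocking take!: Some(item) if the channel holds one, else nothing.
function trytake!(ch::Channel)
    isempty(ch) && return nothing          # cheap check without the lock
    @lock ch begin
        isempty(ch) ? nothing : Some(take!(ch))
    end
end

# Hypothetical Base-only variant of the pooled-Channel approach.
function pooled_sets(n::Int)
    ch = Channel{Set{String}}(Inf)         # pool of reusable Sets
    Threads.@threads for i in 1:n
        # Reuse a pooled Set if one is available, otherwise allocate a new one.
        set = @something trytake!(ch) Set{String}()
        push!(set, String(rand("abcdefghijklmnopqrstuvwxyz", 3)))
        put!(ch, set)                      # hand the Set back to the pool
    end
    close(ch)
    # A closed channel still yields its buffered items, so this drains the pool
    # serially and merges every Set into a fresh result.
    return reduce(union!, ch; init = Set{String}())
end

result = pooled_sets(200)
```

Because every iteration writes into some pooled Set, the result has at most n elements, each a 3-letter string.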