What is the correct way to use Base.OncePerThread?

I want to use Base.OncePerThread but it’s not clear what the intended usage is.

Is the following a correct / safe / intended way to use Base.OncePerThread?

finalset = Set()
tempsets = OncePerThread() do
    return Set()
end
Threads.@threads for i in 1:100
    push!(tempsets(), rand("qwe", 3) |> String)
end
usedtempsets = @view tempsets.xs[tempsets.ss .== 0x01]
union!(finalset, usedtempsets...)
finalset

If this is the intended usage, why is it such a pain to get the per-thread values out again?

Plain tempsets.xs[tempsets.ss .== 0x01] (without the @view) errors, and OncePerThread doesn't support iterate. So am I not supposed to access the memory after the threads are done with it?

Contrary to what the name of the macro implies, @threads does not launch OS threads (which is what OncePerThread refers to), but tasks (which is what OncePerTask is about). I believe OncePerThread is mostly intended for per-OS-thread initialization of third-party C libraries that assume OS threads as the means of concurrency.
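Following that suggestion, here is a minimal sketch using Base.OncePerTask, assuming Julia 1.12 or later (where OncePerTask was added). OncePerTask keeps each value in the task's task-local storage, which is no longer reachable once the task has finished, so this sketch has the initializer also record every new Set in a lock-protected vector. The function and variable names (collect_with_oncepertask, registry, registry_lock) are hypothetical.

```julia
# Sketch, assuming Julia >= 1.12 for Base.OncePerTask.
# collect_with_oncepertask, registry and registry_lock are illustrative names.
function collect_with_oncepertask(n::Int)
    registry = Set{String}[]          # every per-task Set that gets created
    registry_lock = ReentrantLock()   # protects `registry` only, not the Sets
    tempsets = Base.OncePerTask{Set{String}}() do
        s = Set{String}()
        # The initializer runs at most once per task; recording the Set here
        # keeps it reachable after the owning task has finished.
        @lock registry_lock push!(registry, s)
        return s
    end
    Threads.@threads for i in 1:n
        # tempsets() returns the Set owned by the current task,
        # so push! needs no lock.
        push!(tempsets(), String(rand("qwe", 3)))
    end
    # @threads has returned, so no task is still writing to any Set.
    return reduce(union!, registry; init = Set{String}())
end
```

Each task takes the lock only once, on its first call to tempsets(); every later push! goes to a Set that no other task can see.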


I think most of the pain here stems from the fact that Julia's parallelism is really focused on tasks, not on threads.

That being said: using OhMyThreads.jl you can write:

using OhMyThreads: tmap, chunks

tmpsets = tmap(chunks(1:100); n=Threads.nthreads()) do chunk
    tmpset = Set()
    for i in chunk
        push!(tempset, rand("qwe", 3) |> String)
    end
    return tmpset
end
reduce(union!, tmpsets)

Which reads and feels much clearer to me.
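For comparison, roughly the same chunk-then-reduce pattern can be written with only Base, using Threads.@spawn and fetch instead of OhMyThreads. This is a sketch: parallel_sets is a hypothetical name, and Iterators.partition only approximates what chunks does.

```julia
# Hypothetical helper: one Set per spawned task, merged after all tasks finish.
function parallel_sets(n::Int; ntasks::Int = Threads.nthreads())
    ntasks >= 1 || throw(ArgumentError("ntasks must be at least 1"))
    chunksize = max(1, cld(n, ntasks))       # ceil(n / ntasks), never 0
    tasks = map(Iterators.partition(1:n, chunksize)) do chunk
        Threads.@spawn begin
            localset = Set{String}()          # owned by this task only
            for _ in chunk
                push!(localset, String(rand("qwe", 3)))
            end
            localset
        end
    end
    # fetch waits for each task; the union runs on the calling task only
    return reduce(union!, map(fetch, tasks); init = Set{String}())
end
```

Calling parallel_sets(100) gives a Set{String} of at most 27 distinct three-letter strings.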


Indeed, the reason I'm using OncePerThread in the example is that Set is not thread-safe, and I don't want to use locks since I don't need them.

So all OncePerThread is supposed to do in the example is allocate memory that each thread "owns", and then, once the threads are done, let me clean up by collecting all their work.
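If the goal is just memory that each worker owns plus a final clean-up step, one way to get it without OncePerThread is to allocate one buffer per chunk of work up front and index it by chunk number rather than by thread ID; tasks can migrate between threads, so threadid() is not a stable owner. A minimal sketch, where buffered_sets and parts are hypothetical names.

```julia
# Hypothetical helper: buffers[k] is written only by the iteration handling parts[k].
function buffered_sets(n::Int; nchunks::Int = Threads.nthreads())
    nchunks >= 1 || throw(ArgumentError("nchunks must be at least 1"))
    parts = collect(Iterators.partition(1:n, max(1, cld(n, nchunks))))
    buffers = [Set{String}() for _ in eachindex(parts)]   # one owner per buffer
    Threads.@threads for k in eachindex(parts)
        for _ in parts[k]
            # no other iteration touches buffers[k], so no lock is needed
            push!(buffers[k], String(rand("qwe", 3)))
        end
    end
    # @threads waits for all iterations, so reading the buffers here is safe
    return reduce(union!, buffers; init = Set{String}())
end
```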

That example doesn't seem to work: chunks complains that it needs a number of chunks, and then tmap gives me an

UndefVarError: `threadpool` not defined in `OhMyThreads.Implementation`

I’m using OhMyThreads v0.8.3

This is a pattern I often use, and I'd be interested to know whether OhMyThreads provides anything for it. I couldn't find anything. Perhaps @Mason knows?

Note the specific issues here:

  • The reduction operation is not thread-safe, but OhMyThreads' @set reduce seems to assume thread safety.
  • The reduction must happen after all the sets have been computed in parallel, and not at each iteration. Does OhMyThreads provide functionality for this?
  • We require task-local state (one Set per task).

Here is one solution. It works, but it’s not super efficient:

  1. trytake! should really be in Base, and could use Channel internals to be faster
  2. Here, I only create one set per concurrently running task. This may not be needed; perhaps one per total task would be fine.
  3. There’s a bunch of overhead in the congested channel, which might be an issue.
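
using OhMyThreads: @tasks

function foo(n::Int, ntasks::Int=Threads.nthreads())
    ntasks < 1 && throw(ArgumentError("Must use at least one task"))
    # Unbounded channel used as a pool of Sets; each Set is held by one task at a time
    ch = Channel{Set{String}}(Inf)
    @tasks for i in 1:n
        @set ntasks = ntasks
        # Reuse a free Set from the pool if one is available, otherwise create a new one
        set = @something trytake!(ch) Set{String}()
        push!(set, String(rand("abcdefghijklmnopqrstuvwxyz", 3)))
        # Hand the Set back to the pool for a later iteration
        put!(ch, set)
    end
    close(ch)
    # All iterations are done: merge every Set that ended up in the pool
    @lock ch reduce(union!, ch, init=Set{String}())
end

# Non-blocking take!: returns Some(item) if the channel holds an item, otherwise nothing
function trytake!(ch::Channel)
    isempty(ch) && return nothing   # cheap check without taking the lock
    @lock ch begin
        # Check again under the lock, since another task may have emptied the channel
        isempty(ch) ? nothing : Some(take!(ch))
    end
end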
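A variant of the same channel-as-pool idea that needs only Base: pre-fill the channel with a fixed number of Sets, so every iteration can use a blocking take! and trytake! is not needed. This is a sketch, not a drop-in replacement: foo_prefilled is a hypothetical name, and Threads.@threads stands in for @tasks.

```julia
# Hypothetical Base-only variant: the channel holds a fixed pool of Sets.
function foo_prefilled(n::Int, nsets::Int = Threads.nthreads())
    nsets < 1 && throw(ArgumentError("Must use at least one set"))
    ch = Channel{Set{String}}(nsets)      # buffer holds the whole pool
    for _ in 1:nsets
        put!(ch, Set{String}())           # pre-fill; never blocks, buffer has room
    end
    Threads.@threads for i in 1:n
        set = take!(ch)                   # borrow a Set; waits while all are in use
        push!(set, String(rand("abcdefghijklmnopqrstuvwxyz", 3)))
        put!(ch, set)                     # return it so another iteration can use it
    end
    close(ch)                             # buffered Sets can still be taken after close
    return reduce(union!, ch; init = Set{String}())  # iterating drains the buffer
end
```

Because only nsets Sets ever exist and each borrower returns its Set, put! never blocks and take! only waits briefly, at the cost of contention on a single channel.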

Sorry for missing the argument to chunks. I typed this on my phone and could not double-check.

Anyhow, I believe the follow-up error is a bug and have opened an issue for it.

Not sure if this is still relevant for you, but I fixed the bug, so my example should work with OhMyThreads.jl 0.8.4, which was tagged about an hour ago.
