How to iterate in a thread safe way?

I have the following code:

λ_noms = sort(unique(vcat(7:0.1:9.5, 7.8:0.02:8.2, 7.98:0.005:8.02)))
rel_sigmas = zeros(length(λ_noms))
i = 1
Threads.@threads for λ_nom in λ_noms
    println("lambda_nom: $λ_nom")
    rel_sigma_ = 100 * rel_sigma(λ_nom=λ_nom)
    rel_sigmas[i] = rel_sigma_
    i += 1
end

Is that thread safe? Is there a cleaner way to index the resulting array?

You could use

Threads.@threads for (i,λ_nom) in pairs(λ_noms)
...
end

And, no, I don’t think updating an external i is thread-safe.

Thanks a lot! Can you explain what the pairs() function actually does?

You’re looking for the enumerate function here rather than pairs.

2 Likes

It returns a pair with the index of the element, and the element of the collection.

Maybe it is even safer to do this:

Threads.@threads for i in eachindex(λ_noms, rel_sigmas)
    println("lambda_nom: $(λ_nom[i])")
    rel_sigma_ = 100 * rel_sigma(λ_nom=λ_nom[i])
    rel_sigmas[i] = rel_sigma_
end

Which will error if the indexes of λ_noms and rel_sigmas are not the same (which I think you expect so).

1 Like

Indeed, there is the possible complication of the indexes of your lambda_noms not having the same range of indexes than your output vector. If that is not the case, I would probably go with eachindex, to guarantee that applies.

Otherwise use enumerate, with care.

1 Like

How can you guys use pairs or enumerate with Thread.@threads? It always resulted in errors in my experience.

2 Likes

I’m going to be honest; after looking at the docstring for both pairs and enumerate, I think I just got confused about the difference between them :zipper_mouth_face: . Neither one really guards against issues with mismatched array indices; I absolutely agree that using eachindex is definitely the best practice here.

I guess you’ll see the difference if using an offset array:


julia> x = OffsetArray(1:3, -2)
1:3 with indices -1:1

julia> for (i, val) in pairs(x)
           @show i, val
       end
(i, val) = (-1, 1)
(i, val) = (0, 2)
(i, val) = (1, 3)

julia> for (i, val) in enumerate(x)
           @show i, val
       end
(i, val) = (1, 1)
(i, val) = (2, 2)
(i, val) = (3, 3)

Thus enumerate was the right choice there (although both pairs and enumerate are giving the same indexes there), for the code he posted. But if he had initialized with this:

rel_sigmas = similar(λ_noms)

pairs would be the right choice.

If you wanted to increment i, consider using atomics.

julia> i = Threads.Atomic{Int}(0)
Base.Threads.Atomic{Int64}(0)

julia> Threads.@threads for _ in 1:1000
           Threads.atomic_add!(i,1)
       end

julia> i[]
1000

Well, I think this is really an overkill for the given task…

1 Like

I’m not sure to which extent this example is representative of your real task but, in this case at least, it falls into the category of classical higher order functions (map in this case) and you’d probably be better off using a battle-tested parallel implementation such as what ThreadsX provides:

julia> λ_noms = sort(unique(vcat(7:0.1:9.5, 7.8:0.02:8.2, 7.98:0.005:8.02)));

julia> rel_sigma(x) = exp(rand()*x)
rel_sigma (generic function with 1 method)

julia> using ThreadsX
julia> rel_sigmas = ThreadsX.map(rel_sigma, λ_noms)
48-element Vector{Float64}:
   40.6612060246467
   16.49068401058819
  109.36091646835631
  [...]
5 Likes

This fails:

function rel_sigma(;λ_nom)
    sleep(1)
    rand()
end

λ_noms = sort(unique(vcat(7:0.1:9.5, 7.8:0.02:8.2, 7.98:0.005:8.02)))
rel_sigmas = zeros(length(λ_noms))
Threads.@threads for (i, λ_nom) in pairs(λ_noms)
    println("lambda_nom: $λ_nom")
    rel_sigma_ = 100 * rel_sigma(λ_nom=λ_nom)
    rel_sigmas[i] = rel_sigma_
end

with the exception:

julia> @time include("src/mwe4.jl")
ERROR: LoadError: TaskFailedException

    nested task error: MethodError: no method matching firstindex(::Base.Pairs{Int64, Float64, LinearIndices{1, Tuple{Base.OneTo{Int64}}}, Vector{Float64}})

Oh, probably that’s what Martin meant. The eachindex option certainly works.

1 Like

Sometimes you need a collect to actually construct an array, i.e.

Threads.@threads for pair in collect(enumerate(myarr))
    i, x = pair
    # do something
end

This usually works for me.

Edit: the eachindex is probably the best way to go, as it avoids collect and is a bit safer.

2 Likes

For now I use:

using ThreadsX

function rel_sigma(λ_nom)
    sleep(1)
    rand()
end

λ_noms = sort(unique(vcat(7:0.1:9.5, 7.8:0.02:8.2, 7.98:0.005:8.02)))
rel_sigmas = ThreadsX.map(rel_sigma, λ_noms)

which seams to work fine. Of course it would also be nice if an approach without any extra dependency would would, I try that later…

1 Like

This works:

julia> Threads.@threads for i in eachindex(λ_noms, rel_sigmas)
           λ_nom = λ_noms[i]
           println("lambda_nom: $(λ_nom)")
           rel_sigma_ = 100 * rel_sigma(λ_nom)
           rel_sigmas[i] = rel_sigma_
       end
lambda_nom: 7.0
lambda_nom: 7.1
lambda_nom: 7.2
lambda_nom: 7.3

(this one requires that λ_noms and rel_sigmas have the same indices)

ps: Using for i in 1:length(λ_noms) would do the same here, without the above guarantee, and possibly failing if the arrays have strange indexing (like offset arrays).

(and sorry for the previous confusion)

1 Like

Ah, that’s nice. This also works:

v = LinRange(0,1,20)
Threads.@threads for (i,x) in collect(enumerate(v))
       println(x)
end

But yeah, I don’t think is worth the allocation.

It should be nice to make this work without the collect, though. It seems hard because enumerate(x) doesn’t have a supertype and @threads wants a firstindex (easy, as we can do first(enumerate(v))) but also getindex method… Maybe this one is a small thing to add to possible improvements of @threads

1 Like

Probably pairs and enunerate were thought to use with non-indexable collections, mainly.

Maybe to implement the first and getindex then some sort of trait would be required, for the input, and that’s not part of the arrays interface.

1 Like