Thread-safe loop with push! and multi-threading

Hello,

I would like to use multi-threading to speed up a for loop.
For each iteration, an expensive computation is done and the result is pushed onto the end of an array. Here is a toy example:

```julia
x = collect(1:10)
f(x) = x + 1
xlist = []
idx = []
Threads.@threads for i = 1:length(x)
    xnew = f(x[i])
    # not thread-safe: concurrent push! calls race on the same arrays
    push!(xlist, deepcopy(xnew))
    push!(idx, deepcopy(i))
end
```

However, this is not safe for multi-threading: xlist differs from run to run and its elements are not ordered.

I have tried ThreadedIterables (https://github.com/marekdedic/ThreadedIterables.jl), but I don't get any speed-up.

I don't think you need deepcopy, but you can't just push! here because the operation is not atomic. Try this instead:
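One common way to make the push! calls safe is to guard them with a lock, so only one thread modifies the arrays at a time while the expensive f still runs in parallel. This is a minimal sketch using Base's ReentrantLock, not necessarily the code originally suggested in this reply; it also sorts by the recorded indices at the end, because the arrival order is still arbitrary.

```julia
x = collect(1:10)
f(x) = x + 1

xlist = Int[]
idx = Int[]
lk = ReentrantLock()   # guards both xlist and idx

Threads.@threads for i in eachindex(x)
    xnew = f(x[i])     # expensive part runs in parallel, outside the lock
    lock(lk) do        # only one task at a time may push!
        push!(xlist, xnew)
        push!(idx, i)
    end
end

# Arrival order is arbitrary; restore input order using the recorded indices.
p = sortperm(idx)
xlist = xlist[p]
idx = idx[p]
```

The lock serializes only the cheap push! step, so this works well when f dominates the runtime; if f is cheap, contention on the lock can eat most of the speed-up.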

An approach I like is to have a Channel that the results are pushed into, and to spawn a task for the multithreaded loop. That way, a different task can take! from the channel and do the aggregation safely.
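A Channel-style version of the toy example might look like the sketch below. It assumes Julia 1.3 or later (for Threads.@spawn and the spawn keyword of the Channel constructor). The producer function is bound to the channel, so the channel is closed when the function returns, and iterating over the channel in the consuming task take!s items until it is closed and empty.

```julia
x = collect(1:10)
f(x) = x + 1

# Producer: a task that launches one task per element and put!s
# (index, result) pairs. put! on a Channel is thread-safe.
results = Channel{Tuple{Int,Int}}(length(x); spawn = true) do ch
    @sync for i in eachindex(x)
        Threads.@spawn put!(ch, (i, f(x[i])))
    end
end  # the channel is closed automatically when this function returns

# Consumer: this task take!s from the channel; arrival order is arbitrary.
idx = Int[]
xlist = Int[]
for (i, xnew) in results
    push!(idx, i)
    push!(xlist, xnew)
end

# Restore input order using the recorded indices.
p = sortperm(idx)
idx = idx[p]
xlist = xlist[p]
```

Only the consumer task ever calls push! on idx and xlist, so no lock is needed around them.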

I am trying to adapt the code from "Thread-safe array building" (post #2 by yuyichao) so that each element is an Array{Float64,1}:

```julia
num_monte = Int(1e6)
solution_data = Vector{Array{Float64,1}}()
for i in 1:Threads.nthreads()
  push!(solution_data, Array{Float64,1}[])
end
Threads.@threads for i in 1:num_monte
  push!(solution_data[Threads.threadid()], [1.0; 2.0])
end
```

```
TaskFailedException:
MethodError: Cannot convert an object of type Array{Float64,1} to an object of type Float64
Closest candidates are:
convert(::Type{T}, !Matched::Ratios.SimpleRatio{S}) where {T<:AbstractFloat, S} at /home/mat/.julia/packages/Ratios/uRs4y/src/Ratios.jl:14
convert(::Type{T}, !Matched::T) where T<:Number at number.jl:6
convert(::Type{T}, !Matched::Number) where T<:Number at number.jl:7

Stacktrace:
[1] push!(::Array{Float64,1}, ::Array{Float64,1}) at ./array.jl:912
[2] macro expansion at ./In[87]:7 [inlined]
[3] (::var"#804#threadsfor_fun#28"{UnitRange{Int64}})(::Bool) at ./threadingconstructs.jl:61
[4] (::var"#804#threadsfor_fun#28"{UnitRange{Int64}})() at ./threadingconstructs.jl:28

Stacktrace:
[1] wait(::Task) at ./task.jl:267
[2] macro expansion at ./threadingconstructs.jl:69 [inlined]
[3] macro expansion at ./util.jl:175 [inlined]
[4] top-level scope at ./In[87]:6
```
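The MethodError comes from the element type of the outer container: solution_data is a Vector{Array{Float64,1}}, so each per-thread buffer pushed into it is converted to a Vector{Float64}, and push!(::Vector{Float64}, ::Vector{Float64}) then fails, as the first stacktrace frame shows. Each buffer needs element type Vector{Float64} instead. Separately, in recent Julia versions (1.8 and later, where @threads defaults to a :dynamic schedule) a task can move between threads, so indexing buffers by Threads.threadid() is no longer considered safe. The sketch below sidesteps both issues by giving each spawned task its own buffer over a contiguous chunk of the loop; the chunking scheme is an illustrative choice, not the code from the linked post.

```julia
num_monte = 10^6

# Split the iteration range into one contiguous chunk per thread.
chunk_size = cld(num_monte, Threads.nthreads())
chunks = Iterators.partition(1:num_monte, chunk_size)

# Each task owns its buffer, so no two tasks ever push! into the same array.
tasks = map(chunks) do chunk
    Threads.@spawn begin
        buf = Vector{Float64}[]      # a vector whose elements are Vector{Float64}
        for i in chunk
            push!(buf, [1.0; 2.0])
        end
        buf
    end
end

# Concatenating the buffers in chunk order preserves the original loop order.
solution_data = reduce(vcat, fetch.(tasks))
```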

Thank you for your answer.

I am not familiar with Channel in Julia; could you show a Channel-style rewrite of the toy problem?

In this particular example, the best approach would probably be to preallocate xlist and idx:

```julia
x = collect(1:10)
f(x) = x + 1
xlist = Array{Int}(undef, length(x))
idx = Array{Int}(undef, length(x))
Threads.@threads for i = 1:length(x)
    xnew = f(x[i])
    # each iteration writes only to its own slot i, so threads never collide
    xlist[i] = xnew
    idx[i] = i
end
```

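Preallocating arrays should always be preferred if you know the size in advance: resizing an array often means allocating new memory and copying, which is quite expensive. It is also what makes this loop thread-safe, since each iteration writes to a different index and no array is ever resized.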

The code in the OP is trying to implement a threaded map. I recommend using existing solutions. For example:
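To show what such a threaded map does, here is a minimal hand-rolled sketch. simple_tmap is a hypothetical helper written for illustration, not part of any package: it spawns one task per element and fetches the results in input order, so the output is ordered regardless of which thread finishes first.

```julia
x = collect(1:10)
f(x) = x + 1

# Hypothetical helper (not from any package): an order-preserving threaded map.
function simple_tmap(g, xs)
    tasks = [Threads.@spawn g(xi) for xi in xs]  # one task per element
    return fetch.(tasks)                         # fetch in input order
end

xlist = simple_tmap(f, x)
idx = collect(eachindex(x))
```

One task per element is reasonable when each call to f is expensive; for cheap f, the per-task overhead dominates and splitting the input into larger chunks is usually better.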

You also need to know the output type of f, which is impossible to determine in general.
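If the output type of f is not known in advance, one workaround is to preallocate a Vector{Any}, fill it in the threaded loop, and then narrow it by broadcasting identity, which rebuilds the array with an element type computed from the actual values. This is a sketch assuming the results share a common type worth narrowing to; the intermediate Vector{Any} stores boxed values, so it costs some extra allocation.

```julia
x = collect(1:10)
f(x) = x + 1

# No assumption about f's return type: store results untyped first.
out = Vector{Any}(undef, length(x))
Threads.@threads for i in eachindex(x)
    out[i] = f(x[i])   # each iteration writes only its own slot
end

# identity. rebuilds the array with the narrowest element type of the values.
xlist = identity.(out)
```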

How about tmap from ThreadPools.jl (tro3/ThreadPools.jl on GitHub: "Improved thread management for background and nonuniform tasks in Julia"; docs at https://tro3.github.io/ThreadPools.jl):

```julia
julia> using ThreadPools

julia> ThreadPools.tmap(collect(enumerate(x))) do (i,xx)
           deepcopy(i), deepcopy(f(xx))
       end
10-element Array{Tuple{Int64,Int64},1}:
 (1, 2)
 (2, 3)
 (3, 4)
 (4, 5)
 (5, 6)
 (6, 7)
 (7, 8)
 (8, 9)
 (9, 10)
 (10, 11)
```

The work is distributed across the available threads, and the output order is guaranteed to match the input order.