Safe loop with push! multi-threading

Hello,

I would like to use multi-threading to speed up a for loop.
For each iteration, an expensive computation is done and the result is then pushed onto the end of an array. Here is a toy example:

x = collect(1:10)
f(x) = x+1
xlist = []
idx = []
Threads.@threads for i =1:length(x)
    xnew = f(x[i])
    push!(xlist, deepcopy(xnew))
    push!(idx, deepcopy(i))
end

However, the code above is not thread-safe: xlist is different on each run, and the elements are not in order.

I have tried ThreadedIterables https://github.com/marekdedic/ThreadedIterables.jl but I don’t get any speed-up.

I don’t think you need deepcopy, but you can’t just push! in this case because the operation is not atomic. Try this instead:
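
A minimal sketch of one way to make the push! calls safe, assuming a ReentrantLock (named lk here purely for illustration) serializes access to both arrays. It is not necessarily the snippet this reply had in mind, and the typed arrays Int[] are my own choice:

```julia
x = collect(1:10)
f(x) = x + 1

xlist = Int[]
idx = Int[]
lk = ReentrantLock()   # guards both arrays (name chosen for illustration)

Threads.@threads for i in eachindex(x)
    xnew = f(x[i])          # the expensive work runs outside the lock
    lock(lk) do
        push!(xlist, xnew)  # only one task at a time mutates the arrays
        push!(idx, i)
    end
end

# Arrival order is nondeterministic, so restore the input order afterwards.
xlist_sorted = xlist[sortperm(idx)]
```

The lock only pays off when f is much more expensive than the push!; otherwise the threads mostly wait on each other.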

An approach I like to do is have a Channel where the result is pushed into and spawn a task for the multithreaded loop. This way, a different task can take! from the channel and finish the aggregation safely.
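
The Channel idea can be sketched on the toy problem roughly as follows. This assumes a buffered Channel of (index, result) tuples; the names results and producer are illustrative, not from the original post:

```julia
x = collect(1:10)
f(x) = x + 1

# Buffered channel carrying (index, result) pairs; put! and take! are thread-safe.
results = Channel{Tuple{Int,Int}}(length(x))

# Producer: runs the threaded loop in its own task and always closes the channel,
# so the consumer below cannot hang if an iteration throws.
producer = Threads.@spawn begin
    try
        Threads.@threads for i in eachindex(x)
            put!(results, (i, f(x[i])))
        end
    finally
        close(results)
    end
end

# Consumer: a single task drains the channel, so plain push! is safe here.
xlist = Int[]
idx = Int[]
for (i, xnew) in results
    push!(idx, i)
    push!(xlist, xnew)
end
wait(producer)
```

Results still arrive in a nondeterministic order, which is why the index travels with each value; sorting by idx afterwards recovers the input order.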

I am trying to adapt the code from "Thread-safe array building" (post #2 by yuyichao) to Array{Float64,1}, but I get an error:

num_monte = Int(1e6)
solution_data = Vector{Array{Float64,1}}()
for i in 1:Threads.nthreads()
  push!(solution_data, Array{Float64,1}[])
end
Threads.@threads for i in 1:num_monte
  push!(solution_data[Threads.threadid()],[1.0; 2.0])
end

TaskFailedException:
MethodError: Cannot convert an object of type Array{Float64,1} to an object of type Float64
Closest candidates are:
convert(::Type{T}, !Matched::Ratios.SimpleRatio{S}) where {T<:AbstractFloat, S} at /home/mat/.julia/packages/Ratios/uRs4y/src/Ratios.jl:14
convert(::Type{T}, !Matched::T) where T<:Number at number.jl:6
convert(::Type{T}, !Matched::Number) where T<:Number at number.jl:7

Stacktrace:
[1] push!(::Array{Float64,1}, ::Array{Float64,1}) at ./array.jl:912
[2] macro expansion at ./In[87]:7 [inlined]
[3] (::var"#804#threadsfor_fun#28"{UnitRange{Int64}})(::Bool) at ./threadingconstructs.jl:61
[4] (::var"#804#threadsfor_fun#28"{UnitRange{Int64}})() at ./threadingconstructs.jl:28

Stacktrace:
[1] wait(::Task) at ./task.jl:267
[2] macro expansion at ./threadingconstructs.jl:69 [inlined]
[3] macro expansion at ./util.jl:175 [inlined]
[4] top-level scope at ./In[87]:6
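
The error appears to come from the container type: solution_data is declared as a Vector{Array{Float64,1}}, so each per-thread slot is itself a Vector{Float64} and cannot hold another vector. Each slot would need to be a Vector{Vector{Float64}}. A hedged sketch that goes one step further and avoids indexing by threadid() altogether, by giving each chunk of iterations its own local buffer (collect_samples and ntasks are hypothetical names introduced here):

```julia
# Each chunk of iterations gets its own local buffer, so no two tasks
# ever push! into the same array.
function collect_samples(num_monte; ntasks = Threads.nthreads())
    chunks = Iterators.partition(1:num_monte, cld(num_monte, ntasks))
    tasks = map(chunks) do chunk
        Threads.@spawn begin
            local_data = Vector{Float64}[]   # element type is Vector{Float64}
            for i in chunk
                push!(local_data, [1.0, 2.0])
            end
            local_data
        end
    end
    # Concatenate the per-chunk buffers in chunk order.
    return reduce(vcat, fetch.(tasks))
end

solution_data = collect_samples(1000)
```

Because the chunks are fetched in order, the combined result follows the original iteration order.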

Thank you for your answer,

I am not familiar with Channel in Julia. Could you show a Channel-style rewrite of the toy problem?

In this particular example, the best approach would probably be to preallocate xlist and idx:

x = collect(1:10)
f(x) = x+1
xlist = Array{Int}(undef, length(x))
idx = Array{Int}(undef, length(x))
Threads.@threads for i =1:length(x)
    xnew = f(x[i])
    xlist[i] = xnew
    idx[i] = i
end

Preallocating arrays should always be preferred if you know the size in advance: resizing an array often requires allocating new memory, which is quite expensive.

The code in the OP tries to implement threaded map. I recommend using existing solutions. For example:

To preallocate, you also need to know the output type of f, which is impossible in general.
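
To illustrate why a map-style approach sidesteps the output-type problem, here is a minimal sketch of an order-preserving threaded map built from Threads.@spawn and fetch. The helper spawn_map is hypothetical and not taken from any package; existing libraries are likely to schedule work more carefully:

```julia
# Hypothetical helper (not from any package): a tiny order-preserving threaded map.
function spawn_map(f, xs)
    tasks = map(x -> Threads.@spawn(f(x)), xs)  # one task per element
    return map(fetch, tasks)                     # fetch results in input order
end

x = collect(1:10)
f(x) = x + 1
xlist = spawn_map(f, x)
```

The element type of the result is determined from the values actually returned, so it never has to be declared up front. Spawning one task per element only makes sense when f is expensive relative to the task overhead.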

Using ThreadPools.jl (tro3/ThreadPools.jl on GitHub, docs at https://tro3.github.io/ThreadPools.jl), how about:

julia> ThreadPools.tmap(collect(enumerate(x))) do (i,xx)
           deepcopy(i), deepcopy(f(xx))
       end
10-element Array{Tuple{Int64,Int64},1}:
 (1, 2)
 (2, 3)
 (3, 4)
 (4, 5)
 (5, 6)
 (6, 7)
 (7, 8)
 (8, 9)
 (9, 10)
 (10, 11)

Each entry is a separate thread. Order is guaranteed.
