Julia multithreading

I am new to Julia and want to multithread a portion of my code. Below is the code I have written, but it throws an error. Could anyone please help me in this regard?

            t2new = SharedArray{Float64}(nocc,nocc,nvir,nvir)

            #for (i,j) in pair_ls
             Threads.@threads for i in 1:nocc
                for j in 1:i

                     @fastmath tau_ij_ = @views tau[i,j,:,:]

                     @inbounds begin
                        @tensoropt (k=>x, t=>100x, d=>100x, c=>100x, a=>100x, b=>100x) begin
                            ttmp1[k,t,c] = Lov_T120[k,d,t]*tau_ij_[c,d]
                            ttmp2[k,a]  = ttmp1[k,t,c]*Lvv_df[t,a,c]
                            temp[a,b] = (-t1[k,b]*ttmp2[k,a])

                            ttmp3[k,t,d] = Lov_T120[k,c,t]*tau_ij_[c,d]
                            ttmp4[k,b]  = ttmp3[k,t,d]*Lvv_df[t,b,d]
                            temp[a,b] -= t1[k,a]*ttmp4[k,b]
                        end

                        Threads.atomic_add!(t2new[i,j,:,:], temp)
                        if i != j
                            Threads.atomic_add!(t2new[j,i,:,:], transpose(temp))
                        end
                     end  #@inbounds
                end
             end

JULIA: TaskFailedException
Stacktrace:
  [1] wait
    @ ./task.jl:345 [inlined]
  [2] threading_run(fun::var"#141#threadsfor_fun#3"{var"#141#threadsfor_fun#1#4"{Matrix{Float64}, Array{Float64, 3}, Array{Float64, 3}, UnitRange{Int64}}}, static::Bool)
    @ Base.Threads ./threadingconstructs.jl:38
  [3] macro expansion
    @ ./threadingconstructs.jl:89 [inlined]
  [4] update_cc_amps(t1::Matrix{Float64}, t2::Array{Float64, 4}, eris::PyObject, ovov::Array{Float64, 4}, oovv::Array{Float64, 4}, ovvv::Array{Float64, 4}, ovoo::Array{Float64, 4}, oooo::Array{Float64, 4}, fock::Matrix{Float64}, foo::Matrix{Float64}, fov::Matrix{Float64}, fvo::Matrix{Float64}, fvv::Matrix{Float64}, Lov_df::Array{Float64, 3}, Lvv_df::Array{Float64, 3}, Loo_df::Array{Float64, 3})
    @ Main ~/bagh_install/bagh_1/bagh/bagh_code/ccsd/ccsd_incr4_multhreading.jl:459
  [5] macro expansion
    @ ./timing.jl:262 [inlined]
  [6] cc_iter(t1::Matrix{Float64}, t2::Array{Float64, 4}, eris::PyObject, cc_input::PyObject)
    @ Main ~/bagh_install/bagh_1/bagh/bagh_code/ccsd/ccsd_incr4_multhreading.jl:231
  [7] invokelatest(::Any, ::Any, ::Vararg{Any}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Base ./essentials.jl:729
  [8] invokelatest(::Any, ::Any, ::Vararg{Any})
    @ Base ./essentials.jl:726
  [9] _pyjlwrap_call(f::Function, args_::Ptr{PyCall.PyObject_struct}, kw_::Ptr{PyCall.PyObject_struct})
    @ PyCall ~/.julia/packages/PyCall/twYvK/src/callback.jl:28
 [10] pyjlwrap_call(self_::Ptr{PyCall.PyObject_struct}, args_::Ptr{PyCall.PyObject_struct}, kw_::Ptr{PyCall.PyObject_struct})
    @ PyCall ~/.julia/packages/PyCall/twYvK/src/callback.jl:44

    nested task error: MethodError: no method matching atomic_add!(::Matrix{Float64}, ::Matrix{Float64})
    Stacktrace:
     [1] macro expansion
       @ ~/bagh_install/bagh_1/bagh/bagh_code/ccsd/ccsd_incr4_multhreading.jl:475 [inlined]
     [2] (::var"#141#threadsfor_fun#3"{var"#141#threadsfor_fun#1#4"{Matrix{Float64}, Array{Float64, 3}, Array{Float64, 3}, UnitRange{Int64}}})(tid::Int64; onethread::Bool)
       @ Main ./threadingconstructs.jl:84
     [3] #141#threadsfor_fun
       @ ./threadingconstructs.jl:51 [inlined]
     [4] (::Base.Threads.var"#1#2"{var"#141#threadsfor_fun#3"{var"#141#threadsfor_fun#1#4"{Matrix{Float64}, Array{Float64, 3}, Array{Float64, 3}, UnitRange{Int64}}}, Int64})()
        @ Base.Threads ./threadingconstructs.jl:30

You can’t add a matrix to a scalar without broadcasting.

But also, you don’t need a SharedArray for this; threads already share memory.
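For example, an ordinary Array is visible to every thread, so each iteration can write its own element directly:

    a = zeros(8)
    Threads.@threads for i in 1:8
        a[i] = i^2  # each iteration writes a distinct element, so no race
    end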

But both t2new and temp are arrays of the same size.

The core of the error message is:

 nested task error: MethodError: no method matching atomic_add!(::Matrix{Float64}, ::Matrix{Float64})
    Stacktrace:
     [1] macro expansion
       @ ~/bagh_install/bagh_1/bagh/bagh_code/ccsd/ccsd_incr4_multhreading.jl:475 [inlined]
     [2] (::var"#141#threadsfor_fun#3"{var"#141#threadsfor_fun#1#4"{Matrix{Float64}, Array{Float64, 3}, Array{Float64, 3}, UnitRange{Int64}}})(tid::Int64; onethread::Bool)
       @ Main ./threadingconstructs.jl:84
     [3] #141#threadsfor_fun

Threads.atomic_add! is only defined for Threads.Atomic, not for arrays. You will want to use Atomix.jl, which extends the @atomic macro in Base to work for arrays.

Furthermore, t2new[i,j,:,:] creates a slice (a copy), not a view, so mutating it won’t have the effect you want. As @jling said, you don’t need SharedArrays for this.
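A quick way to see the difference:

    A = zeros(2, 2)
    s = A[1, :]        # slicing copies: writing into s leaves A unchanged
    s .= 1.0
    v = @view A[1, :]  # a view aliases A's memory
    v .= 1.0           # this does modify A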

On top of that, atomic_add! or @atomic only works on scalars, not on whole arrays.
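If you do want atomic accumulation into the array, here is a minimal sketch with Atomix.jl (the arrays, sizes, and indices below are stand-ins for the ones in your code): atomics apply to one scalar element at a time, so the block update has to be written elementwise.

    using Atomix: @atomic

    t2new = zeros(2, 2, 3, 3)  # stand-in for the real t2new
    temp  = ones(3, 3)         # stand-in for the computed block
    i, j  = 1, 2
    # accumulate temp into the (i, j) block element by element;
    # each @atomic call updates a single Float64 atomically
    for b in axes(temp, 2), a in axes(temp, 1)
        @atomic t2new[i, j, a, b] += temp[a, b]
    end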

1 Like
This is the code I am using now, but it is still showing an error.
        @fastmath @inbounds ttmp1 = Array{Float64,3}(undef,(nocc,naux,nvir))
        @fastmath @inbounds ttmp2 = Array{Float64,2}(undef,(nocc,nvir))
        @fastmath @inbounds ttmp3 = Array{Float64,3}(undef,(nocc,naux,nvir))
        @fastmath @inbounds ttmp4 = Array{Float64,2}(undef,(nocc,nvir))
        @fastmath @inbounds ttmp5 = Array{Float64,3}(undef, (naux,nvir,nvir))
        @fastmath @inbounds temp  = Array{Float64,2}(undef,(nvir,nvir))


        @fastmath @inbounds Lov_T120 = permutedims(Lov_df,(2,3,1))

    #    t2new = SharedArray{Float64}(nocc,nocc,nvir,nvir)

        #for (i,j) in pair_ls
         Threads.@threads for i in 1:nocc
            for j in 1:i

                 @fastmath tau_ij_ = @views tau[i,j,:,:]

                 @inbounds begin
                    @tensoropt (k=>x, t=>100x, d=>100x, c=>100x, a=>100x, b=>100x) begin
                        ttmp1[k,t,c] = Lov_T120[k,d,t]*tau_ij_[c,d]
                        ttmp2[k,a]  = ttmp1[k,t,c]*Lvv_df[t,a,c]
                        temp[a,b] = (-t1[k,b]*ttmp2[k,a])

                        ttmp3[k,t,d] = Lov_T120[k,c,t]*tau_ij_[c,d]
                        ttmp4[k,b]  = ttmp3[k,t,d]*Lvv_df[t,b,d]
                        temp[a,b] -= t1[k,a]*ttmp4[k,b]
                    end

                    @atomic t2new[i,j,:,:]+= temp
                    #t2new[i,j,:,:] += temp
                    if i != j
                        @atomic t2new[j,i,:,:]+= temp
                        #t2new[i,j,:,:] += transpose(temp)
                    end
                 end  #@inbounds
            end
         end

JULIA: TaskFailedException
Stacktrace:
  [1] wait
    @ ./task.jl:345 [inlined]
  [2] threading_run(fun::var"#141#threadsfor_fun#3"{var"#141#threadsfor_fun#1#4"{Matrix{Float64}, Array{Float64, 3}, Array{Float64, 3}, UnitRange{Int64}}}, static::Bool)
    @ Base.Threads ./threadingconstructs.jl:38
  [3] macro expansion
    @ ./threadingconstructs.jl:89 [inlined]
  [4] update_cc_amps(t1::Matrix{Float64}, t2::Array{Float64, 4}, eris::PyObject, ovov::Array{Float64, 4}, oovv::Array{Float64, 4}, ovvv::Array{Float64, 4}, ovoo::Array{Float64, 4}, oooo::Array{Float64, 4}, fock::Matrix{Float64}, foo::Matrix{Float64}, fov::Matrix{Float64}, fvo::Matrix{Float64}, fvv::Matrix{Float64}, Lov_df::Array{Float64, 3}, Lvv_df::Array{Float64, 3}, Loo_df::Array{Float64, 3})
    @ Main ~/bagh_install/bagh_1/bagh/bagh_code/ccsd/ccsd_incr4_multhreading.jl:460
  [5] macro expansion
    @ ./timing.jl:262 [inlined]
  [6] cc_iter(t1::Matrix{Float64}, t2::Array{Float64, 4}, eris::PyObject, cc_input::PyObject)
    @ Main ~/bagh_install/bagh_1/bagh/bagh_code/ccsd/ccsd_incr4_multhreading.jl:232
  [7] invokelatest(::Any, ::Any, ::Vararg{Any}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Base ./essentials.jl:729
  [8] invokelatest(::Any, ::Any, ::Vararg{Any})
    @ Base ./essentials.jl:726
  [9] _pyjlwrap_call(f::Function, args_::Ptr{PyCall.PyObject_struct}, kw_::Ptr{PyCall.PyObject_struct})
    @ PyCall ~/.julia/packages/PyCall/twYvK/src/callback.jl:28
 [10] pyjlwrap_call(self_::Ptr{PyCall.PyObject_struct}, args_::Ptr{PyCall.PyObject_struct}, kw_::Ptr{PyCall.PyObject_struct})
    @ PyCall ~/.julia/packages/PyCall/twYvK/src/callback.jl:44
nested task error: MethodError: Cannot `convert` an object of type 
  Matrix{Float64} to an object of type 
  Union{Atomix.Internal.IndexableRef{Array{Float64, 4}, Tuple{Int64}}, Atomix.Internal.IndexableRef{Array{Float64, 4}, NTuple{4, Int64}}}
Closest candidates are:
  convert(::Type{T}, !Matched::T) where T at Base.jl:61
Stacktrace:
 [1] asstorable(ref::Matrix{Union{Atomix.Internal.IndexableRef{Array{Float64, 4}, Tuple{Int64}}, Atomix.Internal.IndexableRef{Array{Float64, 4}, NTuple{4, Int64}}}}, v::Matrix{Float64})
   @ Atomix.Internal ~/.julia/packages/Atomix/F9VIX/src/core.jl:37
 [2] modify!(ref::Matrix{Union{Atomix.Internal.IndexableRef{Array{Float64, 4}, Tuple{Int64}}, Atomix.Internal.IndexableRef{Array{Float64, 4}, NTuple{4, Int64}}}}, op::typeof(+), x::Matrix{Float64}, ord::UnsafeAtomics.Internal.LLVMOrdering{:seq_cst})
   @ Atomix.Internal ~/.julia/packages/Atomix/F9VIX/src/core.jl:29
 [3] macro expansion
   @ ~/bagh_install/bagh_1/bagh/bagh_code/ccsd/ccsd_incr4_multhreading.jl:476 [inlined]
 [4] (::var"#141#threadsfor_fun#3"{var"#141#threadsfor_fun#1#4"{Matrix{Float64}, Array{Float64, 3}, Array{Float64, 3}, UnitRange{Int64}}})(tid::Int64; onethread::Bool)
   @ Main ./threadingconstructs.jl:84
 [5] #141#threadsfor_fun
   @ ./threadingconstructs.jl:51 [inlined]
 [6] (::Base.Threads.var"#1#2"{var"#141#threadsfor_fun#3"{var"#141#threadsfor_fun#1#4"{Matrix{Float64}, Array{Float64, 3}, Array{Float64, 3}, UnitRange{Int64}}}, Int64})()
    @ Base.Threads ./threadingconstructs.jl:30

Do you know how to make this work on arrays, or any other way to write this code to implement multithreading? Is there any syntax other than @atomic or atomic_add!?

As the task failure says, you are trying to use a whole array on the right-hand side of the atomic operation:

@atomic t2new[j,i,:,:]+= temp

FLoops.jl has some support for generalized reductions over parallel loops, or you could use locks.
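For instance, a rough sketch with FLoops (assuming FLoops.jl is installed; contrib here is a stand-in for your real per-pair block):

    using FLoops

    nvir = 4
    @floop for i in 1:100
        contrib = fill(float(i), nvir, nvir)  # stand-in per-iteration result
        # @reduce with a do-block: acc is initialized per task, and the
        # per-task partial sums are combined safely when the loop finishes
        @reduce() do (acc = zeros(nvir, nvir); contrib)
            acc .+= contrib
        end
    end
    acc  # the fully accumulated matrix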

But I would recommend taking a closer look at your code and thinking about which operations actually need to happen atomically. For example, you have a single ttmp1 array, and many threads are writing to that same data location.

@fastmath @inbounds ttmp1 = Array{Float64,3}(undef,(nocc,naux,nvir))

Neither @fastmath nor @inbounds makes sense in that statement: @fastmath relaxes floating-point arithmetic and @inbounds elides bounds checks on indexing, and a plain array allocation involves neither.

1 Like

Now I have used locks, but the code still does not work. This multithreaded portion is itself nested inside an outer for loop from 1 to 45, and the code should converge within those 45 iterations. The serial version of my code works fine and converges after 33 iterations, but when I add multithreading and locks it gives the wrong answer from the first iteration, never converges, and simply runs through all 45 iterations.

        for i in 1: 45
           # some tensor operations.

            ttmp1 = Array{Float64,3}(undef,(nocc,naux,nvir))
            ttmp2 = Array{Float64,2}(undef,(nocc,nvir))
            ttmp3 = Array{Float64,3}(undef,(nocc,naux,nvir))
            ttmp4 = Array{Float64,2}(undef,(nocc,nvir))
            ttmp5 = Array{Float64,3}(undef, (naux,nvir,nvir))
            temp  = Array{Float64,2}(undef,(nvir,nvir))


            Lov_T120 = permutedims(Lov_df,(2,3,1))
            lk = ReentrantLock()

           #t2new = SharedArray{Float64}(nocc,nocc,nvir,nvir)

            #for (i,j) in pair_ls
             Threads.@threads for i in 1:nocc
                for j in 1:i

                     @fastmath tau_ij_ = @views tau[i,j,:,:]

                     @inbounds begin
                        @tensoropt (k=>x, t=>100x, d=>100x, c=>100x, a=>100x, b=>100x) begin
                            ttmp1[k,t,c] = Lov_T120[k,d,t]*tau_ij_[c,d]
                            ttmp2[k,a]  = ttmp1[k,t,c]*Lvv_df[t,a,c]
                            temp[a,b] = (-t1[k,b]*ttmp2[k,a])

                            ttmp3[k,t,d] = Lov_T120[k,c,t]*tau_ij_[c,d]
                            ttmp4[k,b]  = ttmp3[k,t,d]*Lvv_df[t,b,d]
                            temp[a,b] -= t1[k,a]*ttmp4[k,b]
                            
                            ttmp5[t,a,d] = Lvv_df[t,a,c]*tau_ij_[c,d]
                            temp[a,b] += ttmp5[t,a,d]*Lvv_df[t,b,d]
                        end

                        begin
                            lock(lk)
                            try
                                t2new[i,j,:,:]+= temp
                                #t2new[i,j,:,:] += temp
                                if i != j
                                    t2new[j,i,:,:]+= temp
                                    #t2new[i,j,:,:] += transpose(temp)
                                end
                            finally
                                unlock(lk)
                            end
                        end

                     end  #@inbounds
                end
             end
        end

t2new is updated in every iteration and is used in the next iteration.

You have races on your temporary arrays: every task is writing into the same shared ttmp and temp buffers. Each thread will need its own temporary array.
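A minimal, self-contained sketch of that pattern, with a stand-in computation in place of the @tensoropt block and made-up small dimensions: moving the temp allocation inside the threaded loop gives every task its own buffer, and because each (i, j) pair of the triangular loop is visited exactly once, each block of t2new is written by a single task, so the block updates themselves should not need a lock.

    nocc, nvir = 4, 6                        # hypothetical sizes
    tau   = rand(nocc, nocc, nvir, nvir)
    t2new = zeros(nocc, nocc, nvir, nvir)

    Threads.@threads for i in 1:nocc
        # per-iteration scratch array: each task gets its own temp,
        # so no two threads ever write to the same temporary
        temp = Array{Float64,2}(undef, nvir, nvir)
        for j in 1:i
            tau_ij = @views tau[i, j, :, :]
            temp .= 2.0 .* tau_ij            # stand-in for the contractions
            # each (i, j) block and its (j, i) mirror are written exactly once
            t2new[i, j, :, :] .+= temp
            if i != j
                t2new[j, i, :, :] .+= transpose(temp)
            end
        end
    end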

3 Likes