CUDA performing scalar indexing when used along with Distributed

Hello all!

I am running a series of nested functions, some of which use CUDA.jl for high-level CuArray processing (dot products and/or matrix multiplications). When I run it single-threaded it works perfectly (even if I explicitly set CUDA.allowscalar(false)).

When I try to run something like the following (below), it runs very slowly, with the warning:

┌ Warning: Performing scalar indexing on task Task (runnable) @0x00000256a4e77650.
│ Invocation of getindex resulted in scalar indexing of a GPU array.
│ This is typically caused by calling an iterating implementation of a method.
│ Such implementations *do not* execute on the GPU, but very slowly on the CPU,
│ and therefore are only permitted from the REPL for prototyping purposes.

Example code:

using Distributed, SharedArrays

MaxValues = SharedArray{Float32}(100)
MaxTimes = SharedArray{Float32}(100)

Base.@sync @distributed for i in 1:100
    a, b = _myFunction(
                input1, # CuArray
                input2,
                input3
            )
    MaxValues[i] = a
    MaxTimes[i] = b
end

Any idea what may be causing this, and what I should look into to fix it?
Thanks a lot!

Hi,

Let me first provide an MWE, including the error message (which helps :slight_smile: ).

using Distributed

addprocs(1)
@everywhere begin
    using CUDA
    x = CUDA.rand(10)
end

@sync @distributed for i = 1:2
    println(x)
end
Error message and stacktrace
ERROR: TaskFailedException

    nested task error: Unhandled Task ERROR: Scalar indexing is disallowed.
Invocation of getindex resulted in scalar indexing of a GPU array.
This is typically caused by calling an iterating implementation of a method.
Such implementations *do not* execute on the GPU, but very slowly on the CPU,
and therefore should be avoided.

If you want to allow scalar iteration, use `allowscalar` or `@allowscalar`
to enable scalar iteration globally or for the operations in question.
Stacktrace:
  [1] error(s::String)
    @ Base .\error.jl:35
  [2] errorscalar(op::String)
    @ GPUArraysCore (...)\.julia\packages\GPUArraysCore\GMsgk\src\GPUArraysCore.jl:155
  [3] _assertscalar(op::String, behavior::GPUArraysCore.ScalarIndexing)
    @ GPUArraysCore (...)\.julia\packages\GPUArraysCore\GMsgk\src\GPUArraysCore.jl:128
  [4] assertscalar(op::String)
    @ GPUArraysCore (...)\.julia\packages\GPUArraysCore\GMsgk\src\GPUArraysCore.jl:116
  [5] getindex
    @ (...)\.julia\packages\GPUArrays\qt4ax\src\host\indexing.jl:50 [inlined]
  [6] iterate
    @ .\abstractarray.jl:1217 [inlined]
  [7] iterate
    @ .\abstractarray.jl:1215 [inlined]
  [8] hash(A::CuArray{Float32, 1, CUDA.DeviceMemory}, h::UInt64)
    @ Base .\abstractarray.jl:3430
  [9] hash(x::CuArray{Float32, 1, CUDA.DeviceMemory})
    @ Base .\hashing.jl:30
 [10] serialize_global_from_main(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, sym::Symbol)
    @ Distributed (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\clusterserialize.jl:151
 [11] #8
    @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\clusterserialize.jl:101 [inlined]
 [12] foreach
    @ .\abstractarray.jl:3097 [inlined]
 [13] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::Core.TypeName)
    @ Distributed (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\clusterserialize.jl:101
 [14] serialize_type_data(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::DataType)
    @ Serialization (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Serialization\src\Serialization.jl:560
 [15] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::DataType)
    @ Serialization (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Serialization\src\Serialization.jl:595
 [16] serialize_type_data(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::DataType)
    @ Serialization (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Serialization\src\Serialization.jl:578
 [17] serialize_type(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::DataType, ref::Bool)
    @ Serialization (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Serialization\src\Serialization.jl:602
 [18] serialize_any(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Serialization\src\Serialization.jl:671
 [19] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Serialization\src\Serialization.jl:655
 [20] serialize_msg(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, o::Distributed.CallMsg{:call})
    @ Distributed (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\messages.jl:78
 [21] #invokelatest#2
    @ .\essentials.jl:892 [inlined]
 [22] invokelatest
    @ .\essentials.jl:889 [inlined]
 [23] send_msg_(w::Distributed.Worker, header::Distributed.MsgHeader, msg::Distributed.CallMsg{:call}, now::Bool)
    @ Distributed (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\messages.jl:181
 [24] send_msg
    @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\messages.jl:122 [inlined]
 [25] #remotecall#156
    @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\remotecall.jl:436 [inlined]
 [26] remotecall
    @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\remotecall.jl:434 [inlined]
 [27] remotecall(::Function, ::Int64; kwargs::@Kwargs{})
    @ Distributed (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\remotecall.jl:447
 [28] remotecall
    @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\remotecall.jl:447 [inlined]
 [29] spawnat
    @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\macros.jl:11 [inlined]
 [30] spawn_somewhere
    @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\macros.jl:13 [inlined]
 [31] macro expansion
    @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\macros.jl:91 [inlined]
 [32] macro expansion
    @ .\task.jl:479 [inlined]
 [33] (::Distributed.var"#177#179"{var"#1#2", UnitRange{Int64}})()
    @ Distributed (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\macros.jl:278
Scalar indexing is disallowed.
    Invocation of getindex resulted in scalar indexing of a GPU array.
    This is typically caused by calling an iterating implementation of a method.
    Such implementations *do not* execute on the GPU, but very slowly on the CPU,
    and therefore should be avoided.

    If you want to allow scalar iteration, use `allowscalar` or `@allowscalar`
    to enable scalar iteration globally or for the operations in question.
    Stacktrace:
      [1] error(s::String)
        @ Base .\error.jl:35
      [2] errorscalar(op::String)
        @ GPUArraysCore (...)\.julia\packages\GPUArraysCore\GMsgk\src\GPUArraysCore.jl:155
      [3] _assertscalar(op::String, behavior::GPUArraysCore.ScalarIndexing)
        @ GPUArraysCore (...)\.julia\packages\GPUArraysCore\GMsgk\src\GPUArraysCore.jl:128
      [4] assertscalar(op::String)
        @ GPUArraysCore (...)\.julia\packages\GPUArraysCore\GMsgk\src\GPUArraysCore.jl:116
      [5] getindex
        @ (...)\.julia\packages\GPUArrays\qt4ax\src\host\indexing.jl:50 [inlined]
      [6] iterate
        @ .\abstractarray.jl:1217 [inlined]
      [7] iterate
        @ .\abstractarray.jl:1215 [inlined]
      [8] hash(A::CuArray{Float32, 1, CUDA.DeviceMemory}, h::UInt64)
        @ Base .\abstractarray.jl:3430
      [9] hash(x::CuArray{Float32, 1, CUDA.DeviceMemory})
        @ Base .\hashing.jl:30
     [10] serialize_global_from_main(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, sym::Symbol)
        @ Distributed (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\clusterserialize.jl:151
     [11] #8
        @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\clusterserialize.jl:101 [inlined]
     [12] foreach
        @ .\abstractarray.jl:3097 [inlined]
     [13] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::Core.TypeName)
        @ Distributed (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\clusterserialize.jl:101
     [14] serialize_type_data(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::DataType)
        @ Serialization (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Serialization\src\Serialization.jl:560
     [15] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::DataType)
        @ Serialization (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Serialization\src\Serialization.jl:595
     [16] serialize_type_data(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::DataType)
        @ Serialization (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Serialization\src\Serialization.jl:578
     [17] serialize_type(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::DataType, ref::Bool)
        @ Serialization (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Serialization\src\Serialization.jl:602
     [18] serialize_any(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
        @ Serialization (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Serialization\src\Serialization.jl:671
     [19] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
        @ Serialization (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Serialization\src\Serialization.jl:655
     [20] serialize_msg(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, o::Distributed.CallMsg{:call})
        @ Distributed (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\messages.jl:78
     [21] #invokelatest#2
        @ .\essentials.jl:892 [inlined]
     [22] invokelatest
        @ .\essentials.jl:889 [inlined]
     [23] send_msg_(w::Distributed.Worker, header::Distributed.MsgHeader, msg::Distributed.CallMsg{:call}, now::Bool)
        @ Distributed (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\messages.jl:181
     [24] send_msg
        @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\messages.jl:122 [inlined]
     [25] #remotecall#156
        @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\remotecall.jl:436 [inlined]
     [26] remotecall
        @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\remotecall.jl:434 [inlined]
     [27] remotecall(::Function, ::Int64; kwargs::@Kwargs{})
        @ Distributed (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\remotecall.jl:447
     [28] remotecall
        @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\remotecall.jl:447 [inlined]
     [29] spawnat
        @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\macros.jl:11 [inlined]
     [30] spawn_somewhere
        @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\macros.jl:13 [inlined]
     [31] macro expansion
        @ (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\macros.jl:91 [inlined]
     [32] macro expansion
        @ .\task.jl:479 [inlined]
     [33] (::Distributed.var"#177#179"{var"#1#2", UnitRange{Int64}})()
        @ Distributed (...)\.julia\juliaup\julia-1.10.4+0.x64.w64.mingw32\share\julia\stdlib\v1.10\Distributed\src\macros.jl:278
Stacktrace:
 [1] sync_end(c::Channel{Any})
   @ Base .\task.jl:448
 [2] macro expansion
   @ task.jl:480 [inlined]
 [3] top-level scope
   @ REPL[4]:1

Now I don’t have any experience with Distributed.jl and I also didn’t fully verify the explanation below. But to me it seems we want to send the code to execute (i.e. println(x)) to all processes (send_msg). For this we need to serialize x(*). Judging from the stack trace, the cluster serializer first hashes the global x (serialize_global_from_main -> hash), and since hashing is not specialized for CuArrays, the generic AbstractArray method is used, which iterates over the elements. For a CuArray this then results in the scalar indexing error.

*But didn’t we define x on each process? So why would we need to send it?
Well, if you replace x = CUDA.rand(10) with x = rand() (and add an @everywhere println(x) here to see that the processes get different values), you’ll notice that the @distributed part now runs, but prints the same value for all processes. So apparently we are sending over our (the main process’s) value of x and not using the ones already defined on the other processes. See also this topic.

A possible workaround could be to use a getter function, so that only the function (which is defined on every worker) needs to be sent over, and each worker then returns its own x:

using Distributed

addprocs(2)
@everywhere begin
    using CUDA
    x = CUDA.rand(10)
    get_x() = x
end

@sync @distributed for i = 1:2
    println(get_x())
end
#=
      From worker 2:    Float32[0.44315395, 0.8780446, 0.21944213, 0.36170566, 0.14836204, 0.11738869, 0.726818, 0.1946531, 0.09105217, 0.9457448]
      From worker 3:    Float32[0.32678527, 0.65252995, 0.19543259, 0.69162387, 0.9956036, 0.3051676, 0.86222124, 0.18076622, 0.9949689, 0.45308512]
Task (done) @0x000002496fed3a30
=#

Hello @eldee ,

Thanks a lot for putting in the effort to help me out!

I think I’ve arrived at a solution which, although somewhat memory-inefficient, solves the CPU/GPU parallelization problem:

I created a function to be run with pmap. The problem is that, since pmap only takes one collection of elements (as far as I understand), in order to pass in several variables I first need to pack them into an array; see the example below:

@everywhere function pmap_calc(elements)
    ele1 = elements[1]
    ele2 = elements[2]
    ele3 = elements[3]
        
    a, b = _myFunction(ele1, ele2, ele3)          
    return a, b
end

x = Array{Any}(undef, (3, 1)...)
for i in 1:3
    x[i] = [input1, input2, input3]
end

pmap(pmap_calc, x)

If anyone knows a better way to handle at least this pmap part (or an alternative solution altogether), that would be great!

Thanks a lot!

Hi Paulo, you’re welcome.

Could you add some more information to the code: declarations of input1 etc., a (dummy) implementation of _myFunction, …? See also point 4 in this PSA. It’s hard to see how something can be improved when you’re not sure what is concretely going on :slight_smile:.

I’m also not sure what the intent is here. Why is size(x) == (3, 1), instead of just (3,)? By the way, note that you don’t need the splatting ...: (undef, (3, 1)) and (undef, 3, 1) (what the splatting results in) are equivalent. Is it intended that x[1] == x[2] == x[3]? Why Array{Any} and not something a bit more concrete like Vector{NTuple{3, CuArray}}?
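
For illustration, a rough sketch of what I mean, keeping input1, input2, input3 and _myFunction as the placeholders from your snippet (and assuming _myFunction is defined @everywhere):

inputs = [(input1, input2, input3) for _ in 1:3]  # a Vector of 3-tuples (three identical ones, mirroring your snippet)
results = pmap(t -> _myFunction(t...), inputs)    # splat each tuple into _myFunction; no pmap_calc wrapper needed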

The second argument to (p)map does not need to be a Vector, but could also be e.g. a Tuple or generator, which might help with the inefficient memory usage you mention. For example:

using Distributed
addprocs(2)
@everywhere begin
    using CUDA 
    using Statistics: mean
end

pmap(mean, (CUDA.rand(2) .+ myid() for i = 1:3))
#=
3-element Vector{Float32}:
 1.9031491
 1.707502
 1.1796367
=#

pmap(x -> myid() + mean(x), (CUDA.rand(2) for i = 1:3))
#=
3-element Vector{Float32}:
 3.316328
 2.5876007
 2.284222
=#

Note also that this example shows that the CUDA data is generated here by the master process (myid() == 1) and sent over to the other processes without any issues.
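
Also, pmap (like map) accepts several collections and applies the function elementwise across them, which could be yet another way to pass multiple varying inputs without packing them into tuples yourself. A rough sketch, reusing the setup above:

# each call receives one element from each collection
pmap((a, b) -> mean(a) + b, (CUDA.rand(2) for _ = 1:3), 1:3)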

Hello @eldee ,

Thanks a lot for the suggestions! I finally arrived at a solution that works well for my needs: using a struct to carry the “fixed params”. I made an MWE that runs fine:

using Distributed
addprocs(2)  # add workers, as in the earlier examples

@everywhere begin
    using CUDA
    using Statistics: mean

    struct my_dummy_struct
        mat_one
        mat_two
    end
    
    function initialize_dummy_struct(mat_one, mat_two)
        return my_dummy_struct(mat_one, mat_two)
    end

    # Define function working with mat_one and mat_two as "fixed inputs"
    function (m::my_dummy_struct)(x)
        # Extract inputs from x
        variable1 = x[1]
        variable2 = x[2]

        a = sum(variable1 .* (m.mat_one * m.mat_two))
        b = mean(variable2 .* (m.mat_one))
        return a, b
    end
end

# Define mat_one and mat_two
mat_one = CUDA.ones(2,2)
mat_two = CuArray([2.5 3.0; 2.7 4.5])

# Initialize the struct on the main process
pmap_dummy_struct = initialize_dummy_struct(mat_one, mat_two)

# Global alias captured by the pmap closure below; it is serialized to the
# workers (together with its CuArrays) when the closure is sent over
global my_pmap_dummy_struct = pmap_dummy_struct

variable1_vec = [1; 2; 2.5]
variable2_vec = [3; 2; 3.5]

pmap(x -> my_pmap_dummy_struct(x), zip(variable1_vec, variable2_vec))
#=
3-element Vector{Tuple{Float64, Float64}}:
 (25.4, 3.0)
 (50.8, 2.0)
 (63.5, 3.5)
=#
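
I suppose that, since pmap serializes the function it is given to the workers (including the captured struct and its CuArrays), the functor could also be passed directly, without the global alias; a sketch:

# the struct and its CuArrays are shipped to the workers as part of the pmap call
pmap(pmap_dummy_struct, zip(variable1_vec, variable2_vec))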

For many of the points you asked about, the answer is just that I am really bad at coding! (It was also great to receive the comments; I learnt some things from them, thanks for that too!)
