Parallel computing on a subset of an array

Hello!

I’m trying to do some parallel computation on the 2 first dimensions of a 3D array A[j,k, t]. To put things in context, the data are timeseries, where j and k are spatial grid points and t is the time dimension. There is also no possibility to permutedims in this context (Climate dataset can be huge).

My objective is to compute the same function for each grid points, using the whole timeserie for a given grid point j, k. Hence, going eachindex but only for the spatial dimensions.

The first attempt is to simply put Threads.@threads before one of the “spatial loop”. Here’s a MWE.

using BenchmarkTools

function simplethread()
    A = randn(3, 3, 10000)
    dataout = fill(NaN, 3, 3, 10000)
    
    for k = 1:size(A, 2)
        Threads.@threads for j = 1:size(A, 1)
            val = somefunction(A[j,k,:])
            dataout[j,k,:] = val
        end
    end
    return dataout
end

function somefunction(datain) # (in real case, I do `Polynomials.polyval` over the timeserie using a previous `polyfit`).
       dataout = datain .+ randn(1)
end

@benchmark simplethread()
BenchmarkTools.Trial: 
  memory estimate:  2.76 MiB
  allocs estimate:  442
  --------------
  minimum time:     1.414 ms (0.00% GC)
  median time:      1.576 ms (0.00% GC)
  mean time:        2.046 ms (16.58% GC)
  maximum time:     231.659 ms (98.22% GC)
  --------------
  samples:          2438
  evals/sample:     1

Now, I want to effectively do calculations in parallel for each grid point, not just one of the spatial dimension. I tried using CartesianRange, but it does not work. Here’s my attempt :

function cartesianthread()
    A = randn(3, 3, 10000)
    dataout = fill(NaN, 3, 3, 10000)
       
    R = CartesianRange(Base.front(indices(A)))
           
    Threads.@threads for r in R        
           val = somefunction(A[r,:])
           dataout[r,:] = val    
     end
     return dataout
end

cartesianthread()
ERROR: MethodError: no method matching unsafe_getindex(::CartesianRange{CartesianIndex{2}}, ::Int64)
Closest candidates are:
  unsafe_getindex(::StepRangeLen{T,#s45,#s44} where #s44<:Base.TwicePrecision where #s45<:Base.TwicePrecision, ::Integer) where T at twiceprecision.jl:193
  unsafe_getindex(::StepRangeLen{T,R,S} where S where R, ::Integer) where T at range.jl:505
  unsafe_getindex(::LinSpace, ::Integer) at range.jl:510
  ...
Stacktrace:
 [1] (::##342#threadsfor_fun#26{CartesianRange{CartesianIndex{2}},Array{Float64,3},Array{Float64,3}})(::Bool) at ./threadingconstructs.jl:63
 [2] macro expansion at ./threadingconstructs.jl:71 [inlined]
 [3] cartesianthread() at ./REPL[120]:7

Trying without the Threads.@threads macro inside the cartesianthread() function returns the correct values.

Thanks for any help or hints about what can be done!

Anyone has an idea? :smile:

Right now @threads only works with indexable iterables (more precisely, iterators with Base.unsafe_index defined).

You could try reshaping your array:

julia> function simplethread2()
                  A = randn(3, 3, 10000)
                  dataout = fill(NaN, 3, 3, 10000)
                  B = reshape(A,(size(A,1)*size(A,2),size(A,3)))
                  dataoutB = reshape(dataout,(size(A,1)*size(A,2),size(A,3)))
                  Threads.@threads for j = 1:size(B, 1)
                      val = somefunction(B[j,:])
                      dataoutB[j,:] = val
                  end
                  return dataout
              end
simplethread2 (generic function with 1 method)

julia> @benchmark simplethread2()
BenchmarkTools.Trial:
  memory estimate:  1.68 MiB
  allocs estimate:  43
  --------------
  minimum time:     1.766 ms (0.00% GC)
  median time:      2.372 ms (0.00% GC)
  mean time:        2.399 ms (0.00% GC)
  maximum time:     9.151 ms (0.00% GC)
  --------------
  samples:          2082
  evals/sample:     1

julia> @benchmark simplethread()
BenchmarkTools.Trial:
  memory estimate:  2.46 MiB
  allocs estimate:  470
  --------------
  minimum time:     2.051 ms (0.00% GC)
  median time:      2.333 ms (0.00% GC)
  mean time:        2.539 ms (0.00% GC)
  maximum time:     6.084 ms (0.00% GC)
  --------------
  samples:          1968
  evals/sample:     1

A and B share the same data.
Ps: Note that I didn’t test if both functions provide the same results.

1 Like

Thanks! Looks like the rehaping does not cost too much in this case. I’ll have to try if it’s still the case with regular size array I’m used too (~ 200x200x20_000). Will keep posted!

The provided solution does work, thanks!

The speedup is not as important as I expected, due to the cost of reshaping I guess vs no reshape in the non-threaded function.

The way your data is laid-out is not the best for the type of work you are doing (using the whole time series of one point). Is reorganizing your data really out of question? You would see major speed improvement if your data was organized as A[t,k,j] instead of A[j,k,t].
A[t,j,k] would work too.

1 Like

You could try transposing your array before this calculation, and then transposing it back to the original shape. You’d have to benchmark it to see whether the speed gain on the calculation is worth, considering the extra time spent transposing the data.

The original datasets contained in netCDF files from climate models are [k, j, t] (standardized practice).

Initially, we were transposing it to [t, k, j] to get better performance on calculations. However, the bottleneck came from loading bigger dataset such as Daymet (e.g. 1km resolution → size of (7814 × 8075 × 365) for a given year. The transposition (using permutedims) was out of the question due to memory limits. Hence, our solution was to avoid transposing at the cost of slower calculations.

I’m open to suggestions! For example, is there a function that tests if a given operation would give a memory limit error? I could simply do a if-else perhaps.

Instead of reshaping A, you could try linearly looping over a list of cartesian indices:

    R = CartesianRange((indices(A,1), indices(A,2)))
    cartesian_inds = collect(R)

    Threads.@threads for i in linearindices(cartesian_inds)
        j = cartesian_inds[i][1]
        k = cartesian_inds[i][2]
        val = somefunction(A[j,k,:])
        dataout[j,k,:] = val
    end

However, the biggest gains would probably come from:

  1. cache-friendly layout with time series as the first dimension.

  2. re-writing somefunction to accept A, j, k as separate arguments (to avoid A[j,k,:] slicing), and possibly to update val or dataout in-place, rather than returning an allocated array.

1 Like

Showing my utter ignorance here. The documentation for reshape says “The two arrays share the same underlying data, so that setting elements of R alters the values of A and vice versa.”
So the reshaped array uses the same memory addresses. It is just indexed differenty.
ie. you dont use twice the memory when reshape is used?

I guess I could Google, but at a low level how does an array “know” that it has dimensions A B C?
I guess it is held in a data structure… Algorithms + Data Structures = Programs after all!

1 Like

Precisely. Eg

julia> A = zeros(4)
4-element Array{Float64,1}:
 0.0
 0.0
 0.0
 0.0

julia> B = reshape(A, 2, :)
2×2 Array{Float64,2}:
 0.0  0.0
 0.0  0.0

julia> B[1, 1] = 1.0
1.0

julia> A
4-element Array{Float64,1}:
 1.0
 0.0
 0.0
 0.0

This is an implementation detail which you don’t need unless you want to contribute to Julia internals, but a good starting point is the devdocs.

Thankyou.
Veering wildly off topic, at one point I had expertise in BOS memory management for FORTRAN
http://www.desy.de/~blobel/bosman.pdf

Doing terrible terrible things to the insides of innocent COMMON blocks. The horror… the horror…

Thanks for the suggestion! I’ll try it and see how it goes. (can’t test it before next week though).