How to copy a Julia DataFrame in a threaded loop?

First of all, I am a complete beginner.
I’ve got the following problem:
I have a DataFrame named cr1 with 553 columns.

Then I run two nested loops as follows:

cr2=copy(cr1)
   #-----------------#
   #Loop on scenarios#
   #-----------------#
    @threads for k in 0:499
        cr1=copy(cr2)
        cr1[!,"scn"].=k

        cr1 = leftjoin(cr1,tra,on = [:scn])

        for i in 1:40
            #----------------------#
            ##Projection on 40 years
            #----------------------#
            cr1[!,"ch_prime"*string(i)].=cr1[!,"ch_prime"].*(cr1[!,"coll_"*string(i)])
            ...job on other columns

Before the loops, I make a copy of cr1 to keep a snapshot of the DataFrame at this stage, because I need to restart from this snapshot at each iteration of the k-loop.
So, in each iteration:
1- I set the scenario number (=k)
2- I join with another DataFrame (named tra) to collect 40 columns (tra1-tra40) that differ for each scenario
3- I make a projection over 40 years, adding some new columns to cr1
4- Just before the end of the loop, I would like to append cr1 to a result DataFrame, for k=0,1,…,499

But it doesn’t work. Julia does some work, but cr1 is not modified.
I tried adding a global on the first line of the loop and removing the @threads:

for k in 0:499
        global cr1=copy(cr2)

Then it works, but without threading, which I want to keep for performance.
Do you have any idea for another way to do this, or a way to thread the last solution?

I believe the issue is that you are essentially assigning a local copy of cr1 to each loop iteration by the line cr1=copy(cr2). You modify this local copy inside your loop. The problem is that these modifications are only done on that copy, not on the one outside of the loop that you started with.
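
To illustrate the scoping point, here is a minimal sketch (the names x and seen are hypothetical, not from the question). Assuming it runs at the top level of a script, an assignment inside the @threads body binds a new local variable, while an in-place write reaches the global object:

```julia
using Base.Threads

x = [0]                  # global binding
seen = zeros(Int, 4)     # global array, mutated in place below

@threads for k in 1:4
    x = [k]              # plain assignment: creates a local `x`, the global is untouched
    seen[k] = k          # in-place write: modifies the global array (one slot per k)
end

println(x)               # still the original global value
println(seen)            # every slot was filled by the loop
```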

Instead, create an empty DataFrame before you loop. Then, inside your loop create whatever portion of the result that you want from each iteration. At the end of the iteration, use vcat or append! to incrementally add them to your pre-created result DataFrame.
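
A minimal sketch of that pattern, shown here without threading and with made-up data (base, value and scaled are hypothetical names, not columns from the question):

```julia
using DataFrames

base = DataFrame(id = 1:3, value = [10.0, 20.0, 30.0])  # hypothetical input

result = DataFrame()                    # empty accumulator, created before the loop
for k in 1:4
    part = copy(base)                   # fresh copy for this iteration only
    part[!, :scn] .= k                  # tag every row with the scenario number
    part[!, :scaled] = part.value .* k  # stand-in for the real projection work
    append!(result, part)               # add this iteration's rows to the accumulator
end
```

Because append! mutates result rather than reassigning it, no global keyword is needed for the accumulator.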

I’ve tried your solution, putting this just before the k-loop:

res=DataFrame()

@threads for k in 0:499
    cr1=copy(cr2)
    cr1[!,"scn"].=k

then the body of the loop and on the last line :

append!(res,cr1)

With the following error message :

TaskFailedException:
UndefRefError: access to undefined reference
Stacktrace:

  [1] getindex at ./array.jl:788 [inlined]
  [2] nrow(::DataFrame) at /Users/alinemai/.julia/packages/DataFrames/htZzm/src/dataframe/dataframe.jl:346
  [3] insert_single_column!(::DataFrame, ::PooledArrays.PooledArray{String,UInt32,1,Array{UInt32,1}}, ::Symbol) at /Users/alinemai/.julia/packages/DataFrames/htZzm/src/dataframe/dataframe.jl:495
  [4] setindex!(::DataFrame, ::PooledArrays.PooledArray{String,UInt32,1,Array{UInt32,1}}, ::typeof(!), ::Symbol) at /Users/alinemai/.julia/packages/DataFrames/htZzm/src/dataframe/dataframe.jl:544
  [5] append!(::DataFrame, ::DataFrame; cols::Symbol, promote::Bool) at /Users/alinemai/.julia/packages/DataFrames/htZzm/src/dataframe/dataframe.jl:1202
  [6] append!(::DataFrame, ::DataFrame) at /Users/alinemai/.julia/packages/DataFrames/htZzm/src/dataframe/dataframe.jl:1195
  [7] macro expansion at /Users/alinemai/Julia/Julia_disclosure.jl:320 [inlined]
  [8] (::var"#20#threadsfor_fun#11"{UnitRange{Int64}})(::Bool) at ./threadingconstructs.jl:61
  [9] (::var"#20#threadsfor_fun#11"{UnitRange{Int64}})() at ./threadingconstructs.jl:28
    wait(::Task) at task.jl:267
    macro expansion at threadingconstructs.jl:69 [inlined]
    macro expansion at Julia_disclosure.jl:121 [inlined]
    top-level scope at util.jl:175

Did I make a mistake?

Does it work if you just drop the multi-threading or does that also fail?

Yes, it works perfectly when removing the @threads… but as you can imagine I would like to keep it :wink:

Sorry, it works with the addition of a global at the beginning of the loop:

res=DataFrame()

for k in 0:499
global cr1=copy(cr2)

Maybe a simpler example could better illustrate the problem:


using DataFrames
using .Threads

bd = DataFrame(id = 1:4, pm_0 = [100, 50, 40, 120])
tra = DataFrame(scn = 1:5, tra1 = [0.10, 0.12, 0.14,0.12, 0.14],
tra2 = [0.10, 0.20, 0.12,0.20, 0.12],
tra3 = [0.11, 0.12, 0.14,0.22,0.12])
#Code 1 - No thread
bd2=copy(bd)
result=DataFrame()

for k in 1:5
    global bd=copy(bd2)
    bd[!,"scn"].=k
    bd = leftjoin(bd,tra,on = [:scn])
 
     for i in 1:3
         bd[!,"pm_$i"].=bd[!,"pm_$(i-1)"].*bd[!,"tra$i"]
     end
     append!(result,bd)
 end
first(result, 6)
> Gives the right result:
6×9 DataFrame. Omitted printing of 2 columns
> │ Row │ id    │ pm_0  │ scn   │ tra1     │ tra2     │ tra3     │ pm_1    │
> │     │ Int64 │ Int64 │ Int64 │ Float64? │ Float64? │ Float64? │ Float64 │
> ├─────┼───────┼───────┼───────┼──────────┼──────────┼──────────┼─────────┤
> │ 1   │ 1     │ 100   │ 1     │ 0.1      │ 0.1      │ 0.11     │ 10.0    │
> │ 2   │ 2     │ 50    │ 1     │ 0.1      │ 0.1      │ 0.11     │ 5.0     │
> │ 3   │ 3     │ 40    │ 1     │ 0.1      │ 0.1      │ 0.11     │ 4.0     │
> │ 4   │ 4     │ 120   │ 1     │ 0.1      │ 0.1      │ 0.11     │ 12.0    │
> │ 5   │ 1     │ 100   │ 2     │ 0.12     │ 0.2      │ 0.12     │ 12.0    │
> │ 6   │ 2     │ 50    │ 2     │ 0.12     │ 0.2      │ 0.12     │ 6.0     │
#Code 2 - Use of @threads - doesn't work
bd2=copy(bd)
result=DataFrame()

@threads for k in 1:5
    global bd=copy(bd2)
    bd[!,"scn"].=k
    bd = leftjoin(bd,tra,on = [:scn])

    for i in 1:3
        bd[!,"pm_$i"].=bd[!,"pm_$(i-1)"].*bd[!,"tra$i"]
    end
    append!(result,bd)
end
first(result, 6)

TaskFailedException:
AssertionError: length(idx.names) == length(idx.lookup) == ncols
Stacktrace:
[1] _check_consistency(::DataFrame) at /Users/alinemai/.julia/packages/DataFrames/htZzm/src/dataframe/dataframe.jl:365
[2] _join(::DataFrame, ::DataFrame; on::Array{Symbol,1}, kind::Symbol, makeunique::Bool, indicator::Nothing, validate::Tuple{Bool,Bool}) at /Users/alinemai/.julia/packages/DataFrames/htZzm/src/abstractdataframe/join.jl:241
[3] #leftjoin#334 at /Users/alinemai/.julia/packages/DataFrames/htZzm/src/abstractdataframe/join.jl:636 [inlined]
[4] macro expansion at /Users/alinemai/Julia/Exemple.jl:15 [inlined]
[5] (::var"#237#threadsfor_fun#33"{UnitRange{Int64}})(::Bool) at ./threadingconstructs.jl:61
[6] (::var"#237#threadsfor_fun#33"{UnitRange{Int64}})() at ./threadingconstructs.jl:28
wait(::Task) at task.jl:267
top-level scope at threadingconstructs.jl:69

Here is a fix, with comments on what was changed:

using DataFrames
using .Threads

bd = DataFrame(id = 1:4, pm_0 = [100, 50, 40, 120])
tra = DataFrame(scn = 1:5, tra1 = [0.10, 0.12, 0.14,0.12, 0.14],
tra2 = [0.10, 0.20, 0.12,0.20, 0.12],
tra3 = [0.11, 0.12, 0.14,0.22,0.12])
#Code 3 - @threads with a local copy and a lock
bd2=copy(bd)
result=DataFrame()

l = ReentrantLock() # create lock variable

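@threads for k in 1:5
    local bd=copy(bd2) # this has to be local to avoid threads sharing the same object
    bd[!,"scn"].=k
    bd = leftjoin(bd,tra,on = [:scn])

    for i in 1:3
        bd[!,"pm_$i"].=bd[!,"pm_$(i-1)"].*bd[!,"tra$i"]
    end
    lock(l) # need to lock for append!: only one thread may modify result at a time
    try
        append!(result,bd)
    finally
        unlock(l) # always release the lock, even if append! throws
    end
end
first(result, 6) # row order depends on which thread appended first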
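
As a possible variant (a different technique from the lock above, not something proposed in this thread), each iteration can store its own DataFrame in a preallocated vector and the pieces can be concatenated after the loop. The names parts and df are hypothetical. Since every iteration writes to a distinct slot, this sketch needs no lock, and the scenarios come out in order of k:

```julia
using DataFrames
using Base.Threads

bd = DataFrame(id = 1:4, pm_0 = [100, 50, 40, 120])
tra = DataFrame(scn = 1:5,
                tra1 = [0.10, 0.12, 0.14, 0.12, 0.14],
                tra2 = [0.10, 0.20, 0.12, 0.20, 0.12],
                tra3 = [0.11, 0.12, 0.14, 0.22, 0.12])

parts = Vector{DataFrame}(undef, 5)   # one slot per scenario

@threads for k in 1:5
    df = copy(bd)                     # local to this iteration
    df[!, :scn] .= k
    df = leftjoin(df, tra, on = :scn)
    for i in 1:3
        df[!, "pm_$i"] = df[!, "pm_$(i-1)"] .* df[!, "tra$i"]
    end
    parts[k] = df                     # distinct index per iteration, so no lock is needed
end

result = reduce(vcat, parts)          # stack the per-scenario pieces in order of k
```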

Finally, it works fine and faster.
I will search the documentation to better understand the lock / unlock mechanism.
Thanks again :wink:
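
For the lock / unlock mechanism, a small self-contained sketch may help (shared and sq are hypothetical names). It uses the do-block form of lock, which releases the lock automatically when the block ends. Only the push! is serialized, while the rest of each iteration can run in parallel:

```julia
using Base.Threads

shared = Int[]            # container modified by every iteration
l = ReentrantLock()

@threads for k in 1:1000
    sq = k^2              # independent work, safe to run concurrently
    lock(l) do            # only one task at a time may execute this block
        push!(shared, sq) # push! on a shared Vector is not thread-safe without the lock
    end                   # lock is released here, even if an error is thrown
end

println(length(shared))   # all 1000 values arrive, in an order that can vary between runs
```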