Parallel DataFrame processing

Hi !
I try to process DataFrame columns in parallel. I first think using pmap() but it was not possible because it is not just applying a function to each columns. So I created a toy example here producing my error.

It is unclear for me if this error is because DataFrames are not thread safe or related to another problem.

ERROR: LoadError: On worker 3:
KeyError: key DataFrames [a93c6f00-e57d-5684-b7b6-d8193f3e46c0] not found

Thanks in advance for your help !

using DataFrames
using Distributed

addprocs(3)

# this function computes the sum of 2 DataFrame columns
@everywhere function sum2col(df, c1, nc)
    result = Array{Array{String, 1}, 1}()
    for c2 = (c1+1):nc
        su = sum(df[!,c1]) + sum(df[!,c2])
        str = [string(names(df)[c1]) , string(names(df)[c2]), string(su)]
        push!(result, str)
    end
    return result  
end

# this function start the jobs
function process(df)
    (nr, nc) = size(df) # row number and col number
    t = Array{Future,1}(undef,nc-1)  # Future array

    # column 1 = rownames we start at column 2
    @sync for c1 = 2:(nc-1)
        @async t[c1-1] = @spawn sum2col(df, c1, nc)
    end   

    # get jobs results
    fetchJobs(nc, t)
    
end

# this function get the jobs results
function fetchJobs(nc, t)
    result = [["col1", "col2", "Sum"]]
    @sync for j in 2:(nc-1)
        @async r = fetch(t[j-1])
        # push the results together
        append!(result, r)
    end
    return  result
end

# MAIN
df = DataFrame(A = ["row1", "row2", "row3", "row4"], B = rand(4), C = rand(4), D = rand(4), E = rand(4))
sumAllCol = process(df)
println(sumAllCol)
1 Like

Hi, you want to start the script making DataFrames available to all workers

using Distributed
addprocs(3)
@everywhere using DataFrames

That solves the error you described. Then you’ll get other error with the variable r not being declared. I guess that happens because @async starts a process but doesn’t wait until it is finished before calling append!(result, r). A possible solution is to remove @sync and @async from fetchJobs(). The real parallel work is done within process().

2 Likes

@alvrg thank you very much for finding my error !

I post here the working example

using Distributed
addprocs(3)
@everywhere using DataFrames

# this function computes the sum of 2 DataFrame columns
@everywhere function sum2col(df, c1, nc)
    result = Array{Array{String, 1}, 1}()
    for c2 = (c1+1):nc
        su = sum(df[!,c1]) + sum(df[!,c2])
        str = [string(names(df)[c1]) , string(names(df)[c2]), string(su)]
        push!(result, str)
    end
    return result  
end

# this function start the jobs
function process(df)
    (nr, nc) = size(df) # row number and col number
    t = Array{Future,1}(undef,nc-1)  # Future array

    # column 1 = rownames we start at column 2
    @sync for c1 = 2:(nc-1)
        @async t[c1-1] = @spawn sum2col(df, c1, nc)
    end   

    # get jobs results
    fetchJobs(nc, t)
    
end

# this function get the jobs results
function fetchJobs(nc, t)
    result = [["col1", "col2", "Sum"]]
    for j in 2:(nc-1)
        r = fetch(t[j-1])
        # push the results together
        append!(result, r)
    end
    return  result
end

# MAIN
df = DataFrame(A = ["row1", "row2", "row3", "row4"], B = rand(4), C = rand(4), D = rand(4), E = rand(4))
sumAllCol = process(df)
println(sumAllCol)
5 Likes