Efficiently using single large dataframe over multiple workers


#1

I’m new to Julia and to programming in general. I need to run many linear regressions using exactly the same large dataset (5GB+). I’ve written code that runs each regression on a different worker: essentially, I copy the dataframe to each worker and run a loop with `@parallel`. However, this process consumes a lot of memory and Julia sometimes runs out of memory.

Is there a way to load the dataframe on a single worker and let the other workers use it read-only?

I’d appreciate any other tips on how to improve the code as well. Thanks a lot in advance!

My code is the following:

```julia
addprocs(12)
nprocs()

using JuliaDB, CSV
@everywhere using DataFrames, RDatasets, FixedEffectModels, JLD #, FileIO, StatFiles, DataFrames, CSV, JuliaDB

df1 = @time JLD.load("iv_sample_baseline_p2_julia.jld", "df1")

# making df1 available on all processors
nprocs()
for i in 1:nprocs()
    @spawnat i df1
end

cd("C:/Users/admin-britto/Dropbox/Brazil_Emp_dur/Empirical/Program_State_9/Julia/paralell/project1")
@everywhere include("regression.jl")  # define regression function
@everywhere include("definitions.jl") # define global variables used on each worker

# multi processor
nprocs()
res = SharedArray{Float64}(1000, 6)

tic()
@parallel for n in 1:36
    i = mod(n - 1, 6) + 1
    j = div(n - 1, 6) + 1
    control = eval(parse("c$i"))
    w = eval(parse("w$j"))
    a = "FixedEffectModels.reg(df1, @model($dep ~ $w $control + (ben_inf ~ iv_pred_w), fe = yearfe, vcov = cluster(pisfe)))"
    eval(parse("b = $a"))
    res[n, 1] = round(coef(b)[length(coef(b))], 8)
    res[n, 2] = round(vcov(b)[length(coef(b))^2]^(1/2), 8)
    res[n, 3] = round(coef(b)[length(coef(b))] / (vcov(b)[length(coef(b))^2]^(1/2)), 3)
    res[n, 4] = nobs(b)
    res[n, 5] = i
    res[n, 6] = j
end
@everywhere fetch(res)
toc()
```

#2

In general, if you have multiple Julia processes running on the same computer and you want them all to have access to the same large array, then you should use a SharedArray. Now that I type this, I see that you are already storing your results in a shared array. If your df1 were a regular array, then you could make it a SharedArray as well; all processes could then access it, with only one copy stored in memory. However, I’m not a DataFrames user and I don’t know if there’s a way to make DataFrames use SharedArrays. Perhaps someone familiar with DataFrames can chime in.
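As a minimal sketch of the idea (assuming a recent Julia, where `Distributed` and `SharedArrays` are standard-library packages, rather than the 0.6 syntax used in the original post), every local worker can read the same shared-memory array without its own copy:

```julia
using Distributed
addprocs(2)
@everywhere using SharedArrays

# one copy of the data, in shared memory, visible to all local processes
s = SharedArray{Float64}(10)
s .= 1.0:10.0

# each worker reads the same array; no per-worker copy of the data is made
sums = [remotecall_fetch(x -> sum(x), w, s) for w in workers()]
# each entry should be 55.0
```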


#3

This should be trivial; DataFrames is agnostic to the type of AbstractVector used for columns. For example, you could just do

```julia
df = DataFrame(A = shared_array_A, B = shared_array_B)
```

and you’ll have a DataFrame with shared arrays.

At that point all column-wise operations should work just as they would with any SharedArray that’s not part of a DataFrame. I’m a little fuzzier with what exactly would happen with join and by, which is clearly the crucial aspect of this. Those would just happen on a single process, I don’t think anyone has done a parallel join or by implementation for DataFrames. Certainly the capability exists in JuliaDB, but I don’t know the details.
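A small runnable sketch of this construction (assuming Julia 0.7+ and a recent DataFrames release, where `copycols = false` keeps the passed arrays as the columns instead of copying them):

```julia
using SharedArrays, DataFrames

n = 5
a = SharedArray{Float64}(n); a .= 1.0:n
b = SharedArray{Int}(n);     b .= 1:n

# copycols = false: the DataFrame columns ARE the shared arrays, no copy made
df = DataFrame(A = a, B = b, copycols = false)

# column-wise operations work just as on ordinary vectors
df.C = df.B .* 2
```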


#4

Thanks a lot guys.

Indeed the key issue is that I’m using a function that needs a DataFrame.

I’ve tried the last suggestion but it gives an error when trying to convert a shared array into a dataframe (line 4):

```julia
a = Array{Float64}(10,1)
df = DataFrame(a)
b = SharedArray{Float64}(10,1)
df = DataFrame(b)
```

```
MethodError: Cannot convert an object of type SharedArray{Float64,2} to an object of type DataFrames.DataFrame
This may have arisen from a call to the constructor DataFrames.DataFrame(…),
since type constructors fall back to convert methods.
DataFrames.DataFrame(::SharedArray{Float64,2}) at sysimg.jl:77
include_string(::String, ::String) at loading.jl:522
include_string(::String, ::String, ::Int64) at eval.jl:30
include_string(::Module, ::String, ::String, ::Int64, ::Vararg{Int64,N} where N) at eval.jl:34
(::Atom.##100#105{String,Int64,String})() at eval.jl:75
withpath(::Atom.##100#105{String,Int64,String}, ::String) at utils.jl:30
withpath(::Function, ::String) at eval.jl:38
hideprompt(::Atom.##99#104{String,Int64,String}) at repl.jl:65
macro expansion at eval.jl:73 [inlined]
(::Atom.##98#103{Dict{String,Any}})() at task.jl:80
```

Am I doing anything wrong?


#5

This works for me on DataFrames master.

But parallelism isn’t going to work out of the box with DataFrames. Someone would have to write an implementation of DataFrames where everything is a shared array. This is non-trivial.

Are you sure your function needs a DataFrame, and can’t be done with JuliaDB?


#6

It depends on what you are doing, though. The actual DataFrame operations like join and by won’t work in parallel out of the box, but any of the purely column-wise stuff should, since these tend to be just regular array operations. In my experience there’s a pretty large class of problems where the bulk of the run-time is spent doing simple column-wise transformations; of course, I don’t know if that’s the case here. (You’d also have to be careful not to do things like eachrow, which would likely screw up any potential parallelism.)

I agree that if your goal here is really parallelism, you should definitely use JuliaDB. Hopefully we’ll eventually have some simple parallel methods in DataFrames while keeping it nice and light-weight, but we’re not there yet.


#7

I see; it did not work with DataFrames for me either. I’ll try to find out what I’m doing wrong.

I’m borrowing a user-written function that’s a little complex. It could probably be done with JuliaDB, but that would take some work and is beyond my capabilities at the moment.

I’m not 100% sure what you mean by real parallelism, but I guess what I’m trying to do is very simple and dirty parallelism. All I want is to run a function n times, and that function essentially only needs to read a single constant large DataFrame; there is no need to edit it (I guess!). I’ve implemented that in the code above, which distributes each run of the function over multiple workers, but it consumes too much memory.

I’ll try going over the function code and see if I can convert it to JuliaDB


#8

What I mean is whether you are really looking for parallel dataframe operations, or simply parallel array operations with a DataFrame as a convenient organizer. By dataframe operations I basically mean this stuff.

If the former, yes, you will need dedicated code written specifically for this type of data structure, which currently only exists in JuliaDB. If the latter, you basically only need a container type that can be shared across multiple processes; whether you stick those arrays in a DataFrame or not shouldn’t matter. Looking at what you wrote above, your use case looks to me like the latter, but I don’t know what FixedEffectModels is doing.

It might seem like a stupid question, but because of the way things are typically done in other languages, I find that simple array operations often get enmeshed in the dataframes API. This isn’t the case in Julia, where you are much less likely to be constrained by whether a dataframe API has a particular feature.


#9

I see what you are saying. In theory you should be able to write `df.columns = SharedArray(df.columns)`, but then any operation that calls a new constructor, like join, might not work.
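The idea above can be sketched roughly as follows (assuming a recent Julia and DataFrames; `to_shared_col` is a hypothetical helper written for this sketch, not a library function). Each column of an existing DataFrame is copied once into a SharedArray, after which local workers can read it without further copies:

```julia
using SharedArrays, DataFrames

# copy one column into shared memory (one-time copy, then shared reads)
function to_shared_col(v::AbstractVector)
    s = SharedArray{eltype(v)}(length(v))
    s .= v
    return s
end

df  = DataFrame(x = [1.0, 2.0, 3.0], y = [10, 20, 30])
sdf = DataFrame()
for name in names(df)
    sdf[!, name] = to_shared_col(df[!, name])  # !: store without copying
end
```

Column-wise operations on `sdf` should then behave as usual; as noted above, operations that construct new columns (join, by) may silently hand back ordinary Arrays.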


#10

Yes, exactly. It would still work for a pretty large class of use cases though, so it might be worth doing.

(By the way, as a complete example of how well this sort of thing can work see the current master of Feather.jl.)


#11

Great, thanks for all the tips, guys!