How can I split large data using a faster and more efficient function (data science)?

Hi everyone

I am a new user of Julia. I want to parse data from a file of over 50 GB. I have been using CSV.read to read a block of data (427,905 lines at a time) with skipto=start_m, limit=NumberEffects, ntasks=1.
The file contains 2,567,430,000 lines. With every iteration the time doubles: it seems the function reads the file again from the beginning up to the position indicated by the skipto argument. So I don't know how to read and split large data more efficiently. Could someone suggest some code, commands, or packages?

Code below:

function ComputeVariancePerGeneration(Route_NameOfFile::String, NumberEffects::Int64, Iter::Int64, group_1::Dict, gener::Vector{Int64}, EffectPosition::Int64, called::String="Mutational variance")
	MSM = string("You are calculating the elements of posterior marginal distribution of ",called,"\n", "BE CAREFUL writing the arguments","\n")
	printstyled(MSM,bold=false,color=:light_yellow)
	values_var = var_vector(Dict{Int64, Vector{Float64}}(),Dict{Int64, Float64}())
	start_m = 1
	end_m   = NumberEffects
	for i in gener
		values_var.vsam_par[i] = Float64[]
		values_var.mean_PMD[i] = -1.0
	end
	local all_solut  # declared here so the last chunk is still visible after the loop
	for itera in 1:Iter
		all_solut = Matrix(CSV.read(Route_NameOfFile, DataFrame; delim=' ', ignorerepeated=true, header=false, skipto=start_m, limit=NumberEffects, ntasks=1))
		all_solut = all_solut[all_solut[:, 2] .== EffectPosition, :]  # keep only rows for this effect
		for i in gener
			current_value = round(var(all_solut[findall(in( values(group_1.Vari_par[i]) ),all_solut[:,3]), [4]]), digits=8)
			push!(values_var.vsam_par[i], current_value)	
		end
		start_m = end_m + 1
		end_m   = start_m + NumberEffects - 1
		println(itera)
	end
	@show last(all_solut)
	for m in gener
		values_var.mean_PMD[m] = mean(values(values_var.vsam_par[m]))
	end	
	MSM = string("The first element is a vector of variance elements per generation,","\n", "the second element is the mean of variance component parameter per generation codification")
	printstyled(MSM,bold=false,color=:light_cyan)
	return (values_var.vsam_par,values_var.mean_PMD)
end

route_2 = raw"I:\NARUTO\870-3020\model_AC+Mutat"
Route_NameOfFile= route_2*"/all_solutions2"
NumberEffects = 427905
Iter = 6000
group_1 = member_generat[2]
gener = [0,1,3,4,5,6,7,8]
EffectPosition = 2
called = "additive genetic variance"

Never used it myself, but it could be an application for CSV.Chunks (Reading · CSV.jl)

Thank you for your answer.
The issue is that I cannot use the limit argument in CSV.Chunks with ntasks greater than 1, because the total number of lines is then different in each chunk.

* `limit`: an `Integer` to indicate a limited number of rows to parse in a csv file; use in combination with `skipto` to read a specific, contiguous chunk within a file; note for large files when multiple threads are used for parsing, the `limit` argument may not result in an exact # of rows parsed; use `threaded=false` to ensure an exact limit if necessary

BTW, “threaded” is deprecated.

Maybe I misunderstood something. I thought something along the lines of

using DataFrames, CSV

ch = CSV.Chunks(Route_NameOfFile; delim=' ', ignorerepeated=true, header=false, ntasks=6000)
t = []
for df in ch
    push!(t, DataFrame(df))  # replace with your calculation
end

could help. Basically, what you are doing with skipto and limit is done automatically by Chunks.


In addition to the Chunks method, you could use CSV.Rows. That is particularly helpful if you don't need all the data but want to filter out some subset, e.g. if you are reading Census data and just want rows from one particular state. See the sketch below.
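
For instance, a minimal sketch of that filtering pattern, assuming the four-column layout used in your code (the `types` vector and the filter value are illustrative):

using CSV, DataFrames

# Stream the file row by row and keep only the subset of interest.
kept = DataFrame(Column1=Int64[], Column2=Int64[], Column3=Int64[], Column4=Float64[])
for row in CSV.Rows(Route_NameOfFile; delim=' ', ignorerepeated=true, header=false,
                    types=[Int64, Int64, Int64, Float64])
    # keep only the rows you actually need, e.g. those matching EffectPosition
    if row.Column2 == 2
        push!(kept, (row.Column1, row.Column2, row.Column3, row.Column4))
    end
end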

Thank you once again for your answer.

I can split the data into chunks with your command lines, but the issue is that I would need 6000 chunks of exactly 427,905 lines each (one iteration of data per chunk). Without limit, CSV.Chunks divides the file into chunks of a similar size in bytes rather than the same number of lines.

How could I set the number of lines per chunk, so that each chunk holds exactly 427,905 lines? I need to calculate some parameters for every 427,905 lines.

CSV.File("I:\NARUTO\870-3020\model_AC+Mutat/all_solutions2"):
Size: 428241 x 4
Tables.Schema:
 :Column1  Int64
 :Column2  Int64
 :Column3  Int64
 :Column4  Float64
CSV.File("I:\NARUTO\870-3020\model_AC+Mutat/all_solutions2"):
Size: 427992 x 4
Tables.Schema:
 :Column1  Int64
 :Column2  Int64
 :Column3  Int64
 :Column4  Float64
CSV.File("I:\NARUTO\870-3020\model_AC+Mutat/all_solutions2"):
Size: 428127 x 4
Tables.Schema:
 :Column1  Int64
 :Column2  Int64
 :Column3  Int64
 :Column4  Float64
CSV.File("I:\NARUTO\870-3020\model_AC+Mutat/all_solutions2")

and so on…

Use the Rows object and a counter

Hi, thanks.

However, I cannot work out how to use CSV.Rows efficiently. I could write command lines to collect the data using CSV.Rows, but it takes a long time, like my first function. I can rewrite and redefine the variable, but my point is that the command always begins counting again from the first line.

CSV.Chunks would be the ideal way to solve my issue if I could define the number of lines in each chunk.

Sorry, I am not a master of CSV.jl. I really appreciate your help.

Iter = 6000
ch = CSV.Rows(route_2*"/all_solutions2"; delim=' ', ignorerepeated=true, header=false)
t = Union{DataFrame, Nothing}[nothing for _ in 1:Iter]
let counter_1 = 0, i = 1, m = 0, TotalEffects = 427905
    for s in ch
        m += 1
        if counter_1 == 0
            t[i] = DataFrame()  # start a fresh chunk
        end
        # building a one-row DataFrame per line is what makes this approach slow
        value = DataFrame([:Column1 => s.Column1, :Column2 => s.Column2, :Column3 => s.Column3, :Column4 => s.Column4])
        push!(t[i], value[1, :])
        if m % TotalEffects == 0  # chunk complete: move on to the next one
            counter_1 = 0
            println(i)
            i += 1
        else
            counter_1 += 1
        end
    end
end

# Alternatively, skipping the data already used in the previous computation:

ch = CSV.Rows(route_2*"/all_solutions2"; delim=' ', ignorerepeated=true, header=false, skipto=427905001, limit=427905)

# but that option takes even longer
 

Cheers

For large files you want to stream through them, i.e., not load them into memory first, but iterate over the rows one by one. Others have already suggested CSV.Rows for that purpose. Then try to think in separate steps and use independent tools for each step:

rows = CSV.Rows(<my_file>)                       # 1. Get iterator over all rows
chunks = Base.Iterators.partition(rows, 427905)  # 2. Chunk into 427905 rows each (this will need to fit into memory though)
map(my_task, chunks)                             # 3. Do the actual work with each chunk
# for chunk in chunks                            # 3. In case you prefer loops or do not want to store results of my_task, e.g., writing them to another file immediately 
#    my_task(chunk)
# end
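
For example, my_task could look something like this (a minimal sketch; parsing Column4 and taking its variance is just an illustration):

using CSV, DataFrames, Statistics

# Hypothetical my_task: materialise one chunk of rows and summarise it.
function my_task(chunk)
    df = DataFrame(chunk)               # chunk is a vector of rows from CSV.Rows
    vals = parse.(Float64, df.Column4)  # CSV.Rows yields strings unless `types` is given
    return var(vals)                    # illustrative summary statistic
end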

Thank you very much. I have implemented your command structure in a function with a for loop, and it is far more efficient and faster. The analysis took only around 4 hours (without optimising).

function FASTER_ComputeVariancePerGeneration(Route_NameOfFile::String, NumberEffects::Int64, Iter::Int64, group_1::samples_vector, gener::Vector{Int64}, EffectPosition::Int64, called::String="Mutational variance")
	MSM = string("You are calculating the elements of posterior marginal distributions of ",called,"\n", "BE CAREFUL writing the arguments","\n")
	printstyled(MSM,bold=false,color=:light_yellow)
	values_var = var_vector(Dict{Int64, Vector{Float64}}(),Dict{Int64, Float64}())
	for i in gener
		values_var.vsam_par[i] = Float64[]
		values_var.mean_PMD[i] = -1.0
	end
	ch = CSV.Rows(Route_NameOfFile; delim=' ', ignorerepeated=true, header=false)
	chunks = Base.Iterators.partition(ch, NumberEffects)
	j = 0
	for itera in chunks
		j += 1
		all_solut = DataFrame(itera)
		# CSV.Rows yields strings (PosLenString) by default, so parse those columns to numbers
		for c ∈ names(all_solut, PosLenString)
			all_solut[!, c] = parse.(Float64, all_solut[!, c])
		end
		all_solut = Matrix(all_solut)
		all_solut = all_solut[all_solut[:, 2] .== EffectPosition, :]  # keep only rows for this effect
		for i in gener
			current_value = round(var(all_solut[findall(in( values(group_1.Vari_par[i]) ),all_solut[:,3]), [4]]), digits=8)
			push!(values_var.vsam_par[i], current_value)
		end
		println(j)
	end
	for m in gener
		values_var.mean_PMD[m] = mean(values(values_var.vsam_par[m]))
	end
	if j != Iter
		printstyled("The number of iterations given to the function ($Iter) does not match the total number of chunks ($j)", bold=true, color=:light_red)
	end
	MSM = string("The first element is a vector of variance elements per generation,","\n", "the second element is the mean of variance component parameters per generation")
	printstyled(MSM,bold=false,color=:light_cyan)
	return (values_var.vsam_par,values_var.mean_PMD)
end
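
For reference, a call matching the variable setup from my earlier post looks like this (member_generat comes from earlier, unshown code):

route_2 = raw"I:\NARUTO\870-3020\model_AC+Mutat"
vsam_par, mean_PMD = FASTER_ComputeVariancePerGeneration(route_2*"/all_solutions2", 427905, 6000,
                                                         member_generat[2], [0,1,3,4,5,6,7,8], 2,
                                                         "additive genetic variance")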