Skipping a lot of lines in CSV.read() allocates too much memory

,

A lighter version that returns a vector of tuples and that depends only on CSV (I think only for number parsing).
If you do it by hand, you donโ€™t have any dependence

function mappend(file)
    positions = Vector{SVector{3, Float64}}()
    open(file) do f
    readuntil(f, "ITEM: ATOMS")
    while !eof(f)
        currblock=[SA[r[3],r[4],r[5]] for r in eachrow(CSV.read(IOBuffer(readuntil(f, "ITEM: TIMESTEP")), CSV.Tables.matrix; delim=' ', ignorerepeated=true))]
        append!(positions,currblock )
        readuntil(f, "ITEM: ATOMS")
    end
end
positions
end

If the CSV package provided a sink that was an array of tuples, you could skip a few steps and, I think, speed things up even more.

A Tables.RowTable is a Vector of NamedTuple. Is that close enough?

I had thought about it and tried, but it doesnโ€™t seem to be defined as sink.
I think itโ€™s a simple addition for those who know their way around CSV code or the Tables API and Iโ€™d like to see, as an example, how to define and add a new sink type.

julia> function mappend(file)
           positions = Vector{NTuple{3,Float64}}()
           open(file) do f
           while !eof(f)
               readuntil(f, "ITEM: ATOMS id type xs ys zs\n")
               m=CSV.read(IOBuffer(readuntil(f, "ITEM: TIMESTEP")), CSV.Tables.RowTable; delim=' ',header=false, ignorerepeated=true)
               np=[(r.xs,r.ys,r.zs) for r in m]
               append!(positions,np )
           end
       end
       positions
       end
mappend (generic function with 1 method)

julia> @btime mappend(file)
ERROR: MethodError: no method matching (AbstractVector{T} where T<:NamedTuple)(::Tables.CopiedColumns{CSV.File})

among other things, I notice that in the previous post I refer to the following snippet, while instead there is the version that uses StaticArrays

function mappend(file)
    positions = Vector{NTuple{3,Float64}}()
    open(file) do f
    while !eof(f)
        readuntil(f, "ITEM: ATOMS id type xs ys zs\n")
        m=CSV.read(IOBuffer(readuntil(f, "ITEM: TIMESTEP")), CSV.Tables.matrix; delim=' ',header=false, ignorerepeated=true)
        np=[(r[3],r[4],r[5]) for r in eachrow(m)]
        append!(positions,np )
    end
end
positions
end

@rocco_sprmnt21 , thank you very much again.
I believe both your tuple and SVector solutions are a little bit faster than the currently accepted one. To acknowledge everyoneโ€™s contributions so far, Iโ€™ll prepare a tidy list of benchmarks and post them soon as the โ€œaccepted solutionโ€, for future reference.

As for the CSV โ€œsinksโ€, I have to say Iโ€™m pretty confused.

Where would you even find that option if youโ€™re a new user looking for whatโ€™s available out there?

Do you just have to read the code behind the CSV functions?
Looking at the CSV website, the only sink thatโ€™s explicitly mentioned is DataFrame.
Iโ€™ve been refered to the Tables.jl documentation before on this forum (which is also briefly mentioned somewhere in the CSV documentation), but it would have been very difficult to figure that out on my own, with my limited experience.

Would it be reasonable to ask for a comprehensive list of sink options to be incorporated in the CSV documentation, or anywhere else thatโ€™s easy to find by a google search?

Iโ€™m doing a package for analysis of molecular dynamics simulations which so happens to currently support only LAMMPS format.
The link is: Files ยท main ยท Vasily Pisarev / MDProcessing.jl ยท GitLab , if you find it useful.
I chose an easy path of reading only one-snapshot-per-file trajectories, so you might need to split the trajectory into a sequence of files named prefix.<timestep>.suffix (i.e., dump.0.txt, dump.1.txt etc.).

2 Likes

Hi,

Script(!) - written quickly (it can be improved Iโ€™m sure) - below:

code
using Arrow, CSV, DataFrames

file = "lammpDump.txt"
arrowFile = "lammpDump.arrow"
#=
ITEM: TIMESTEP
400
ITEM: NUMBER OF ATOMS
2000
ITEM: BOX BOUNDS pp pp pp
-3.0000000000000000e+01 3.0000000000000000e+01
-3.0000000000000000e+01 3.0000000000000000e+01
-3.0000000000000000e+01 3.0000000000000000e+01
ITEM: ATOMS id type xs ys zs
1 1 0.245239 0.510142 0.113791
2 2 0.234139 0.52158 0.113102
=#
Base.@kwdef struct Lammps
    timestep = 0
    natoms = 0
    boxsize#::Matrix{Float64} = Array{Float64}(undef, 0, 0)
    positions#::Matrix{Float64} = Array{Float64}(undef, 0, 0)
end
ArrowTypes.arrowname(::Type{Lammps}) = :Lammps
ArrowTypes.JuliaType(::Val{:Lammps}) = Lammps
atomColumnMappings = Dict(("id" => 1), ("type" => 2), ("xs" => 3), ("ys" => 4), ("zs" => 5))


function getAtomItem(itemName, atomColumnMappings, positionColumn, numberOfAtoms)

    columnIndex = atomColumnMappings[itemName]
    indexStart = ((columnIndex - 1) * numberOfAtoms) + 1
    indexEnd = columnIndex * numberOfAtoms
    @view positionColumn[indexStart:indexEnd]

end
struct LammpsResolved
    timestep 
    natoms 
    boxsize 
    positions_id 
    positions_type 
    positions_xs 
    positions_ys 
    positions_zs 
    
     LammpsResolved(lammp , atomColumnMappings) = new(
        lammp.timestep,
        lammp.natoms,
        lammp.boxsize,
        getAtomItem("id", atomColumnMappings, lammp.positions, lammp.natoms),
        getAtomItem("type", atomColumnMappings, lammp.positions, lammp.natoms),
        getAtomItem("xs", atomColumnMappings, lammp.positions, lammp.natoms),
        getAtomItem("ys", atomColumnMappings, lammp.positions, lammp.natoms),
        getAtomItem("zs", atomColumnMappings, lammp.positions, lammp.natoms)
        )
end

function parseLammpToArrow(file, streamBatchSize=50)
    open(file) do f
        let writer = open(Arrow.Writer, arrowFile)
            local batchSize = streamBatchSize
            local batch = 0
            local firstBatch = true
            local timesteps = Vector{Int}()
            local lammpsCollection = Vector{Lammps}()

            while !eof(f)

                timeStep = parse(Int, split(readuntil(f, "ITEM: NUMBER OF ATOMS"), "\n")[2])
                natoms = parse(Int, split(readuntil(f, "ITEM: BOX BOUNDS pp pp pp"), "\n")[2])
                boxsize = Matrix{Float64}(CSV.read(IOBuffer(readuntil(f, "ITEM: ATOMS")), DataFrame, header=false))
                positions = Matrix{Float64}(CSV.read(IOBuffer(readuntil(f, "ITEM: TIMESTEP")), DataFrame; delim=' ', ignorerepeated=true))

                l = Lammps(timeStep, natoms, boxsize, positions)
                push!(timesteps, timeStep)
                push!(lammpsCollection, l)
                batch += 1
                if batch == batchSize
                    table = (timeStep=timesteps, lammps=lammpsCollection)
                    Arrow.write(writer, table)#,ntasks=1;maxdepth=100)
                    batch = 0
                    timesteps = Vector{Int}()
                    lammpsCollection = Vector{Lammps}()
                end
            end
            if length(timesteps) > 0
                table = (timeStep=timesteps, lammps=lammpsCollection)
                Arrow.write(writer, table)
            end
            close(writer)
        end
    end
end

parseLammpToArrow(file)

a = Arrow.Table(arrowFile)

#can now access by column and index in < ms 
b = a[2]
#dont want to run btime below so mitigate first compilation:
#show rather than test since the input file will change and this is just a poc
@show getAtomItem("id", atomColumnMappings, b[1].positions, b[1].natoms)[1] == 1
@show getAtomItem("type", atomColumnMappings, b[1].positions, b[1].natoms)[1] == 1
@show getAtomItem("xs", atomColumnMappings, b[1].positions, b[1].natoms)[1] == 0.239375
@show getAtomItem("ys", atomColumnMappings, b[1].positions, b[1].natoms)[1] == 0.505765
@show getAtomItem("zs", atomColumnMappings, b[1].positions, b[1].natoms)[1] == 0.143033
l = LammpsResolved(b[1],atomColumnMappings)
@show l.natoms

#@code_warntype getAtomItem("id", atomColumnMappings, b[1].positions, b[1].natoms) 
function iterateB(b)
    atomColumnMappings :: Dict{String, Int} = Dict(("id" => 1), ("type" => 2), ("xs" => 3), ("ys" => 4), ("zs" => 5))
   
    for i in 1:length(b)
     l = LammpsResolved(b[i],atomColumnMappings)
    end
end
iterateB(a[2])
@time iterateB(a[2])

function iterateBThreaded(b)
    atomColumnMappings :: Dict{String, Int} = Dict(("id" => 1), ("type" => 2), ("xs" => 3), ("ys" => 4), ("zs" => 5))
    Base.Threads.@threads for i in 1:length(b)
        l = LammpsResolved(b[i] , atomColumnMappings)
    end
end

@show Base.Threads.nthreads()
iterateBThreaded(a[2])
@time iterateBThreaded(a[2])


youโ€™ll note that I made use of the @rafael.guerra struct and the readuntil approach to shorten turnaround. There is a helper function to facilitate the positions access (since they end up as a vector with offsets) and Iโ€™ve type them to float64 for convenience (so you may want/need to convert the id or type fields back to ints). hopefully clear in the โ€œiterateBโ€ functions on how to use these to get back what you want.

the writing is done in a batch approach since Im aware of your larger 10GB scenario which would (potentially still could) probably blow up your machine when initially writing if we do as a normal โ€œArrow.writeโ€. If you do find that the larger scenario blows up, writing to many files will (split by timestamp) would be the fix. One thing to point out is that the direct arrow.write is actually preferable, since the merged version creates sentinel arrays; that can sometimes be slower to access (but not by much).

A major win here other than the access speed is that you can now safely do in a threaded manner - the function ive written above is not the recommended way to do it, though (you should use chunking) but it serves the poc.

Also - Iโ€™ve completely forgotten how to collapse a code block - but will do if someone posts how โ† sorted

EDIT - the original code was allocating c. 15MB of data unnecessarily since the struct hadnt been mapped to an ArrowType and therefore was a NamedTuple only. making this change the timings are:

@time iterateB(a[2]) = 0.000082 seconds (1.81 k allocations: 161.375 KiB)
@time iterateBThreaded(a[2]) = 0.000137 seconds (1.86 k allocations: 166.078 KiB)

note that we still get allocations (about 800 less) but the size is greatly reduced and the speed is also greatly increased (from ms to us) and since this is a โ€œallocation and performanceโ€ thread it probably warrants this change. Note that because this is now so much faster the overhead of obtaining threads dominates the threaded versions timings.

Regards,

See:

1 Like

Here is a summary of some of the solutions presented so far in this thread.

Most refer to the lammps โ€œdump fileโ€ format, so here is a code that generates an identical file, for reproducibility.

Creating file
# CREATING FILE
filename = "generated_file.lammpstrj"

steps::Int64 = 1e2
natoms::Int64 = 1e3
open(filename, "w") do f
    for i in 1:steps
        println(f, "ITEM: TIMESTEP")
        println(f, "$i")
        println(f, "ITEM: NUMBER OF ATOMS")
        println(f, "$natoms")
        println(f, "ITEM: BOX BOUNDS pp pp pp")
        println(f, "$(rand()) $(rand())")
        println(f, "$(rand()) $(rand())")
        println(f, "$(rand()) $(rand())")
        println(f, "ITEM: ATOMS id type xs ys zs")
        for j in 1:natoms
            println(f, "$j 1 $(rand()) $(rand()) $(rand())")
        end
    end
end

Here are some ways to read the data from the file.

CSV skiplines loop

#Too slow, dont even bother!

for i in 1:steps
    boxlengths = CSV.read(filename, DataFrame, skipto=floor(Int, (i - 1) * (natoms + 9) + 6), limit=natoms, header=false, delim=' ')
    positions = CSV.read(filename, DataFrame, skipto=floor(Int, (i - 1) * (natoms + 9) + 10), limit=natoms, header=false, delim=' ')
end

Numpy.loadtxt
np = pyimport("numpy")

@benchmark for i in 1:steps
    boxlengths = pyconvert(Matrix, np.loadtxt(filename, skiprows=floor(Int,(i - 1) * (natoms + 9) + 5), max_rows=3, delimiter=' '))
    positions = pyconvert(Matrix, np.loadtxt(filename, skiprows=floor(Int, (i - 1) * (natoms + 9) + 9), max_rows=natoms, delimiter=' '))
end



BenchmarkTools.Trial: 3 samples with 1 evaluation.
 Range (min โ€ฆ max):  1.958 s โ€ฆ  1.970 s  โ”Š GC (min โ€ฆ max): 0.00% โ€ฆ 0.00%
 Time  (median):     1.963 s             โ”Š GC (median):    0.00%
 Time  (mean ยฑ ฯƒ):   1.964 s ยฑ 5.719 ms  โ”Š GC (mean ยฑ ฯƒ):  0.00% ยฑ 0.00%

  โ–ˆ                       โ–ˆ                              โ–ˆ  
  โ–ˆโ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–ˆโ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–ˆ โ–
  1.96 s        Histogram: frequency by time        1.97 s <

 Memory estimate: 4.12 MiB, allocs estimate: 9196.

read line by line, parse to Vector{Vector{Float}}
boxlengthsv::Vector{Float64} = zeros(3)
positionsv::Vector{Vector{Float64}} = [zeros(3) for _ in 1:natoms]

@benchmark open(filename) do io

    for s in 1:steps

        # Skip headers
        for _ in 1:5
            readline(io)
        end

        # Read box bounds
        for i in 1:3
            boxlengthsv[i] = sum(abs.(parse.(Float64, split(readline(io), ' '))))
        end

        # Skip header
        readline(io)

        # Read positions
        for i in 1:natoms
            positionsv[i] = parse.(Float64, split(readline(io), ' '))[3:5]
        end

        #Do calculations here

    end

end

BenchmarkTools.Trial: 21 samples with 1 evaluation.
 Range (min โ€ฆ max):  238.321 ms โ€ฆ 251.300 ms  โ”Š GC (min โ€ฆ max): 1.62% โ€ฆ 2.71%
 Time  (median):     243.535 ms               โ”Š GC (median):    1.48%
 Time  (mean ยฑ ฯƒ):   243.903 ms ยฑ   3.424 ms  โ”Š GC (mean ยฑ ฯƒ):  1.88% ยฑ 0.78%

  โ–  โ–     โ– โ–โ– โ–โ–  โ–โ–   โ–โ– โ–  โ–  โ–ˆ  โ–   โ–    โ–โ–     โ–        โ–
  โ–ˆโ–โ–โ–ˆโ–โ–โ–โ–โ–โ–ˆโ–โ–ˆโ–ˆโ–โ–ˆโ–ˆโ–โ–โ–ˆโ–ˆโ–โ–โ–โ–ˆโ–ˆโ–โ–ˆโ–โ–โ–ˆโ–โ–โ–ˆโ–โ–โ–ˆโ–โ–โ–โ–ˆโ–โ–โ–โ–โ–ˆโ–ˆโ–โ–โ–โ–โ–โ–ˆโ–โ–โ–โ–โ–โ–โ–โ–โ–ˆ โ–
  238 ms           Histogram: frequency by time          251 ms <

 Memory estimate: 50.70 MiB, allocs estimate: 502007.


@SteffenPL method
@benchmark open(filename) do f

    while !eof(f)
        readuntil(f, "ITEM: BOX BOUNDS")
        boxsize = CSV.read(IOBuffer(readuntil(f, "ITEM: ATOMS")), DataFrame, header=false, skipto=2)
        positions = CSV.read(IOBuffer(readuntil(f, "ITEM: ")), DataFrame, header=false, skipto=2)

    end
end

BenchmarkTools.Trial: 12 samples with 1 evaluation.
 Range (min โ€ฆ max):  297.871 ms โ€ฆ 617.403 ms  โ”Š GC (min โ€ฆ max): 0.00% โ€ฆ 1.59%
 Time  (median):     423.428 ms               โ”Š GC (median):    0.69%
 Time  (mean ยฑ ฯƒ):   423.796 ms ยฑ  88.632 ms  โ”Š GC (mean ยฑ ฯƒ):  0.84% ยฑ 0.77%

  โ–ˆ โ–ˆ    โ–ˆ          โ–ˆ โ–ˆ โ–ˆ  โ–ˆโ–ˆโ–ˆ      โ–ˆ      โ–ˆ                  โ–ˆ
  โ–ˆโ–โ–ˆโ–โ–โ–โ–โ–ˆโ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–ˆโ–โ–ˆโ–โ–ˆโ–โ–โ–ˆโ–ˆโ–ˆโ–โ–โ–โ–โ–โ–โ–ˆโ–โ–โ–โ–โ–โ–โ–ˆโ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–ˆ โ–
  298 ms           Histogram: frequency by time          617 ms <

 Memory estimate: 26.47 MiB, allocs estimate: 142004.

@rafael.guerra solution using a structure
Base.@kwdef mutable struct MyData
    timestep::Vector{Int} = Int[]
    natoms::Int = 0
    boxsize::Matrix{Float64} = Array{Float64}(undef, 0, 0)
    positions::DataFrame = DataFrame()
end

M = MyData()

# Read repeated items only once:
open(filename) do f
    readuntil(f, "ITEM: NUMBER OF ATOMS")
    M.natoms = parse(Int, readuntil(f, "ITEM: BOX BOUNDS pp pp pp"))
    M.boxsize = Matrix{Float64}(CSV.read(IOBuffer(readuntil(f, "ITEM: ATOMS")), DataFrame, header=false))
end

# Read varying items:
@benchmark open(filename) do f
    readline(f)
    while !eof(f)
        push!(M.timestep, parse(Int, readuntil(f, "ITEM: NUMBER OF ATOMS")))
        readuntil(f, "ITEM: ATOMS")
        M.positions = vcat(M.positions, CSV.read(IOBuffer(readuntil(f, "ITEM: TIMESTEP")), DataFrame; delim=' ', ignorerepeated=true))
    end
end

# Access loaded data with:

M.timestep
M.natoms
M.boxsize
M.positions
M.positions.zs

BenchmarkTools.Trial: 3 samples with 1 evaluation.
 Range (min โ€ฆ max):  1.586 s โ€ฆ    1.973 s  โ”Š GC (min โ€ฆ max): 16.45% โ€ฆ 28.38%
 Time  (median):     1.832 s               โ”Š GC (median):    16.99%
 Time  (mean ยฑ ฯƒ):   1.797 s ยฑ 195.888 ms  โ”Š GC (mean ยฑ ฯƒ):  21.00% ยฑ  6.74%

  โ–ˆ                                   โ–ˆ                    โ–ˆ
  โ–ˆโ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–ˆโ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–ˆ โ–
  1.59 s         Histogram: frequency by time         1.97 s <

 Memory estimate: 3.19 GiB, allocs estimate: 128727.

@rocco_sprmnt21 solutions

Using tuples

np = Vector{NTuple{3,Float64}}()
@benchmark open(file) do f
    while !eof(f)
        readuntil(f, "ITEM: ATOMS id type xs ys zs\n")
        m = CSV.read(IOBuffer(readuntil(f, "ITEM: TIMESTEP")), CSV.Tables.matrix; delim=' ', header=false, ignorerepeated=true)
        np = [(r[3], r[4], r[5]) for r in eachrow(m)]
        # append!(positions,np )
    end
end


nchmarkTools.Trial: 8 samples with 1 evaluation.
 Range (min โ€ฆ max):  638.898 ms โ€ฆ 704.015 ms  โ”Š GC (min โ€ฆ max): 0.40% โ€ฆ 0.71%
 Time  (median):     656.561 ms               โ”Š GC (median):    0.65%
 Time  (mean ยฑ ฯƒ):   661.835 ms ยฑ  20.798 ms  โ”Š GC (mean ยฑ ฯƒ):  0.63% ยฑ 0.19%

  โ– โ–            โ–โ–ˆ        โ–        โ–                         โ–
  โ–ˆโ–โ–ˆโ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–ˆโ–ˆโ–โ–โ–โ–โ–โ–โ–โ–โ–ˆโ–โ–โ–โ–โ–โ–โ–โ–โ–ˆโ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–โ–ˆ โ–
  639 ms           Histogram: frequency by time          704 ms <

 Memory estimate: 77.66 MiB, allocs estimate: 154600.

Using SVectors

using StaticArrays


currblock = Vector{SVector{3,Float64}}()
@benchmark open(filename) do f
    readuntil(f, "ITEM: ATOMS")
    while !eof(f)
        currblock = [SA[r[3], r[4], r[5]] for r in eachrow(CSV.read(IOBuffer(readuntil(f, "ITEM: TIMESTEP")), CSV.Tables.matrix; delim=' ', ignorerepeated=true))]
        # append!(positions,currblock )
        readuntil(f, "ITEM: ATOMS")
    end
end



BenchmarkTools.Trial: 18 samples with 1 evaluation.
 Range (min โ€ฆ max):  282.862 ms โ€ฆ 298.363 ms  โ”Š GC (min โ€ฆ max): 0.00% โ€ฆ 0.57%
 Time  (median):     291.831 ms               โ”Š GC (median):    0.56%
 Time  (mean ยฑ ฯƒ):   291.904 ms ยฑ   4.664 ms  โ”Š GC (mean ยฑ ฯƒ):  0.39% ยฑ 0.33%

  โ–         โ–     โ–โ–  โ–   โ–  โ–ˆ     โ–   โ–       โ– โ–โ–    โ–ˆ โ–  โ– โ–
  โ–ˆโ–โ–โ–โ–โ–โ–โ–โ–โ–โ–ˆโ–โ–โ–โ–โ–โ–ˆโ–ˆโ–โ–โ–ˆโ–โ–โ–โ–ˆโ–โ–โ–ˆโ–โ–โ–โ–โ–โ–ˆโ–โ–โ–โ–ˆโ–โ–โ–โ–โ–โ–โ–โ–ˆโ–โ–ˆโ–ˆโ–โ–โ–โ–โ–ˆโ–โ–ˆโ–โ–โ–ˆโ–โ–ˆ โ–
  283 ms           Histogram: frequency by time          298 ms <

 Memory estimate: 29.11 MiB, allocs estimate: 109039.
@djholiver Arrow example
using Arrow, CSV, DataFrames, BenchmarkTools

arrowFile = "lammpDump.arrow"

Base.@kwdef struct Lammps
    timestep = 0
    natoms = 0
    boxsize#::Matrix{Float64} = Array{Float64}(undef, 0, 0)
    positions#::Matrix{Float64} = Array{Float64}(undef, 0, 0)
end
ArrowTypes.arrowname(::Type{Lammps}) = :Lammps
ArrowTypes.JuliaType(::Val{:Lammps}) = Lammps
atomColumnMappings = Dict(("id" => 1), ("type" => 2), ("xs" => 3), ("ys" => 4), ("zs" => 5))


function getAtomItem(itemName, atomColumnMappings, positionColumn, numberOfAtoms)

    columnIndex = atomColumnMappings[itemName]
    indexStart = ((columnIndex - 1) * numberOfAtoms) + 1
    indexEnd = columnIndex * numberOfAtoms
    @view positionColumn[indexStart:indexEnd]

end
struct LammpsResolved
    timestep 
    natoms 
    boxsize 
    positions_id 
    positions_type 
    positions_xs 
    positions_ys 
    positions_zs 
    
     LammpsResolved(lammp , atomColumnMappings) = new(
        lammp.timestep,
        lammp.natoms,
        lammp.boxsize,
        getAtomItem("id", atomColumnMappings, lammp.positions, lammp.natoms),
        getAtomItem("type", atomColumnMappings, lammp.positions, lammp.natoms),
        getAtomItem("xs", atomColumnMappings, lammp.positions, lammp.natoms),
        getAtomItem("ys", atomColumnMappings, lammp.positions, lammp.natoms),
        getAtomItem("zs", atomColumnMappings, lammp.positions, lammp.natoms)
        )
end

function parseLammpToArrow(file, streamBatchSize=50)
    open(file) do f
        let writer = open(Arrow.Writer, arrowFile)
            local batchSize = streamBatchSize
            local batch = 0
            local firstBatch = true
            local timesteps = Vector{Int}()
            local lammpsCollection = Vector{Lammps}()

            while !eof(f)

                timeStep = parse(Int, split(readuntil(f, "ITEM: NUMBER OF ATOMS"), "\n")[2])
                natoms = parse(Int, split(readuntil(f, "ITEM: BOX BOUNDS pp pp pp"), "\n")[2])
                boxsize = Matrix{Float64}(CSV.read(IOBuffer(readuntil(f, "ITEM: ATOMS")), DataFrame, header=false))
                positions = Matrix{Float64}(CSV.read(IOBuffer(readuntil(f, "ITEM: TIMESTEP")), DataFrame; delim=' ', ignorerepeated=true))

                l = Lammps(timeStep, natoms, boxsize, positions)
                push!(timesteps, timeStep)
                push!(lammpsCollection, l)
                batch += 1
                if batch == batchSize
                    table = (timeStep=timesteps, lammps=lammpsCollection)
                    Arrow.write(writer, table)#,ntasks=1;maxdepth=100)
                    batch = 0
                    timesteps = Vector{Int}()
                    lammpsCollection = Vector{Lammps}()
                end
            end
            if length(timesteps) > 0
                table = (timeStep=timesteps, lammps=lammpsCollection)
                Arrow.write(writer, table)
            end
            close(writer)
        end
    end
end

parseLammpToArrow(filename)

a = Arrow.Table(arrowFile)

#can now access by column and index in < ms 
b = a[2]


#@code_warntype getAtomItem("id", atomColumnMappings, b[1].positions, b[1].natoms) 
function iterateB(b)
    atomColumnMappings :: Dict{String, Int} = Dict(("id" => 1), ("type" => 2), ("xs" => 3), ("ys" => 4), ("zs" => 5))
   
    for i in 1:length(b)
     l = LammpsResolved(b[i],atomColumnMappings)
    end
end
iterateB(a[2])
@benchmark iterateB(a[2])

BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min โ€ฆ max):  20.500 ฮผs โ€ฆ  2.437 ms  โ”Š GC (min โ€ฆ max): 0.00% โ€ฆ 91.02%
 Time  (median):     24.700 ฮผs              โ”Š GC (median):    0.00%
 Time  (mean ยฑ ฯƒ):   38.492 ฮผs ยฑ 72.942 ฮผs  โ”Š GC (mean ยฑ ฯƒ):  5.69% ยฑ  3.76%

  โ–ˆโ–‡โ–†โ–„โ–ƒโ–‚โ–‚โ–‚โ–ƒโ–‚โ–โ–‚โ–โ–‚โ–‚โ–‚โ–‚โ–‚โ–โ–โ–โ–                                      โ–
  โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–‡โ–‡โ–†โ–†โ–‡โ–‡โ–…โ–†โ–…โ–†โ–†โ–…โ–†โ–…โ–†โ–…โ–…โ–…โ–…โ–…โ–…โ–…โ–†โ–†โ–†โ–†โ–†โ–†โ–†โ–…โ–†โ–…โ–…โ–…โ–…โ–† โ–ˆ
  20.5 ฮผs      Histogram: log(frequency) by time       168 ฮผs <

 Memory estimate: 78.64 KiB, allocs estimate: 804.



function iterateBThreaded(b)
    atomColumnMappings :: Dict{String, Int} = Dict(("id" => 1), ("type" => 2), ("xs" => 3), ("ys" => 4), ("zs" => 5))
    Base.Threads.@threads for i in 1:length(b)
        l = LammpsResolved(b[i] , atomColumnMappings)
    end
end

@show Base.Threads.nthreads()
@benchmark iterateBThreaded(a[2])
BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min โ€ฆ max):  22.100 ฮผs โ€ฆ  1.337 ms  โ”Š GC (min โ€ฆ max): 0.00% โ€ฆ 94.74%
 Time  (median):     24.000 ฮผs              โ”Š GC (median):    0.00%
 Time  (mean ยฑ ฯƒ):   27.120 ฮผs ยฑ 35.097 ฮผs  โ”Š GC (mean ยฑ ฯƒ):  4.89% ยฑ  3.85%

  โ–‡โ–ˆโ–ˆโ–‡โ–„โ–„โ–ƒโ–‚โ–‚โ–โ–‚โ–                                                โ–‚
  โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–‡โ–†โ–†โ–†โ–…โ–†โ–…โ–ˆโ–ˆโ–†โ–†โ–†โ–‡โ–‡โ–†โ–…โ–†โ–†โ–†โ–„โ–„โ–„โ–„โ–…โ–…โ–†โ–…โ–†โ–…โ–…โ–„โ–…โ–„โ–…โ–…โ–„โ–†โ–†โ–„โ–„โ–†โ–†โ–…โ–„โ–… โ–ˆ
  22.1 ฮผs      Histogram: log(frequency) by time      65.6 ฮผs <

 Memory estimate: 79.33 KiB, allocs estimate: 810.

@jules parsers (very fast)
using Parsers
using BenchmarkTools
using Mmap

@inline function skipline(io)
    skipchars(!=('\n'), io)
    skip(io, 1)
end

@inline function _parse(T, io)
    v = Parsers.parse(T, io)
    skip(io, 1) # either space or newline
    return v
end

struct Atom
    id::Int
    type::Int
    x::Float64
    y::Float64
    z::Float64
end

function read_step(io)
    skipline(io)
    timestep = _parse(Int, io)
    skipline(io)
    natoms = _parse(Int, io)
    skipline(io)
    box_bounds = ntuple(Val(3)) do _
        x = _parse(Float64, io)
        y = _parse(Float64, io)
        (; x, y)
    end
    skipline(io)
    atoms = Vector{Atom}(undef, natoms)
    for i in eachindex(atoms)
        id = _parse(Int, io)
        type = _parse(Int, io)
        x = _parse(Float64, io)
        y = _parse(Float64, io)
        z = _parse(Float64, io)
        atoms[i] = Atom(id, type, x, y, z)
    end
    (; timestep, natoms, box_bounds, atoms)
end

function read_lammps(file)
    steps = 100
    open(file, "r") do handle
        m = Mmap.mmap(handle)
        io = IOBuffer(m)
        for i in 1:steps
            data = read_step(io)

        end
    end
end

@benchmark read_lammps("generated_file.lammpstrj")

BenchmarkTools.Trial: 305 samples with 1 evaluation.
 Range (min โ€ฆ max):  14.076 ms โ€ฆ 23.616 ms  โ”Š GC (min โ€ฆ max): 0.00% โ€ฆ 0.00%
 Time  (median):     15.816 ms              โ”Š GC (median):    0.00%
 Time  (mean ยฑ ฯƒ):   16.374 ms ยฑ  1.896 ms  โ”Š GC (mean ยฑ ฯƒ):  1.16% ยฑ 3.21%

       โ–ˆโ–†โ–„โ–ƒโ–ƒโ–ƒโ–†โ–‚ โ–‚โ–
  โ–„โ–…โ–†โ–‡โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–„โ–†โ–‡โ–„โ–‡โ–„โ–ƒโ–ƒโ–„โ–โ–„โ–ƒโ–ƒโ–„โ–„โ–ƒโ–โ–ƒโ–โ–ƒโ–โ–โ–„โ–โ–ƒโ–ƒโ–โ–โ–โ–„โ–„โ–โ–โ–โ–โ–โ–ƒโ–โ–ƒโ–โ–„โ–ƒ โ–„
  14.1 ms         Histogram: frequency by time        23.1 ms <

 Memory estimate: 3.82 MiB, allocs estimate: 215.

Methods not tested yet:
SQlite, recommended by @mnemnion
DuckDB, recommended by @rdavis120

Hope I didnโ€™t miss anything!

4 Likes

Thanks @aris , if possible, please make use of the code that I updated and edited today and benchmark the โ€œiterateB(a[2])โ€ method as a solution comparison with the other submissions, since that reflects your original ask. The convert to Arrow step โ€œparseLammpToArrowโ€ that you have currently benchmarked is a a one time only change of file format, you then access and use the data in your loops as with the other solutions.

To be clear, using your original file attached earlier in the thread and benchmarking the โ€œiterateBโ€ method I get:

BenchmarkTools.Trial: 10000 samples with 1 evaluation.
 Range (min โ€ฆ max):  55.400 ฮผs โ€ฆ   3.869 ms  โ”Š GC (min โ€ฆ max): 0.00% โ€ฆ 95.33%
 Time  (median):     61.700 ฮผs               โ”Š GC (median):    0.00%
 Time  (mean ยฑ ฯƒ):   81.413 ฮผs ยฑ 119.051 ฮผs  โ”Š GC (mean ยฑ ฯƒ):  7.59% ยฑ  5.52%

  โ–…โ–ˆโ–…โ–„โ–ƒโ–ƒโ–ƒโ–‚โ–โ– โ–โ–‚โ–โ–  โ–‚โ–‚โ–                                         โ–
  โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–‡โ–‡โ–‡โ–…โ–†โ–†โ–†โ–†โ–‡โ–†โ–†โ–†โ–†โ–†โ–†โ–…โ–†โ–†โ–…โ–…โ–…โ–„โ–„โ–…โ–…โ–…โ–„โ–„โ–„โ–„โ–…โ–ƒโ–ƒโ–ƒโ–„โ–„โ–„โ–ƒ โ–ˆ
  55.4 ฮผs       Histogram: log(frequency) by time       260 ฮผs <

 Memory estimate: 160.59 KiB, allocs estimate: 1807.

Hopefully the above makes sense - the specific point is that, if you were to adopt, you may find that, following initial conversion, your ability to interact with the lammps objects (including filtering by timestep and doing in parrallel) is significantly quicker and less memory intensive.

Regards,

Thank you for the correction @djholiver!

Iโ€™m not really familliar with arrows, hopefully I didnโ€™t mess it up this time. I updated your solution in my post, just removed the "show rather than test " part, because the numbers wouldnโ€™t match the randomly generated ones that Iโ€™m using.

1 Like

ah yes - this is more like it - as much as the data may be slightly different the others, presumably the volume must be identical and therefore the results still indicative and there is value in the comparison?

note that this is running in microseconds - are you able to consume the objects generated in your use case? that would be the acid test.

Regards,

Not sure if I understand correcly, but I used the same data file (generated as described in this post) for all the benchmarks. Perhaps benchmarking the whole code block in your example might be more of a โ€œfairโ€ comparisson to the other examples, as writing the arrow file would also take some time.

Anyhow, I guess itโ€™s not just about achieving the best time, but about accessing the data in a way that suits oneโ€™s application, whatever that might be.

I was wondering how fast a naive parsing function would be that doesnโ€™t really validate much, just assumes your input format is correct and skips the appropriate number of lines / spaces.

using Parsers
using BenchmarkTools
using Mmap

@inline function skipline(io)
    skipchars(!=('\n'), io)
    skip(io, 1)
end

@inline function _parse(T, io)
    v = Parsers.parse(T, io)
    skip(io, 1) # either space or newline
    return v
end

struct Atom
    id::Int
    type::Int
    x::Float64
    y::Float64
    z::Float64
end

function read_step(io)
    skipline(io)
    timestep = _parse(Int, io)
    skipline(io)
    natoms = _parse(Int, io)
    skipline(io)
    box_bounds = ntuple(Val(3)) do _
        x = _parse(Float64, io)
        y = _parse(Float64, io)
        (; x, y)
    end
    skipline(io)
    atoms = Vector{Atom}(undef, natoms)
    for i in eachindex(atoms)
        id = _parse(Int, io)
        type = _parse(Int, io)
        x = _parse(Float64, io)
        y = _parse(Float64, io)
        z = _parse(Float64, io)
        atoms[i] = Atom(id, type, x, y, z)
    end
    (; timestep, natoms, box_bounds, atoms)
end

function read_lammps(file)
    steps = 100
    open(file, "r") do handle
        m = Mmap.mmap(handle)
        io = IOBuffer(m)
        map(1:steps) do _
            read_step(io)
        end
    end
end

@benchmark read_lammps("generated_file.lammpstrj")
BenchmarkTools.Trial: 480 samples with 1 evaluation.
 Range (min โ€ฆ max):   9.692 ms โ€ฆ  15.867 ms  โ”Š GC (min โ€ฆ max): 0.00% โ€ฆ 0.00%
 Time  (median):     10.101 ms               โ”Š GC (median):    0.00%
 Time  (mean ยฑ ฯƒ):   10.423 ms ยฑ 917.812 ฮผs  โ”Š GC (mean ยฑ ฯƒ):  2.11% ยฑ 5.69%

    โ–„โ–‡โ–ˆโ–†โ–†โ–…โ–„โ–‚โ–‚                                                   
  โ–…โ–…โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–…โ–†โ–ˆโ–„โ–…โ–…โ–„โ–โ–„โ–โ–„โ–โ–„โ–โ–โ–โ–โ–…โ–…โ–†โ–โ–‡โ–…โ–„โ–„โ–โ–„โ–…โ–„โ–โ–†โ–„โ–‡โ–‡โ–„โ–„โ–โ–โ–โ–„โ–„โ–โ–โ–„โ–„โ–โ–„ โ–‡
  9.69 ms       Histogram: log(frequency) by time      14.1 ms <

 Memory estimate: 3.83 MiB, allocs estimate: 216.

Thank you very much for your input @jules.

Thatโ€™s ligtning fast in my machine as well, impressive!

Just to clarify, that would load everything in the structure created by the read_lammps() function, right?

If thatโ€™s the case, how would you modify the function so that every timestep is replaced by the next one within the โ€œ1:stepsโ€ loop?
I would rather process one timestep at a time, since some files are a bit too big to be loaded in the ram.

You would replace

map(1:steps) do _
    read_step(io)
end

with something like

for _ in 1:steps
    data = read_step(io)
    do_something_with_data(data)
end
1 Like

or this way

function steps(file)
    open(file) do handle
        io=IOBuffer(mmap(handle))
        while !eof(io)
            data = read_step(io)
            do_something_with_data(data)
        end
    end  
end

But this post is mainly to ask questions not to give answers.
Why is mmap() so much faster than read() and seek()?
Should/could CSV and affected packages use this function instead of read()?
I used the file in link to a file using the lammps format and I ran into a couple of problems:

  1. there is a value โ€œ1โ€ between the positions of the atoms xs ys and sz of some step which makes the parsing fail and sends the script into error. I corrected it to โ€œ1.โ€ and the problem seems to have been overcome;
  2. there is a problem in the parsing of the second integer which affects the parsing of the first Float64. I couldnโ€™t understand where it came from exactly because I didnโ€™t understand exactly how the Parsers.parse() function works if and how it consumes the streaming bytes
read_lammps(file)
1-element Vector{Vector{Atom}}:
 [Atom(1, 0, 239375.0, 0.505765, 0.143033), Atom(2, 0, 254461.0, 0.510929, 0.143544), Atom(3, 0, 240602.0, 0.489869, 0.142478), Atom(4, 0, 23796.0, 0.503614, 0.142924), Atom(5, 0, 595518.0, 0.0928577, 0.411568)

I used mmap after reading this post

1 Like

I followed, as far as I could understand, the discussion on the possibilities and limitations of mmap.
For this reason and to gain experience with some aspects of IO, I tried to โ€œplayโ€ with the following script which only uses the basic functions and the float parsing function as defined by @mkitti

no dependencies and seems very fast
julia> using BenchmarkTools

julia> file="IO_lammpstr.txt"
"IO_lammpstr.txt"

julia> function pmk(bytes::AbstractVector{UInt8}, _start::Int = 1, _end::Int = length(bytes))
           int_val = 0
           power = 0
           for i in _start:_end
               byte = bytes[i]
               if byte != 0x2e
                   int_val *= 10
                   int_val += byte - 0x30
                   power *= 10
               else
                   power = 1
               end
           end
           q = int_val / power 
           return q
       end
pmk (generic function with 3 methods)

julia> function fn(c,ch, s)
           for i in s:lastindex(ch)
               c==ch[i] && return i
           end
       end
fn (generic function with 1 method)

julia> function chnkpmk(ch)
           res=[(0.,0.,0.) for _ in 1:2000]
           ez=i=1
           while ez <length(ch)
               ei1=fn(0x20,ch,ez)
               ei2=fn(0x20,ch,ei1+1)
               ex=fn(0x20,ch,ei2+1)
               x=pmk(ch,ei2+1,ex-1)
               ey=fn(0x20,ch,ex+1)
               y=pmk(ch,ex+1,ey-1)
               ez=fn(0x0a,ch,ey)
               z=pmk(ch,ey+1,ez-1)
               res[i]=(x,y,z)
               i+=1
           end
           res
       end
chnkpmk (generic function with 1 method)

julia> function atomsb(buffer, L)
           from=codeunits("TEM: ATOMS id type xs ys zs")
           to=codeunits("ITEM: TIMESTEP")
           lto=length(to)
           append!(buffer,to)
           t=last(findfirst(to,buffer))
           f=last(findfirst(from,buffer))
           while true
               t=last(findnext(to,buffer,f))
               ch=@view buffer[f+2:t-lto]
               t>L&& (return f)
               chnkpmk(ch)
               f=last(findnext(from,buffer,t))
           end

       end
atomsb (generic function with 1 method)

julia> function fa(file, chunk=10^6)
           buffer=[0x0]
           io=open(file)
           pos=1
           while !eof(io)
           L=readbytes!(io, buffer,chunk)
           p=atomsb(buffer, L)
           pos+=p
           seek(io,pos)
           end
       end
fa (generic function with 2 methods)

julia>  @btime fa(file)
  24.145 ms (391 allocations: 10.73 MiB)

julia>  @btime fa(file,3*10^5)
  21.692 ms (318 allocations: 7.95 MiB)

julia>  @btime fa(file,10^5)
  8.434 ms (16 allocations: 247.91 KiB)

Iโ€™m not entirely sure that something isnโ€™t lost in the transitions from one chunk to another. :laughing:
But since itโ€™s very fast and shows how performance can vary depending on the size of the chin k, maybe itโ€™s worth testing it a bit

below are some tests to see how many steps per chunk are โ€œprocessedโ€

tests

function atomsbtest(buffer, L)
    from=codeunits("TEM: ATOMS id type xs ys zs")
    to=codeunits("ITEM: TIMESTEP")
    lto=length(to)
    append!(buffer,to)
    t=last(findfirst(to,buffer))
    f=last(findfirst(from,buffer))
    i=1
    while true
        t=last(findnext(to,buffer,f))
        ch=@view buffer[f+2:t-lto]
        t>L&& (return (f,i))
        chnkpmk(ch)
        i+=1
        f=last(findnext(from,buffer,t))
    end

end
function fatest(file, chunk=10^6)
    buffer=[0x0]
    io=open(file)
    cn=[]
    pos=1
    while !eof(io)
    L=readbytes!(io, buffer,chunk)
    (p,i)=atomsb(buffer, L)
    pos+=p
    push!(cn,i)
    seek(io,pos)
    end
    cnjulia> function atomsbtest(buffer, L)
           from=codeunits("TEM: ATOMS id type xs ys zs")
           to=codeunits("ITEM: TIMESTEP")
           lto=length(to)
           append!(buffer,to)
           t=last(findfirst(to,buffer))
           f=last(findfirst(from,buffer))
           i=1
           while true
               t=last(findnext(to,buffer,f))
               ch=@view buffer[f+2:t-lto]
               t>L&& (return (f,i))
               chnkpmk(ch)
               i+=1
               f=last(findnext(from,buffer,t))
           end

       end
atomsbtest (generic function with 1 method)

julia> function fatest(file, chunk=10^6)
           buffer=[0x0]
           io=open(file)
           cn=[]
           pos=1
           while !eof(io)
           L=readbytes!(io, buffer,chunk)
           (p,i)=atomsbtest(buffer, L)
           pos+=p
           push!(cn,i)
           seek(io,pos)
           end
           cn
       end
fatest (generic function with 2 methods)

julia> fatest(file)
15-element Vector{Any}:
 16
 15
 15
 15
 15
 15
 15
 15
 15
 15
 15
 15
 15
  5
  1

julia> fatest(file,3*10^5)
51-element Vector{Any}:
 5
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 โ‹ฎ
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 4
 1

julia> fatest(file,10^5)
201-element Vector{Any}:
 2
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 โ‹ฎ
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1
 1

This text will be hidden