Multi-threaded parsing of log file

In order to use Julia for all the things, and to learn more about threads, I’m trying to parse a webserver log file using threads. In theory, each line is sent to a thread, a bunch of parsing functions run, the result is saved to a DataFrame, and the DataFrame is then written out to a Parquet file. countlines(logfile) shows 247 million lines. Parts of the logfile contain binary data, so I try to handle that before calling the parsing functions. Julia is run with -t auto and correctly sees the 16 cores.

From BenchmarkTools, the slowest part of the entire process is reading from the logfile (on a NAS over a 1 Gbps network). I thought that threads would make the I/O wait less relevant.

Here’s what didn’t work:

using Unicode

Threads.@threads for rawline in eachline(logfile)
    line = try
        Unicode.normalize(rawline, :NFKC)
    catch
        continue  # line contains binary data; skip it
    end
    parser_function_one(line)
    parser_function_two(line)
end

Obviously that didn’t work, because @threads wants an indexable collection, not a Base.EachLine iterator. So I tried this, updated from the original post:

Threads.@threads for rawline in collect(eachline(logfile))
    line = try
        Unicode.normalize(rawline, :NFKC)
    catch
        continue  # line contains binary data; skip it
    end
    parser_function_one(line)
    parser_function_two(line)
end

I now have DataFrame errors, so more spelunking on my end. I thought DataFrames.jl was thread-safe, but maybe not the way I’m using it (several threads push!ing rows into the same DataFrame at once).
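
My current guess at a thread-safe version is to serialize the writes behind a lock. A minimal sketch, assuming a hypothetical parse_line that wraps my parser functions and returns a NamedTuple (or nothing for unparseable lines):

using DataFrames, Unicode

df = DataFrame(field1 = String[], field2 = String[])  # hypothetical schema
df_lock = ReentrantLock()

Threads.@threads for rawline in collect(eachline(logfile))
    line = try
        Unicode.normalize(rawline, :NFKC)
    catch
        continue  # line contains binary data; skip it
    end
    row = parse_line(line)  # hypothetical: returns a NamedTuple or nothing
    row === nothing && continue
    lock(df_lock) do
        push!(df, row)  # push! on a shared DataFrame isn't thread-safe, so guard it
    end
end

The lock serializes the push!es, so this only pays off if the parsing itself dominates the runtime.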

Pointers? What’s a better way to do this?

Thanks!

Threads can’t actually make the I/O any faster, so it’s not a trivial speedup to get, though it certainly could improve overall throughput if other threads can be parsing while one is waiting for data.

I would look at CSV.jl as a reference - fairly complex, but it has a well tested multi-threaded parser that does scale quite nicely.

# julia -t 8
using CSV, DataFrames, BenchmarkTools
df = DataFrame(rand(1_000_000, 10), :auto)
CSV.write("tmp.csv", df)

@btime CSV.read("tmp.csv", DataFrame) 
# 199.200 ms (2518 allocations: 77.09 MiB)

@btime CSV.read("tmp.csv", DataFrame; threaded=false) 
#  669.982 ms (374 allocations: 84.32 MiB)

Just to add, I think the pattern CSV.jl uses is to mmap (or read) the file to get a vector of bytes, then work on that byte buffer by detecting where each row starts (findrowstarts!), and then have each thread parse some number of consecutive rows.

CSV’s code looks very complex to me, but it has to handle all kinds of situations, like newlines inside quoted strings (which then don’t actually start the next row), not knowing the types or number of columns ahead of time, etc. I think if you have a highly structured log format you can probably get away with a lot less complication. But maybe you can also just use CSV.jl, ha.
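
For a fixed log format, a rough sketch of that chunked pattern might look like this (not CSV.jl’s actual code; parser_function_one/two are the placeholders from your post, and the file is assumed to be plain newline-delimited text):

using Mmap

function parse_chunked(path, nchunks = Threads.nthreads())
    bytes = Mmap.mmap(path)  # Vector{UInt8} backed by the file
    n = length(bytes)
    bounds = [round(Int, i * n / nchunks) for i in 0:nchunks]
    for i in 2:nchunks
        # snap each interior boundary forward to the next newline so no line is split
        while bounds[i] < n && bytes[bounds[i]] != UInt8('\n')
            bounds[i] += 1
        end
    end
    tasks = map(1:nchunks) do i
        Threads.@spawn begin
            # each task parses only its own byte range
            chunk = IOBuffer(view(bytes, bounds[i]+1:bounds[i+1]))
            for line in eachline(chunk)
                parser_function_one(line)
                parser_function_two(line)
            end
        end
    end
    foreach(wait, tasks)
end

Since each task only ever reads its own slice, nothing needs locking until you merge the parsed results.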


This is what I was hoping for. While one thread waits on I/O, the others can parse lines.

Thanks! I’ve been staring at other threaded code and still trying to wrap my head around it.

Thank you for the feedback. Here’s how I solved it:

using Unicode

@sync for rawline in eachline(logfile)
    Threads.@spawn begin
        line = try
            Unicode.normalize(rawline, :NFKC)
        catch
            nothing  # line contains binary data
        end
        if line !== nothing
            parser_function_one(line)
            parser_function_two(line)
        end
    end
end

The collect() method works, but it obviously loads every line into a vector in memory first. This version doesn’t do that: lines are read one at a time and each is handed off to a task, which keeps the cores busy parsing and writing the results of the functions to the DataFrame while the reader waits on I/O.
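
One task per line has real overhead at 247 million lines, so the next thing I may try is a bounded Channel feeding a fixed pool of worker tasks. A rough sketch under the same assumptions (parser_function_one/two as placeholders):

using Unicode

function parse_with_channel(logfile)
    # one producer task reads lines into a bounded Channel...
    lines = Channel{String}(10_000; spawn = true) do ch
        for rawline in eachline(logfile)
            put!(ch, rawline)
        end
    end
    # ...and nthreads() worker tasks pull from it and parse
    @sync for _ in 1:Threads.nthreads()
        Threads.@spawn for rawline in lines
            line = try
                Unicode.normalize(rawline, :NFKC)
            catch
                continue  # skip lines with binary data
            end
            parser_function_one(line)
            parser_function_two(line)
        end
    end
end

The bounded channel keeps memory flat no matter how big the file is, since the reader blocks once the buffer fills up.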