Multi-threaded processing of a large number of files is faster when reading in batches

Dear All,

I would like to ask a question about a behavior of multi-threading. My use case is the following. I have 800 000 files on disk containing compressed JSONs, and I want to find those containing some predefined set of keys (I use term matching). This problem can be trivially parallelized as follows:

using Folds, JSON3, CodecLz4

@elapsed Folds.map(files) do f
    js = JSON3.read(IOBuffer(transcode(LZ4FrameDecompressor, read(f))), Dict{String,Any})
    match(rs, js)
end

This takes 603 s on a machine with 32 physical cores, using 32 threads. Note that match(rs, js) is the matching function, which matches the pattern rs against the JSON js.
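(For readers: the actual rs and match are not shown in this thread. A hypothetical stand-in, assuming rs is simply a set of required keys, might look like this:)

```julia
# Hypothetical stand-in for match(rs, js): rs is assumed to be a set of
# required key names; returns true if the parsed JSON contains them all.
required_keys_match(rs::Set{String}, js::Dict{String,Any}) =
    all(k -> haskey(js, k), rs)
```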

What is surprising to me is that if I read 5000 files sequentially and match them in parallel as follows,

@elapsed map(enumerate(Iterators.partition(files, 5000))) do (chunkid, chunk_files)
    buffers = map(read, chunk_files)

    Folds.map(buffers) do buf
        js = JSON3.read(IOBuffer(transcode(LZ4FrameDecompressor, buf)), Dict{String,Any})
        match(rs, js)
    end
end

the execution takes 540 s, which is about 10% faster. Nice; maybe there are some delays caused by IO locks (though I thought that had been resolved by now).

But what is even more confusing is that if I put all buffers into one large array and slice that array in the threads (which makes the code ugly),

using DataFrames

@elapsed map(enumerate(Iterators.partition(files, 5000))) do (chunkid, chunk_files)
    index = DataFrame(key=String[], chunk=Int[], offset=Int[], size=Int[])
    buffer = Vector{UInt8}()
    for f in chunk_files
        compressed = read(f)
        push!(index, (f, chunkid, length(buffer), length(compressed)))
        append!(buffer, compressed)
    end

    jsons = Folds.map(eachrow(index)) do row
        buf = buffer[row.offset+1:row.offset+row.size]
        js = JSON3.read(IOBuffer(transcode(LZ4FrameDecompressor, buf)), Dict{String,Any})
        match(rs, js)
    end
end

the execution time is 460 s. Wow. So doing more work is faster than doing less. What? I apologize that I cannot provide an MWE, but without those files it would not make much sense. I am primarily interested in whether there is some known phenomenon I am not aware of.

Thanks in advance for your help.


I’m betting you get better cache performance in the array version.

That was something I was considering. But I am puzzled, because the contiguous buffer is used only once anyway, so I would expect that copying everything into one buffer would be more expensive.

GC overhead may also be different. Can you re-run it with @timev instead of @elapsed?

I see, I will probably do it overnight.

If you have a lot of allocations like this, process-based parallelism like pmap from Distributed may be a better choice. There’s also Folds.map(f, files, DistributedEx()).
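A minimal sketch of the process-based variant with pmap (assuming the same files, rs, and match as in the snippets above, which must also be made available on the workers):

```julia
using Distributed
addprocs(32)  # e.g. one worker process per physical core

# Each worker needs the packages (and rs/match) defined in its own process.
@everywhere using JSON3, CodecLz4

results = pmap(files) do f
    js = JSON3.read(IOBuffer(transcode(LZ4FrameDecompressor, read(f))), Dict{String,Any})
    match(rs, js)
end
```

Since every worker process runs its own garbage collector, a GC pause in one process does not stop the others, unlike threads sharing a single GC.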


I have run the default scenario with @timev as follows:

julia> @timev @elapsed Folds.map(files) do f
           js = JSON3.read(IOBuffer(transcode(LZ4FrameDecompressor, read(f))), Dict{String,Any})
           match(rs, js)
       end

1242.671792 seconds (16.05 G allocations: 1.097 TiB, 86.79% gc time, 0.01% compilation time)
elapsed time (ns): 1242671791700
gc time (ns):      1078505614949
bytes allocated:   1206642422253
pool allocs:       16042098600
non-pool GC allocs:3753066
malloc() calls:    103111
realloc() calls:   327384
GC pauses:         73
full collections:  2

and if I read the results right, the time is dominated by the garbage collector (86.79% GC time). I will try tkf’s suggestion to use parallel processes instead of multi-threading.

This is very interesting. I would’ve thought that reading from disk was the culprit. In your original, easy-to-read code, this is what happens:

Thread 1: read from disk -------> process using match --- wait ---> read from disk ----- wait ----->
Thread 2: --- waiting for disk ---> read from disk ----- wait -----> process using match

Because reading from disk is pretty much serial unless you have a fast SSD.

So it’s usually better to read as much as you can from disk up front and then process it, so there’s less waiting time.

Yeah, same issue as described in:

probably

That might be the culprit.

Thanks for all the suggestions. My general experience is that GC frequently slows down threads.