Parallelizing file processing with the least amount of overhead

Hey all! I wrote ViewReader a while ago, and now I thought adding some parallel file processing might be fun. ViewReader reads UInt8s (chars) into a buffer array, so each thread could have its own buffer array. This could be neat for very big files where the CPU time exceeds the IO time.


Using a Channel
To do so, the first thing that came to mind was using a Channel: put the lines in it and pull them out with multiple worker threads. Like so (simplified example):

function dummy_work(line)
    sleep(length(line)*0.00001)
end

function produce_lines!(line_channel::Channel{String}, f::String)
    for line in eachline(f)
        put!(line_channel, line)
    end
    # Closing the channel ends the consumers' for-loops once it is drained,
    # so no "END" sentinel is needed (a single sentinel would also only have
    # stopped one of the consumers).
    close(line_channel)
end

function consume_lines(line_channel::Channel{String})
    processed_lines = 0
    for line in line_channel  # blocks until a line is available
        processed_lines += 1
        dummy_work(line)
    end
    println(Threads.threadid(), " processed: ", processed_lines)
end

function thread_test(f::String)
    work_threads = Threads.nthreads() - 1
    line_channel = Channel{String}(100)
    @sync begin
        Threads.@spawn produce_lines!(line_channel, f)
        for _ in 1:work_threads
            Threads.@spawn consume_lines(line_channel)
        end
    end
end

function base_test(f::String)
    for line in eachline(f)
        dummy_work(line)
    end
end

function dummy_data(f::String, n::Int)
    open(f, "w") do h  # the do-block closes (and flushes) the file handle
        for i in 1:n
            println(h, "Some random text ", i)
        end
    end
    println("Wrote ", n, " lines")
end

f = "test_data.txt"
dummy_data(f, 10_000)

@time thread_test(f)
@time base_test(f)

#   7.015079 seconds (245.41 k allocations: 11.381 MiB, 1.77% compilation time)
#  19.722287 seconds (54.30 k allocations: 1.686 MiB)
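A possible variant (my own sketch, not benchmarked in the numbers above): since every put!/take! on a Channel pays a synchronization cost, batching lines into chunks amortizes that cost over many lines while keeping the bounded buffer. The function names and the chunk size of 512 are arbitrary choices.

```julia
# Sketch: move Vector{String} chunks through the channel instead of single
# lines, so each put!/take! hands over many lines at once.

dummy_work(line) = sleep(length(line) * 0.00001)  # same dummy workload as above

function produce_chunks!(chunk_channel::Channel{Vector{String}}, f::String;
                         chunk_size::Int = 512)
    chunk = String[]
    for line in eachline(f)
        push!(chunk, line)
        if length(chunk) == chunk_size
            put!(chunk_channel, chunk)
            chunk = String[]
        end
    end
    isempty(chunk) || put!(chunk_channel, chunk)  # flush the last partial chunk
    close(chunk_channel)  # ends the consumers' loops once the channel is drained
end

function consume_chunks(chunk_channel::Channel{Vector{String}})
    processed_lines = 0
    for chunk in chunk_channel
        for line in chunk
            processed_lines += 1
            dummy_work(line)
        end
    end
    return processed_lines
end

function chunked_test(f::String)
    chunk_channel = Channel{Vector{String}}(10)  # at most 10 chunks in flight
    producer = Threads.@spawn produce_chunks!(chunk_channel, f)
    consumers = [Threads.@spawn consume_chunks(chunk_channel)
                 for _ in 1:max(Threads.nthreads() - 1, 1)]
    wait(producer)
    return sum(fetch.(consumers))  # total lines processed by all workers
end
```

With chunks a worker only touches the channel once per 512 lines, so the lock contention that grows with the worker count should matter far less; whether it actually beats the per-line Channel here is untested.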

Using ConcurrentCollections.jl

using ConcurrentCollections

function dummy_work(line)
    sleep(length(line)*0.00001)
end


function produce_lines!(q, f::String, nrecv::Int)
    for line in eachline(f)
        push!(q, line)
    end
    # When we finish, push one terminator per consumer so every worker stops
    for _ in 1:nrecv
        push!(q, "END")
    end
end


function queue_test(f::String)  # "run" would clash with the exported Base.run
    q = DualLinkedConcurrentRingQueue{String}()
    nrecv = Threads.nthreads() - 1

    @sync begin

        Threads.@spawn produce_lines!(q, f, nrecv)

        for _ in 1:nrecv
            Threads.@spawn begin
                while true
                    line = popfirst!(q)  # blocks until an item is available
                    line == "END" && break
                    dummy_work(line)
                end
            end
        end
    end
end


function dummy_data(f::String, n::Int)
    open(f, "w") do h  # the do-block closes (and flushes) the file handle
        for i in 1:n
            println(h, "Some random text ", i)
        end
    end
    println("Wrote ", n, " lines")
end

f = "test_data.txt"
dummy_data(f, 10_000)

@time queue_test(f) # 7.844786 seconds (980.21 k allocations: 50.513 MiB, 2.79% compilation time)

(P.S. Start Julia with `julia --threads N`.)

Some packages I came across
Doing some more reading, I also came across ResumableFunctions.jl, but it does not seem to help with threading. I also saw ThreadPools.jl, a cool idea with its queues, and it seemed like a convenient solution; however, it appears to need the length of the input up front, so it reads all lines before starting the threads (at least with @qbthreads, which I tried).

Question
Maybe doing this with channels is already the approach with the least overhead, but I'm curious to know if there are other packages or ideas that might perform better.


I also saw an earlier thread about parallel processing of lines that was never resolved, so I'll tag it here to make it findable: Parallel Processing File


EDIT: Some tests with the number of threads

I noticed that increasing the number of threads pulling from the Channel significantly hampers performance (see below). This is not the case for `ConcurrentCollections.jl`; however, its unbounded queue can easily flood memory when the worker threads cannot keep up with the IO.

Channel (5 threads): 4.99 s
Channel (50 threads): 17.45 s
ConcurrentCollections (5 threads): 5.76 s
ConcurrentCollections (50 threads): 1.09 s

Ideally, I would find a hybrid of the two approaches above: one with an upper bound on the buffer but no performance degradation as the thread count increases (even if that means idle threads).
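One possible shape for such a hybrid (my own untested sketch, not something provided by ConcurrentCollections.jl): keep a fast unbounded queue for the hand-off, but cap the number of in-flight items with a `Base.Semaphore`. The producer acquires a slot before pushing and a consumer releases it after popping, so memory stays bounded regardless of the backing queue. To keep the example dependency-free, it wraps an unbounded `Channel`; the same wrapper could wrap a `DualLinkedConcurrentRingQueue` (with `push!`/`popfirst!` instead of `put!`/`take!`).

```julia
# Sketch: bound any unbounded thread-safe queue with a counting semaphore.
# `cap` is the maximum number of items buffered at once (arbitrary choice).

struct BoundedQueue{T}
    inner::Channel{T}      # stand-in for any unbounded thread-safe queue
    slots::Base.Semaphore  # counts the free buffer slots
end

BoundedQueue{T}(cap::Int) where {T} =
    BoundedQueue{T}(Channel{T}(Inf), Base.Semaphore(cap))

function Base.put!(q::BoundedQueue, x)
    Base.acquire(q.slots)  # blocks the producer once `cap` items are queued
    put!(q.inner, x)
end

function Base.take!(q::BoundedQueue)
    x = take!(q.inner)
    Base.release(q.slots)  # hand the slot back to the producer
    return x
end

# Quick demo: a producer task feeds 1:50 through a queue bounded to 4 slots.
q = BoundedQueue{Int}(4)
producer = Threads.@spawn for i in 1:50
    put!(q, i)
end
total = sum(take!(q) for _ in 1:50)
wait(producer)
println(total)  # 1275
```

The bound lives entirely in the semaphore, so the queue itself is never contended on a capacity lock; whether that avoids the Channel's degradation at 50 threads would still need benchmarking.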

@tkf Hey! I've just been looking at ConcurrentCollections.jl. It seems more reliable as threads are increased; with Channels, increasing the thread count, for example from 5 to 50, can suddenly degrade performance significantly.

The DualLinkedConcurrentRingQueue seems unaffected by this (thanks!). It may, however, flood memory when the number of threads chosen is inappropriate for the workload, for example when processing a line takes a long time but reading the data is very fast.

Any tips on how to use them for this purpose?

Aah, I just realized tkf has not been around for a while. @yuyichao (sorry for the tag, you just came to mind), do you have any ideas on this, given your multi-threading experience?

Let's assume that processing a line takes much longer than reading it, and that we cannot assume users will have NVMe or SSD drives, so we can only use a single thread for reading.

I'm okay with the Channel, but as I said above, oversubscribing the thread count suddenly degrades performance, and I cannot really expect everyone to figure out the number of threads that best matches their IO. ConcurrentCollections does not have that problem, but it will flood memory as soon as there isn't enough "CPU power" to keep up with the reading.