Hey all! I wrote ViewReader a while ago and now thought adding some parallel file processing might be fun. ViewReader reads `UInt8`s (bytes) into a buffer array, so each thread could have its own buffer. This might be useful for very big files where CPU time exceeds the I/O time.
## Using a Channel
To do so, the first thing that came to mind was using a `Channel`: put the lines in there and pull them out with multiple worker threads. Like so (simple example):
```julia
function dummy_work(line)
    sleep(length(line) * 0.00001)
end

function produce_lines!(line_channel::Channel{String}, f::String)
    for line in eachline(f)
        put!(line_channel, line)
    end
    close(line_channel)  # closing tells every consumer that no more lines are coming
end

function consume_lines(line_channel::Channel{String})
    processed_lines = 0
    for line in line_channel  # iteration stops once the channel is closed and drained
        processed_lines += 1
        dummy_work(line)
    end
    println(Threads.threadid(), " processed: ", processed_lines)
end

function thread_test(f::String)
    work_threads = Threads.nthreads() - 1
    line_channel = Channel{String}(100)  # bounded buffer: `put!` blocks when it is full
    Threads.@spawn produce_lines!(line_channel, f)
    @sync for _ in 1:work_threads
        Threads.@spawn consume_lines(line_channel)
    end
end
```
```julia
function base_test(f::String)
    for line in eachline(f)
        dummy_work(line)
    end
end

function dummy_data(f::String, n::Int)
    open(f, "w") do h  # the `do` block closes the handle for us
        for i in 1:n
            println(h, "Some random text ", i)
        end
    end
    println("Wrote ", n, " lines")
end

f = "test_data.txt"
dummy_data(f, 10_000)
@time thread_test(f)
@time base_test(f)
# 7.015079 seconds (245.41 k allocations: 11.381 MiB, 1.77% compilation time)
# 19.722287 seconds (54.30 k allocations: 1.686 MiB)
```
## Using ConcurrentCollections.jl
```julia
using ConcurrentCollections

function dummy_work(line)
    sleep(length(line) * 0.00001)
end

function produce_lines!(q, f::String, nrecv::Int)
    for line in eachline(f)
        push!(q, line)
    end
    # When we finish, push one terminator per consumer
    for _ in 1:nrecv
        push!(q, "END")
    end
end

function queue_test(f::String)  # named `queue_test` to avoid shadowing `Base.run`
    q = DualLinkedConcurrentRingQueue{String}()
    nrecv = Threads.nthreads() - 1
    @sync begin
        Threads.@spawn produce_lines!(q, f, nrecv)
        for _ in 1:nrecv
            Threads.@spawn while true
                line = popfirst!(q)
                line == "END" && break  # check the sentinel before doing any work
                dummy_work(line)
            end
        end
    end
end

function dummy_data(f::String, n::Int)
    open(f, "w") do h
        for i in 1:n
            println(h, "Some random text ", i)
        end
    end
    println("Wrote ", n, " lines")
end

f = "test_data.txt"
dummy_data(f, 10_000)
@time queue_test(f) # 7.844786 seconds (980.21 k allocations: 50.513 MiB, 2.79% compilation time)
```
(P.S.: start Julia with `julia --threads X`.)
## Some packages I came across
Doing some more reading I also came across things such as ResumableFunctions.jl, but that does not seem to work with threading. I also saw ThreadPools.jl, a cool idea with the queues that seemed like a convenient solution; however, I suppose it needs to know the length of the input beforehand, so it will first read all lines before starting the threads (at least for the `@qbthreads` macro that I tried).
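For comparison, that eager strategy (read everything first, then split across threads) can be sketched with just Base; `eager_test` is a name I made up, and it reuses `dummy_work` from above:

```julia
function eager_test(f::String)
    lines = readlines(f)   # reads the whole file into memory up front
    Threads.@threads for line in lines
        dummy_work(line)   # `dummy_work` as defined earlier
    end
end
```

This avoids channel contention entirely, at the cost of holding every line in memory before any work starts.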
## Question
Maybe doing this with channels is already the approach with the least overhead, but I'm curious whether there are other packages/ideas that might perform better.
I also saw a thread about parallel processing of lines that never reached a conclusion, so I'll tag it here to make it findable: Parallel Processing File
## EDIT: Some tests with the number of threads
I noticed that increasing the number of threads pulling from the channel significantly hampers performance (see below). This is not the case for `ConcurrentCollections.jl`; however, its queue is unbounded, so it can flood memory easily when the consumer threads cannot keep up with the I/O.

- Channel (5 threads): 4.99
- Channel (50 threads): 17.45
- ConcurrentCollections (5 threads): 5.76
- ConcurrentCollections (50 threads): 1.09
Ideally, I would like a hybrid of the two approaches above: one with an upper bound on the buffer, but also no performance degradation as the thread count grows (even if that means idle threads).
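One direction for such a hybrid (an unbenchmarked sketch; `chunk_size` and the function names are mine): keep the bounded `Channel` for back-pressure, but put *chunks* of lines in it instead of single lines, so consumers touch the channel lock far less often even with many threads. It reuses `dummy_work` from above:

```julia
function produce_chunks!(chunk_channel::Channel{Vector{String}}, f::String, chunk_size::Int)
    chunk = String[]
    for line in eachline(f)
        push!(chunk, line)
        if length(chunk) == chunk_size
            put!(chunk_channel, chunk)  # blocks when the buffer is full -> bounded memory
            chunk = String[]
        end
    end
    isempty(chunk) || put!(chunk_channel, chunk)  # flush the last partial chunk
    close(chunk_channel)                          # signals consumers to stop
end

function chunked_test(f::String; chunk_size::Int = 256)
    chunk_channel = Channel{Vector{String}}(32)   # at most 32 chunks buffered at once
    Threads.@spawn produce_chunks!(chunk_channel, f, chunk_size)
    @sync for _ in 1:max(Threads.nthreads() - 1, 1)
        Threads.@spawn for chunk in chunk_channel # ends when channel is closed and drained
            foreach(dummy_work, chunk)
        end
    end
end
```

With one `put!`/`take!` pair per `chunk_size` lines, the per-line synchronization cost shrinks while the `Channel`'s capacity still caps memory use.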