I have to read several TB of data, and I want to read it in 10-50 GB chunks using `readbytes!` and process each chunk. Since processing a chunk takes longer than reading it, it would be a waste to wait for `readbytes!` every time.
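As a small illustration of the `readbytes!` behaviour a chunked reader relies on: it fills a preallocated buffer and returns the number of bytes actually read. That count is smaller than the buffer on the last chunk, and the loop stops once `eof` reports the end of the file. This is a sketch that uses a hypothetical 10-byte temporary file in place of the real data.

```julia
# Hypothetical 10-byte file standing in for the real multi-TB data.
path, io = mktemp()
write(io, UInt8.(1:10))
close(io)

buf = Vector{UInt8}(undef, 4)   # preallocated buffer, reused for every chunk
counts = Int[]
open(path, "r") do h
    while !eof(h)
        n = readbytes!(h, buf, length(buf))   # number of bytes actually read
        push!(counts, n)
    end
end
println(counts)   # [4, 4, 2]
rm(path)
```

Only the first `n` bytes of `buf` are valid after each call, so the processing step should look at `view(buf, 1:n)` rather than the whole buffer.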
So I thought I should be able to do something like this:
- define two functions, `read_chunk(..)` and `process_chunk(..)`
- allocate two buffer arrays, `b1` and `b2`
- call `read_chunk(b1)`
- when this is done, call `process_chunk(b1)` and at the same time call `read_chunk(b2)`
- when `process_chunk(b1)` and `read_chunk(b2)` are both done, call `process_chunk(b2)` and `read_chunk(b1)` (this would now just overwrite the old bytes)
- etc. until EOF
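The steps above can be sketched with `Threads.@spawn` and `fetch`. This assumes Julia 1.3 or later, started with more than one thread (e.g. `julia -t 2`). With a single thread the code still runs correctly, but reading and processing will not overlap. `process_chunk` and `double_buffered` are hypothetical names, and the `sum` stands in for the real work.

```julia
# Hypothetical placeholder for the real (slow) work on one chunk.
function process_chunk(buf::AbstractVector{UInt8})
    sleep(0.01)            # pretend this takes a while
    return sum(Int, buf)   # some result computed from the bytes
end

function double_buffered(path::AbstractString, chunk_size::Int)
    bufs = (Vector{UInt8}(undef, chunk_size), Vector{UInt8}(undef, chunk_size))
    results = Int[]
    open(path, "r") do h
        cur = 1
        n = readbytes!(h, bufs[cur], chunk_size)      # read the first chunk
        while n > 0
            # Only the first n bytes are valid (the last chunk may be short).
            chunk = view(bufs[cur], 1:n)
            # Process the current buffer in another task...
            job = Threads.@spawn process_chunk(chunk)
            # ...while this task reads the next chunk into the other buffer.
            nxt = 3 - cur
            n = readbytes!(h, bufs[nxt], chunk_size)
            # Wait for processing to finish before this buffer is overwritten again.
            push!(results, fetch(job))
            cur = nxt
        end
    end
    return results
end

path, io = mktemp()
write(io, UInt8.(1:10)); close(io)
println(double_buffered(path, 4))   # [10, 26, 19]
```

The `fetch(job)` at the end of each iteration provides the guarantee the plan needs. A buffer is only read into again after the task processing it has finished. Creating `chunk` before `@spawn` keeps the task from seeing the later reassignment of `n`.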
I read this topic about channels, but I’m not sure how I would implement a `Channel` for this. In Python I would start two threads, use `is_alive()` to check whether they are still running, and wait for both to finish, but I cannot figure out how to implement something equivalent in Julia.
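A rough Julia counterpart to the Python thread pattern might look like the sketch below. A `Task` plays the role of the thread, `istaskdone` roughly corresponds to `not thread.is_alive()`, and `fetch` (or `wait`, if no return value is needed) blocks until the task finishes, like `join()`. `slow_read` and `slow_process` are hypothetical placeholders.

```julia
# Hypothetical placeholders standing in for reading and processing.
slow_read()    = (sleep(0.2); :read_done)
slow_process() = (sleep(0.5); :process_done)

t_read = Threads.@spawn slow_read()      # like starting a Python thread
t_proc = Threads.@spawn slow_process()

# Non-blocking check, roughly `not thread.is_alive()` in Python.
println("read finished yet? ", istaskdone(t_read))

# Block until each task finishes and get its return value (like join()).
r = fetch(t_read)
p = fetch(t_proc)
println((r, p))   # (:read_done, :process_done)

# Alternatively, @sync waits for every task spawned inside the block.
@sync begin
    Threads.@spawn slow_read()
    Threads.@spawn slow_process()
end
```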
Should I create two channels, one for reading and one for processing? Any advice on how to implement this is greatly appreciated.
Here is some code I came up with, but I don’t think it is safe: it starts reading into a buffer without checking whether that buffer is still being processed in the other channel (I actually have no clue how to check that).
```julia
read_channel = Channel{Symbol}(1)
proc_channel = Channel{Tuple{Symbol, Int64}}(1)

function read_chunk(h::IOStream, buffer::Vector{UInt8}, origin::Symbol, n_bytes::Int)
    readbytes!(h, buffer, n_bytes)
    put!(read_channel, origin)
end

function process_chunk(buffer::Vector{UInt8}, origin::Symbol)
    sleep(2) # work
    put!(proc_channel, (origin, length(buffer)))
end

function main(f::String)
    h = open(f, "r")
    chunk_size = 1_000
    b1 = zeros(UInt8, chunk_size)
    b2 = zeros(UInt8, chunk_size)
    # Read the initial chunk
    read_chunk(h, b1, :b1, chunk_size)
    while true
        # Wait for first read to finish
        if isready(read_channel)
            # Pop it from the read channel
            origin_chunk = take!(read_channel)
            # Send it for processing in the channel
            # NOTE, have to guarantee here somehow that I do not start reading
            # b2 for example when it's still being processed
            if origin_chunk == :b1
                process_chunk(b1, :b1)
                read_chunk(h, b2, :b2, chunk_size)
            else
                process_chunk(b2, :b2)
                read_chunk(h, b1, :b1, chunk_size)
            end
        end
        if isready(proc_channel)
            proc_result = take!(proc_channel)
            println("Process result: ", proc_result)
        end
    end
end

main("file.txt")
```
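On the question of two channels: one way to get the guarantee described in the NOTE comment is to pass the buffers themselves through two channels. A `free` channel holds buffers that may be overwritten, and a `full` channel holds filled buffers together with their byte counts. The reader only writes into a buffer it has taken from `free`, and the consumer puts each buffer back into `free` once it is done with it, so a buffer cannot be overwritten while it is still being processed. This sketch makes the same assumptions as above (Julia 1.3+, more than one thread for actual overlap). `read_and_process` and `checksum_chunk` are hypothetical names.

```julia
# Hypothetical placeholder for the real work on one chunk.
checksum_chunk(buf::AbstractVector{UInt8}) = sum(Int, buf)

function read_and_process(f, path::AbstractString, chunk_size::Int; nbuffers::Int = 2)
    free = Channel{Vector{UInt8}}(nbuffers)               # buffers safe to overwrite
    full = Channel{Tuple{Vector{UInt8}, Int}}(nbuffers)   # filled buffers + valid byte count
    for _ in 1:nbuffers
        put!(free, Vector{UInt8}(undef, chunk_size))
    end

    # Reader task: only ever writes into a buffer taken from `free`.
    reader = Threads.@spawn begin
        try
            open(path, "r") do h
                while true
                    buf = take!(free)               # blocks until a buffer is released
                    n = readbytes!(h, buf, chunk_size)
                    n == 0 && break                 # end of file
                    put!(full, (buf, n))
                end
            end
        finally
            close(full)   # lets the consumer loop below finish, even on error
        end
    end

    results = Any[]
    for (buf, n) in full                  # runs until `full` is closed and empty
        push!(results, f(view(buf, 1:n)))
        put!(free, buf)                   # hand the buffer back to the reader
    end
    wait(reader)                          # rethrows if the reader task failed
    return results
end

path, io = mktemp()
write(io, UInt8.(1:10)); close(io)
println(read_and_process(checksum_chunk, path, 4))   # Any[10, 26, 19]
```

With `nbuffers = 2` this implements the double-buffering plan from the list. A larger `nbuffers` lets the reader run further ahead at the cost of more memory, which matters with 10-50 GB chunks.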