Help implementing parallelism with readbytes!

I have to read some TBs of data, and I want to read a 10-50 GB chunk using readbytes! and process it. Since processing a chunk takes longer than reading it, it would be a waste to wait for readbytes! every time.

So I thought I must be able to do something like:

  • define two functions, read_chunk(..), process_chunk(..)
  • allocate two buffer arrays, b1 and b2
  • call read_chunk(b1)
  • when this is done, I call process_chunk(b1) and at the same time call read_chunk(b2)
  • then when process_chunk(b1) and read_chunk(b2) are done, call process_chunk(b2) and read_chunk(b1) (this would now just overwrite the old bytes)
  • etc till eof

I read this topic about channels, but I’m not sure how I would implement a Channel for this. In Python I would start two threads, use is_alive() to check whether they are still running, and wait for both to finish, but I cannot figure out how to implement something equivalent here.

Should I create two channels to separate the read and the process steps? Any advice on how to implement this is greatly appreciated :slight_smile:

Some code I came up with, but I don’t think it is safe: it can start reading into a buffer without checking whether that buffer is still being processed (actually I have no clue how to do this).

read_channel = Channel{Symbol}(1)
proc_channel = Channel{Tuple{Symbol, Int64}}(1)

function read_chunk(h::IOStream, buffer::Vector{UInt8}, origin::Symbol, n_bytes::Int)
    readbytes!(h, buffer, n_bytes)
    put!(read_channel, origin)
end

function process_chunk(buffer::Vector{UInt8}, origin::Symbol)
    sleep(2) # work
    put!(proc_channel, (origin, length(buffer)) )
end

function main(f::String)
    h = open(f, "r")
    chunk_size = 1_000
    b1 = zeros(UInt8, chunk_size)
    b2 = zeros(UInt8, chunk_size)
    # Read the initial chunk
    read_chunk(h, b1, :b1, chunk_size)

    while true
        # Wait for first read to finish 
        if isready(read_channel)
            # Pop it from the read channel
            origin_chunk = take!(read_channel)
            # Send it for processing in the channel
            # NOTE, have to guarantee here somehow that I do not start reading 
            # b2 for example when it's still being processed
            if origin_chunk == :b1
                process_chunk(b1, :b1)
                read_chunk(h, b2, :b2, chunk_size)
            else 
                process_chunk(b2, :b2)
                read_chunk(h, b1, :b1, chunk_size)
            end
        end

        if isready(proc_channel)
            proc_result = take!(proc_channel)
            println("Process result: ", proc_result)
        end

    end
end

main("file.txt")

I would create a single channel first, and then have two functions: one which continuously writes to the channel (reading the bytes) and another which processes.

c=Channel{ByteDataType}(1) #replace with the actual type
function read_data!(c, f)
    open(f, "r") do file
        while !eof(file)
             data=readbytes!(file) # read a chunk
             put!(c, data)
        end
    end
end
function process_data!(c)
    while isopen(c)
        data=take!(c)
        # process the data
    end
end
processing_task = Threads.@spawn process_data!(c)
read_data!(c, "filename.dat") # can read on main thread
wait(processing_task)

Sorry for the "pseudocodeness", I am on my phone and can’t run anything right now. I hope it is clear nonetheless.


Thanks! If I understand you correctly, you are actually referring to readbytes here rather than readbytes!. Then readbytes has to allocate the UInt8 array (data) for every read. With 10 GB chunks this causes huge overhead compared to filling existing buffer arrays with readbytes!.
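To illustrate what I mean, roughly (just a sketch; chunk_size and the file name are placeholders):

chunk_size = 2^20

# allocating: a fresh Vector{UInt8} is created for every chunk
open("file.bin", "r") do io
    while !eof(io)
        data = read(io, chunk_size) # allocates a new array each iteration
        # process data ...
    end
end

# in place: one preallocated buffer is reused for every chunk
buffer = Vector{UInt8}(undef, chunk_size)
open("file.bin", "r") do io
    while !eof(io)
        n = readbytes!(io, buffer, chunk_size) # fills the existing buffer, returns the number of bytes read
        # process view(buffer, 1:n) ...
    end
end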

You can pass a reference to a buffer if you want, something like:

c = Channel{Vector{UInt8}}(1) # the channel now carries a reference to whichever buffer was just filled
function read_data!(c, f)
    # create two buffers so one can be written to and the other processed at the same time
    buffer_size = 2^20 # placeholder chunk size, pick what suits you
    buffer1 = zeros(UInt8, buffer_size)
    buffer2 = similar(buffer1)
    buffer = buffer1
    open(f, "r") do file
        while !eof(file)
            readbytes!(file, buffer) # read a chunk into the current buffer
            put!(c, buffer)
            buffer = (buffer === buffer1) ? buffer2 : buffer1 # switch to the other buffer
        end
    end
end
function process_data!(c)
    while isopen(c)
        data=take!(c)
        # process the data
    end
end
processing_task = Threads.@spawn process_data!(c)
read_data!(c, "filename.dat") # can read on main thread
wait(processing_task)

@jmair gave you a producer-consumer solution. You can learn about its components here

https://docs.julialang.org/en/v1/manual/asynchronous-programming/#Communicating-with-Channels


Aaah, amazing, thanks a lot! Around 2.5x faster than just waiting.

Neat that it’s possible to spawn a task that keeps pushing to the Channel! I could apply this to some other code as well. I had only seen combining threads and channels mentioned in the multi-threading docs, but not like this. Thanks for showing :slight_smile:


For others: some minor things need to be edited in the code, especially adding close(c) at the end of read_data!, as otherwise it will never finish.
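Concretely, something like this (a sketch of the version above with close(c) added; buffer_size is just a placeholder):

function read_data!(c, f)
    buffer_size = 2^20 # placeholder chunk size
    buffer1 = zeros(UInt8, buffer_size)
    buffer2 = similar(buffer1)
    buffer = buffer1
    open(f, "r") do file
        while !eof(file)
            readbytes!(file, buffer) # fill the current buffer in place
            put!(c, buffer)          # hand it to the consumer
            buffer = (buffer === buffer1) ? buffer2 : buffer1 # switch to the other buffer
        end
    end
    close(c) # without this, the consumer keeps waiting on an open channel and wait(processing_task) never returns
end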


Hey @jmair!

I sometimes seem to get InvalidStateException("Channel is closed.", :closed). When I remove the wait() it does not happen, but I’m not sure whether that has consequences?

I actually did add another thread for processing (like mentioned here); could this be causing it? So:

processing_task1 = Threads.@spawn process_data!(c)
processing_task2 = Threads.@spawn process_data!(c)
wait(processing_task1)
wait(processing_task2)

As processing takes around twice as long as reading

If you do for data in channel instead of take!, it should handle a closed channel gracefully.
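I.e. something like (sketch):

function process_data!(c)
    for data in c # blocks until a chunk is available; the loop simply ends once the channel is closed and drained
        # process the data
    end
end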


Yeah that works :), thanks!