Reading and writing to file simultaneously

I’m streaming data from the web and writing to file in a random fashion every second or two, and opening and closing on each write.

If I want to sometimes read everything in the file up to this point, is that a problem?

That is, what if I try to read at the same time that a write is occurring, or what if I try to write at the same time a read is occurring?

Perhaps I should be using a proper database for this, e.g. MySQL, which takes care of these potential issues for me?

I’m open to any suggestions about how I could do this better.

Cheers,

Colin


I wouldn’t open and close the file. Open the file once, use seek() to jump around to different locations in the file and read/write the data you need to.

If you are using threads, you need to synchronize access to the file, because a single open file has a single current position. For example, if thread 1 seeks to offset 1000 to write 1k and, at the same time, thread 2 seeks to offset 5000 to read 1k, the last seek “wins”, and that is the offset both operations will use.
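To make the shared-position problem concrete, here is a minimal single-threaded sketch that replays the interleaving described above by hand. The temporary file, its 8192-byte size, and the variable names are assumptions chosen for illustration only.

```julia
# Create a scratch file of 8192 zero bytes (size chosen arbitrarily).
path = tempname()
write(path, zeros(UInt8, 8192))

io = open(path, "r+")
seek(io, 1000)        # "thread 1" seeks, intending to write at offset 1000
seek(io, 5000)        # "thread 2" seeks before thread 1 gets to write
pos = position(io)    # the handle has one position: the last seek wins
write(io, "A")        # thread 1's write lands where thread 2 left the position
close(io)

data = read(path)     # raw bytes; Julia arrays are 1-based, so offset n is data[n + 1]
```

After this runs, the byte at offset 5000 holds the 'A' and the byte at offset 1000 is still zero, which is the kind of misplaced write that unsynchronized access can produce.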

There are two ways to protect against this. The first is to use a ReentrantLock to guard all reads and writes, something like:

function myread(file, protect, offset, bytes)
    lock(protect)
    seek(file, offset)
    data = read(file, bytes)
    unlock(protect)
    return data
end

function mywrite(file, protect, offset, data)
    lock(protect)
    seek(file, offset)
    write(file, data)
    unlock(protect)
end

Another option might be to use a task:

filechan = Channel{Function}(100)

@async begin
    file = open("foo.dat", "w+")
    # Run each queued operation in turn; the loop ends when filechan is closed.
    for f in filechan
        f(file)
    end
    close(file)
end

fileop(f) = put!(filechan, f)

# Then to do file operations would be:
fileop() do f
    seek(f, 1000)
    read(f, 100)
end

The channel will ensure that all the file operations are serialized, because a single task performs them one at a time. While I kind of like this way of protecting file access, you might run into trouble. Julia can get funny when a task tries to call functions created AFTER the task is started, so you might need to change f(file) to Base.invokelatest(f, file) if you get “running in world age X, while current world is Y” errors.

One caveat here is I wrote this code without testing, so I can’t promise it to be error free. I seem to remember a maxim that all code can be reduced by 1 line and contains at least 1 error. :)
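One gap in the channel approach as posted is that the caller never sees the result of its operation. The sketch below is one possible way to hand results back, assuming each request carries its own one-slot reply channel. The names requests, worker, and fileop, the temporary path, and the choice to re-throw errors in the caller are all illustrative assumptions, not part of the original suggestion.

```julia
# Each request pairs an operation with a private channel for its result.
requests = Channel{Tuple{Function,Channel{Any}}}(100)
path = tempname()   # scratch file for the example

worker = @async begin
    open(path, "w+") do file
        # Only this task touches the file, so operations run one at a time.
        for (f, reply) in requests
            result = try
                # invokelatest avoids world-age errors for functions
                # defined after this task was started.
                Base.invokelatest(f, file)
            catch err
                err   # hand the exception back instead of killing the worker
            end
            put!(reply, result)
        end
    end
end

# Queue an operation and block until the worker has run it.
function fileop(f)
    reply = Channel{Any}(1)
    put!(requests, (f, reply))
    result = take!(reply)
    result isa Exception && throw(result)
    return result
end

nbytes = fileop() do file
    seekend(file)
    write(file, "helloworld")
end

text = fileop() do file
    seekstart(file)
    read(file, String)
end

close(requests)   # ends the worker's loop, which then closes the file
wait(worker)
```

Because fileop blocks until the reply arrives, callers get ordinary return values, at the cost of waiting behind whatever operations are already queued.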


Hi,

Thanks for responding. I’m working on this now, and reading up on some of the issues raised in your answer. I’m fairly sure the solution I want is your first one, that is, passing the IOStream to both the read and write functions, and locking before and unlocking after each read and write operation, to make sure the two never occur simultaneously.

However, I’m fairly new to this area of programming. I put together a quick bit of test code to try and work out how this all works, and it seems to work as I expected. Would you mind having a quick look and letting me know if I’ve done it right?

function mywrite(fid::IOStream, l::ReentrantLock, x::String)
    lock(l)
    sleep(5)
    seekend(fid)
    i = write(fid, x)
    unlock(l)
    return i
end
function myread(fid::IOStream, l::ReentrantLock)
    lock(l)
    seekstart(fid)
    x = read(fid, String)
    unlock(l)
    return x
end
fid1 = open("test_file.csv", "w+")
l1 = ReentrantLock()
@async mywrite(fid1, l1, "helloworld")
sleep(1)
myread(fid1, l1)

The call to myread returns "helloworld", indicating that the read operation didn’t occur until after the write operation had finished, which is what I was hoping would happen. Does this all seem correct to you?

Thanks again,

Colin

Yep, that looks like it would work. One thing I thought about after I posted was handling error conditions, so this would be slightly better:

function mywrite(fid::IOStream, l::ReentrantLock, x::String)
    lock(l)
    local i
    try
        sleep(5)
        seekend(fid)
        i = write(fid, x)
    finally
        unlock(l)
    end
    return i
end
function myread(fid::IOStream, l::ReentrantLock)
    lock(l)
    local x
    try
        seekstart(fid)
        x = read(fid, String)
    finally
        unlock(l)
    end
    return x
end

Putting the unlock() in a finally block ensures that the lock is released even if the read/write throws an error.
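The same guarantee can be written more compactly with the do-block form of lock, which acquires the lock, runs the block, and releases the lock even if the block throws. This is a sketch of that variant rather than a change to the functions above; the names locked_append and locked_readall and the temporary file are made up for the example.

```julia
# Append x to the end of the file while holding the lock.
# Returns the number of bytes written.
function locked_append(io::IO, l::ReentrantLock, x::AbstractString)
    lock(l) do
        seekend(io)
        write(io, x)
    end
end

# Read the whole file from the start while holding the lock.
function locked_readall(io::IO, l::ReentrantLock)
    lock(l) do
        seekstart(io)
        read(io, String)
    end
end

io = open(tempname(), "w+")
l = ReentrantLock()

n = locked_append(io, l, "helloworld")
s = locked_readall(io, l)

# An error inside the block still releases the lock.
failed = try
    lock(l) do
        error("simulated I/O failure")
    end
    false
catch
    true
end
still_locked = islocked(l)

close(io)
```

The do-block version also removes the need for the local declarations, since the value of the block is returned directly.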

Understood, that makes sense. Thanks again for helping - this is much better than the other approaches I was weighing up!

Colin