Any small example of a queue-like system, where a function is automatically executed when a new item enters the queue?

Hello!

I have some Julia code which detects when new files are created in a folder. The names of these files are then pushed to a vector. Whenever a new item enters the vector, I want something to detect this and run a function on the file.

I am looking for a small example of someone doing something like this, if you happen to know where to find it please let me know :slight_smile:

I have been looking a bit into DataStructures.jl and see they have a queue system, but I cannot see anything such as “execute function if item enters queue” etc.

Kind regards

Maybe you want a Channel and a Task that reads from it. Instead of, or in addition to, adding your filenames to a vector, put! them on a Channel.

https://docs.julialang.org/en/v1/manual/asynchronous-programming/
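The suggestion above can be sketched roughly as follows. This is a minimal illustration, not the original poster's code: `process_file`, `processed`, and the file names are hypothetical stand-ins, and the reader stops after two items only so that the sketch terminates.

```julia
# Hypothetical stand-in for the real post-processing step.
processed = String[]
process_file(name) = push!(processed, name)

# Unbuffered channel of file names; `put!` waits until a reader takes the item.
files = Channel{String}(0)

# Reader task: blocks on `take!` until a name arrives, then handles it.
reader = @async begin
    for _ in 1:2                 # two items only, so the sketch terminates
        name = take!(files)
        process_file(name)
    end
end

# Wherever the file watcher currently does `push!(vector, name)`:
put!(files, "a.out")
put!(files, "b.out")
wait(reader)
```

Nothing polls here: the reader task simply sleeps inside `take!` until the watcher hands it a name.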

That’s a great idea!

I just changed the code so that file names go into a Channel instead of a Vector, using @async put!. Using take!, I can take an element from the Channel and work on it, and it is automatically removed as well, which is nice.

Now I just need to figure out the listener task for the Channel.

Kind regards

I think I am doing something wrong with @async put!, since I read in the documentation that put! is a blocking call on the Channel. I need to be able to put! without blocking, so perhaps there is a more correct way to do this?

The reason is that several new files may well be created before the post-processing of one file has finished.

But I want to thank you a lot, your suggestion seems to have worked:

┌ Info: File Created
└   event = "FILE.out" => FileEvent(true, false, false)
I HAVE TAKEN FILE.out

The first two lines (Info block) are made by the Filewatching code I was provided, while the last line is from the task listening in on the channel.

I marked your answer as the solution. Of course I am open to other ideas and approaches too. :slight_smile:

Kind regards

Construct your Channel with a size greater than zero, i.e., the maximum size of the queue (see Tasks · The Julia Language). If you find that you fill the channel and block, then start more reader tasks.
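As a small illustration of the difference a buffer makes (the size 32 and the file names are arbitrary choices for this sketch):

```julia
# Buffered channel: up to 32 names can wait in the queue.
queue = Channel{String}(32)

# No reader is running yet, but these calls return immediately
# because the buffer still has room.
for name in ("a.out", "b.out", "c.out")
    put!(queue, name)
end

# Items come out in the order they went in.
first_name = take!(queue)
remaining = [take!(queue) for _ in 1:2]
```

With a size of zero, the first `put!` in the loop would have waited for a matching `take!` from another task.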

Thanks! I used “Inf” since it was the most straightforward for me. I could work out a maximum size or a realistic maximum size, but the performance improvement from doing so is not worth it to me. Now it works without @async, which I suspected was a bad way of doing it :slight_smile:

Looking to improve my “ListenerFunction” a bit, initially I just copied:

#https://docs.julialang.org/en/v1/manual/asynchronous-programming/#More-on-Channels
# Given Channels c1 and c2,
c1 = Channel(32)
c2 = Channel(32)

# and a function `foo` which reads items from c1, processes the item read
# and writes a result to c2,
function foo()
    while true
        data = take!(c1)
        [...]               # process data
        put!(c2, result)    # write out result
    end
end

# we can schedule `n` instances of `foo` to be active concurrently.
for _ in 1:n
    errormonitor(@async foo())
end

This shows how one can continually take! into data, which works perfectly fine. It just uses “while true”, which I am very uncomfortable with (perhaps wrongly), because I’ve always been taught not to program like that.

I know I could get around this by using a timed loop, i.e. take! a new element every x seconds, but I would much rather be able to check “if an element is present in the Channel, take it”. I tried to play around with isready(Channel), but couldn’t get it to work properly. Do you perhaps have any suggestions?

Kind regards

Yes, I think wrongly: while true is a perfectly fine construct. If the loop must terminate, it just needs a condition that calls break. This is a fine idiom.

This is simplified assuming you don’t generate results from data. Just block on take!.
Your consumer function should run asynchronously in the background, doing its one and only chore. You do need a try/catch to exit the loop gracefully.
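One way this could look, as a sketch only: `listener`, `process_file`, and `processed` are hypothetical names, and the assumption is that `take!` on a closed, empty channel throws `InvalidStateException`, which the catch block treats as the normal shutdown signal.

```julia
# Hypothetical stand-in for the real post-processing step.
processed = String[]
process_file(name) = push!(processed, name)

# Consumer: blocks on `take!`; once the channel is closed and empty,
# `take!` throws InvalidStateException and the loop ends.
function listener(ch::Channel{String})
    while true
        try
            name = take!(ch)
            process_file(name)
        catch err
            if err isa InvalidStateException && !isopen(ch)
                break            # channel closed: normal shutdown
            end
            rethrow()            # anything else is a real error
        end
    end
end

ch = Channel{String}(Inf)
task = errormonitor(@async listener(ch))

put!(ch, "FILE.out")
close(ch)        # signals the listener that no more files are coming
wait(task)
```

Items already buffered when `close` is called are still delivered before the exception is raised, so nothing queued is lost.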

Thanks to you and @dlakelan

That is what I ended up doing. I decided that the listener should not close the channel; instead, the simulation closes the channel when it is finished. Then the listener reaches the catch block and exits gracefully.

Kind regards

Correct, the producer should close the channel.
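A compact sketch of that division of labour, with made-up file names and `uppercase` as a placeholder for the real processing. It relies on the fact that iterating a Channel with `for` keeps taking items until the channel is closed and drained, so no try/catch is needed in this variant.

```julia
results = String[]

ch = Channel{String}(Inf)

# Consumer: the `for` loop ends by itself once `ch` is closed and empty.
consumer = @async begin
    for name in ch
        push!(results, uppercase(name))   # placeholder for real processing
    end
end

# Producer: the side that knows when the work is over closes the channel.
producer = @async begin
    for name in ("run1.out", "run2.out")
        put!(ch, name)
    end
    close(ch)
end

wait(producer)
wait(consumer)
```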

I’ve encountered a small problem, which you might have a suggestion on.

Now that I am working on my process function, I find that in some cases, if the file being written to the drive is too big (e.g. 1 GB), the listener function detects it “too early”, while it is still being written. The process function then fails, since it is working on a file which perhaps only has 0.4 GB of its total 1 GB written.

I’ve been looking for an “isopen” function so I can wait for the file to be “closed”, but I am struggling to find one. Any suggestions?

Kind regards

If you have control over the code that writes the files, then you could implement something like writing the file to a temporary name and then renaming it when complete, or write another small file that indicates the main file is complete. If you don’t have control, then detecting write completion is harder and OS-specific. Search the web for checking whether writing a file is complete. You may need an OS-specific call, such as lsof on Linux.
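The temporary-name idea might look like this when the writer is also Julia code. It is only a sketch: `write_then_rename`, `is_finished`, and the `.part` suffix are hypothetical, and it assumes the temporary file and the final file live in the same directory so that the rename is a cheap operation.

```julia
# Writer side: write under a temporary name, then rename when complete.
function write_then_rename(path::AbstractString, data)
    tmp = path * ".part"         # hypothetical suffix the watcher ignores
    open(tmp, "w") do io
        write(io, data)
    end
    mv(tmp, path; force=true)    # the final name appears only after writing
end

# Watcher side: skip anything that still has the temporary suffix.
is_finished(name::AbstractString) = !endswith(name, ".part")

dir = mktempdir()
target = joinpath(dir, "FILE.out")
write_then_rename(target, "simulation output")
```

The watcher then only queues names for which `is_finished` returns true, so it never sees a half-written file under its final name.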

I think I ended up finding something that works better for me. Basically, I use stat to measure the file size twice, x seconds apart, and compare meas1 and meas2. If they are equal, I assume the file is done writing.

Not perfect, but it gets the job done; the other approaches were really difficult for me to grasp on Windows. It is also much better than a fixed sleep time, since now a fast simulation will have only a slight delay, while a slow simulation with big files will have “no delay” due to the short sleep and the file size check.
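For anyone wanting to try the same approach, here is a rough sketch of the size comparison. The function name `wait_until_stable` and the `interval` and `timeout` values are my own placeholders, not the exact code I use; `filesize(path)` returns the same number as `stat(path).size`.

```julia
# Poll the file size until two consecutive readings match.
# `interval` and `timeout` are hypothetical defaults; tune them for your files.
function wait_until_stable(path::AbstractString; interval=0.5, timeout=60.0)
    deadline = time() + timeout
    previous = filesize(path)
    while time() < deadline
        sleep(interval)
        current = filesize(path)
        current == previous && return true    # size unchanged: assume done
        previous = current
    end
    return false                              # still changing at the timeout
end

# Usage on a small file that has already been fully written.
path, io = mktemp()
write(io, "abc")
close(io)
stable = wait_until_stable(path; interval=0.05, timeout=2.0)
```

A writer that pauses for longer than `interval` will fool this check, which is why it is only a heuristic.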

Thank you!

Kind regards