Using @async and/or Channels to handle incoming data and process it

I am trying to implement what I think is a pretty standard use case for Tasks / coroutines, but I am struggling to adapt the examples in the docs to it.

Here’s an outline of what I want to do:

# event loop
while trigger

    # I want to run this loop to download some data
    # every 5 seconds.

    c1 = Channel(10)
    
    function get_data(urls::Array{String})
        for u in urls
            @async begin 
                data = my_download_function(u)
                put!(c1, data)
            end
        end
    end

    # run get_data every 5 seconds
    inline_timer(timer) = get_data(urls)
    t = Timer(inline_timer, 1, interval = 5)
    while trigger
        wait(t)
    end

    # Here, I want functions to sequentially process the data that
    # is being put to channel c1, but only when it is available.
    # I don't know how to do this so that it always takes a new
    # value from c1.

    c2 = Channel(10)
    
    for c in c1 # this does not work, only iterates on values already in c1
        p1 = my_process_1(c)
        yield()
        p2 = my_process_2(p1)
        yield()
        p3 = my_process(p2)
        put!(c2, p3)
    end

    # Again, I want this to work asynchronously.
    for c in c2
        my_write_function(c)
    end
end

The idea is that data is gathered regularly and processed, with the time spent waiting on the network used as CPU time for processing. Classic coroutine example. But I’m not using Sockets or TCP ports like the doc examples that cover this, and I’m really not sure what I’m missing.
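To make the intent concrete, here is a minimal sketch of the pipeline shape I have in mind. The names `fake_download`, `fake_process` and `run_pipeline` are hypothetical stand-ins (not my real functions), and the `sleep` call only simulates network lag. It assumes each stage runs in its own task and that, as far as I understand, iterating a Channel inside a task blocks until a value arrives and stops once the channel has been closed and drained.

```julia
# Hypothetical stand-ins for my_download_function and my_process_*.
fake_download(u) = (sleep(0.1); "data from $u")  # sleep simulates network lag
fake_process(d) = uppercase(d)

function run_pipeline(urls)
    c1 = Channel{String}(10)  # raw downloads
    c2 = Channel{String}(10)  # processed results
    results = String[]

    # Stage 1: one task per URL; close c1 once every download has finished.
    @async begin
        @sync for u in urls
            @async put!(c1, fake_download(u))
        end
        close(c1)
    end

    # Stage 2: waits on c1 for each new value; the loop ends when c1 is
    # closed and empty, at which point c2 is closed as well.
    @async begin
        for d in c1
            put!(c2, fake_process(d))
        end
        close(c2)
    end

    # Stage 3: stands in for my_write_function by collecting into a vector.
    writer = @async for p in c2
        push!(results, p)
    end

    wait(writer)  # block until the whole pipeline has drained
    return results
end

results = run_pipeline(["a", "b", "c"])
```

The order of `results` depends on which download finishes first, so I would not rely on it. Closing each channel when its producer is done is what lets the downstream `for` loop terminate instead of waiting forever.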

A (I think related) question: I notice that many examples (like this one) use

while true
    # do stuff
end

This may seem like an obvious question, but what does this accomplish when you wrap it in an @async task?
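For concreteness, this is the kind of loop I mean, as a small sketch with hypothetical names (`jobs`, `seen`, `worker`). My current understanding, which may be incomplete, is that the `while true` keeps the task alive as a background worker, and that the blocking `take!` is where it hands control back to other tasks until a value arrives.

```julia
jobs = Channel{Int}(10)
seen = Int[]

# Background worker: loops forever, but take! blocks (yielding to other
# tasks) whenever the channel is empty, so it does not spin the CPU.
worker = @async while true
    try
        push!(seen, take!(jobs))
    catch err
        # take! on a closed, empty channel throws InvalidStateException;
        # treat that as the signal to stop, and rethrow anything else.
        err isa InvalidStateException || rethrow()
        break
    end
end

foreach(i -> put!(jobs, i), 1:3)  # buffered, so these do not block
close(jobs)                       # buffered values can still be taken
wait(worker)
```

After `wait(worker)` returns, `seen` holds the three values in the order they were put.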
