Basic examples of Tasks/Channels? (or more verbose documentation?)

Hi,

I’m going through the section in the manual about Tasks, but I find that it is not very thorough so I’m not getting the full picture of how they work (at least for the moment).

The first paragraphs about using a Channel are OK, but then the documentation talks about binding multiple channels to a task or vice-versa, about more advanced work distribution patterns, and other low-level functions, like: yieldto(), current_task(), istaskdone(), etc.

But no examples or further references are given, so I’m a bit at a loss. Any suggestion on how to deepen my understanding of Tasks with Julia?

Thanks,
Ángel de Vicente

Are you familiar with the concept of coroutines in the first place?

Yes, the concept of coroutines is not a problem. I was just hoping to get more details/examples about the actual usage in Julia.

It’s more or less the same as in other languages. One of the simplest examples is using @sync/@async for non-blocking IO. Let’s say, you want to download 10 web pages. If you do it in a simple blocking loop:

urls = ["https://discourse.julialang.org/" for i=1:10]
results = Vector(10)
@time for (i, url) in enumerate(urls)
    results[i] = Requests.get(url)
end

it will take quite a lot of time (12 seconds for me). Most of the time Julia does nothing but wait for the network. Fortunately, network IO in Julia is non-blocking, so you can use the task machinery to speed things up:

@time @sync for (i, url) in enumerate(urls)
    @async results[i] = Requests.get(url)
end

which took only 2.5 seconds for me. In this example I run each request in a separate task using @async and then wait for all of them to finish using @sync. When one of these tasks hits an IO operation, it yields control so that other tasks can use the CPU. When the IO operation finishes, the task is resumed.
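
A minimal sketch of the same pattern that runs without any package. Two things here are assumptions for illustration: fake_fetch is a made-up stand-in for the network request (sleep yields to the scheduler much like a non-blocking IO call would), and the syntax is that of current Julia (1.x), where an untyped vector is written Vector{Any}(undef, n).

```julia
# Hypothetical stand-in for a network request: sleep hands control
# back to the scheduler while "waiting".
function fake_fetch(i)
    sleep(0.2)
    return i^2
end

results = Vector{Any}(undef, 10)

# Serial version: each call waits for the previous one to finish.
serial = @elapsed for i in 1:10
    results[i] = fake_fetch(i)
end

# Concurrent version: one task per call; @sync waits for all of them.
concurrent = @elapsed @sync for i in 1:10
    @async results[i] = fake_fetch(i)
end

println("serial: $serial s, concurrent: $concurrent s")
```

The serial loop needs at least ten sleeps back to back, while the concurrent one overlaps them, so the second timing should be several times smaller.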

TCPServer from Networking docs is another good example: even on a single CPU core you can have one task for accepting new connections and others for serving each individual connection.
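
A rough sketch of that layout, assuming current Julia (1.x) and the Sockets standard library: one task accepts connections and starts a further task per connection. The echo behaviour, the port hint 8000 and the variable names are all chosen for illustration only.

```julia
using Sockets

# listenany picks a free port at or above the hint and returns (port, server).
port, server = listenany(ip"127.0.0.1", 8000)

# One task accepts connections; every connection is served by its own task.
acceptor = @async begin
    while isopen(server)
        sock = try
            accept(server)
        catch
            break               # accept throws once the server is closed
        end
        @async begin
            # Echo each line back until the client disconnects.
            for line in eachline(sock)
                println(sock, line)
            end
            close(sock)
        end
    end
end

# A client running in the main task, on the same thread.
client = connect(ip"127.0.0.1", port)
println(client, "hello")
reply = readline(client)
close(client)
close(server)
```

While the main task is blocked in readline, the scheduler runs the acceptor and the per-connection task, which is what lets all three make progress on a single core.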

But coroutines/tasks are not only about networking. A classic example which also uses channels is a producer-consumer pattern. Something like this

function producer()
    ch = Channel{Any}(1)
    @async begin
        for i=1:100 
            println("Generating a new item")     
            put!(ch, i)
        end
        close(ch)
    end
    return ch
end

ch = producer()
take!(ch)
take!(ch)
...

(if you want something more realistic than generating numbers from 1 to 100, take a look for example at load_stream from Spark.jl).
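
For reference, a sketch of the same producer in current Julia (1.x) syntax, with a typed channel and a consumer that drains it by iteration. The parameter n and the variable names are only for illustration.

```julia
function producer(n)
    ch = Channel{Int}(1)        # buffer of one item
    @async begin
        for i in 1:n
            put!(ch, i)         # waits while the buffer is full
        end
        close(ch)               # lets iteration over ch terminate
    end
    return ch
end

ch = producer(5)
first_item = take!(ch)          # take a single item by hand
rest = collect(ch)              # drain whatever is left until the channel closes
```

Closing the channel in the producer task is what makes collect (or a for loop over ch) stop instead of waiting forever for another item.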

Or think about a metrics collector where you have multiple tasks (e.g. handling TCP requests from users), but want a single “tube” to collect request counts, latency or whatever. You can create a metrics channel and a single task that reads from this channel and writes the items to disk in batches:

metrics = Channel(1024)
@async begin
    buffer = Vector(1024 * 10)
    i = 1
    for item in metrics
        buffer[i] = item
        i += 1
        if i > length(buffer)
            println("buffer is full, writing to disk")            
            i = 1
        end
    end
end

# from any task, not just main
put!(metrics, 100)
put!(metrics, 200)
for i=1:100_000 
    put!(metrics, 92) 
end

Of course, you are not limited to a single consumer: instead, you can run N @async blocks each writing to a separate file. Tasks are also not limited to writing to a single channel - in fact, you can have any number of tasks communicating via any number of channels.
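
A small sketch of the N-consumers case, assuming current Julia (1.x). The names jobs and handled are made up, and the sleep stands in for writing to a file.

```julia
jobs = Channel{Int}(32)
handled = [Int[] for _ in 1:3]      # what each consumer received

@sync begin
    # Three consumers drain the same channel; each item goes to exactly one of them.
    for id in 1:3
        @async for item in jobs
            push!(handled[id], item)
            sleep(0.01)             # pretend to write to file number `id`
        end
    end
    # The producer is the main task here, but any task could do this.
    for i in 1:30
        put!(jobs, i)
    end
    close(jobs)                     # consumers stop once the channel is drained
end
```

Which consumer gets which item depends on scheduling, but every item is delivered once and only once.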

What is the relation between generators and tasks?

AFAIR, Julia got channels first, then generators as a separate construct and then there was some plan to connect the two. I see that there is a yield function for channels. Does it play the same role as the Python yield command? If not, what is the analog of this Python command?

As far as I can tell from the code, generators and tasks are unrelated.
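
For what it's worth, the closest thing to a Python-style generator function is probably a task bound to a channel, with put! playing the part of Python's yield. This is a common idiom rather than something confirmed in this thread; the sketch assumes current Julia (1.x), and fib is a made-up example.

```julia
# Channel(f) starts f in a new task bound to the channel, so the channel
# is closed automatically when f returns.
fib(n) = Channel() do ch
    a, b = 0, 1
    for _ in 1:n
        put!(ch, a)             # roughly what `yield a` would do in Python
        a, b = b, a + b
    end
end

firsts = collect(fib(8))
```

Each put! suspends the producing task until the consumer asks for the next value, which gives the lazy, one-at-a-time behaviour of a generator.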

Do you mean for tasks? It has a pretty good docstring:

  yield()

  Switch to the scheduler to allow another scheduled task to run. A task that
  calls this function is still runnable, and will be restarted immediately if
  there are no other runnable tasks.

  yield(t::Task, arg = nothing)

  A fast, unfair-scheduling version of schedule(t, arg); yield() which
  immediately yields to t before calling the scheduler.

But it’s pretty low-level, I don’t think I’ve ever used it for real code. In most cases @sync, @async and @task are enough.
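
To see what the zero-argument form does, here is a small sketch (current Julia 1.x assumed, names made up) with two tasks that do no IO at all and hand control back explicitly.

```julia
trace = String[]

@sync begin
    @async for i in 1:3
        push!(trace, "a$i")
        yield()                 # let other runnable tasks have a turn
    end
    @async for i in 1:3
        push!(trace, "b$i")
        yield()
    end
end

println(trace)
```

Without the yield() calls the first task would run to completion before the second one started. With them, the entries from the two tasks are expected to alternate, though the exact order is up to the scheduler.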

Hi,
thanks for the examples.

As for the first one (producer), is there any difference between your code and this?

function producer(c::Channel)
    for n=1:10
        println("Generating a new item a")
        put!(c, n)
    end
end
cha = Channel(producer)

As for the metrics example, being a MPI guy, I understand it as follows:
the line

for item in metrics

is basically like a MPI_RECV call, that it would block the code until a matching MPI_SEND was issued, but since you wrap it around @async, then the code will not block and will continue with other tasks until somebody puts something in the ‘metrics’ channel.

The lines

put!(metrics,xx)

are actually blocking, but since the previous block is waiting all the time for new items in the ‘metrics’ channel then they return as soon as the value is put in the channel.

From the documentation I’m still stuck at this point:

Multiple channels can be bound to a task, and vice-versa.

No issues with the meaning “Multiple tasks can be bound to a channel”: I understand that several consumers can be taking from a given channel, and also that several producers can be putting values into it (as illustrated in your ‘metrics’ example).

But what does “Multiple channels can be bound to a task” mean? That a producer can put simultaneously items to several channels? Or that a consumer can be waiting for items in several channels? Any examples of this?

Many thanks,
AdV

In your code everything runs in the same task and thus is serial. No consumer will be able to read from the channel before the producer finishes its job. If the channel’s buffer isn’t big enough to hold everything the producer creates, execution will just hang.

On the other hand, with 2 tasks the producer will submit new items to the channel for as long as it can. When the channel’s capacity is reached, Julia will switch to another task, e.g. the consumer. When the consumer has read everything available, Julia will switch tasks again, giving control back to the producer.

but since you wrap it around @async, then the code will not block and will continue with other tasks until somebody puts something in the ‘metrics’ channel.

The current task will not block. Instead, a new task will be created, and this new task will be blocked until somebody writes to the channel.

The lines put!(metrics,xx) are actually blocking

It depends. If channel’s buffer is full, then yes, put! will wait until somebody reads from the channel. Otherwise put! adds a new item to the channel and proceeds.
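
A small sketch that makes this visible, assuming current Julia (1.x). The third put! is issued from a separate task so that the main task can observe it waiting.

```julia
ch = Channel{Int}(2)        # buffer holds two items
put!(ch, 1)                 # returns immediately
put!(ch, 2)                 # returns immediately, buffer is now full

t = @async put!(ch, 3)      # this put! has to wait, so run it in its own task
sleep(0.1)                  # give t a chance to run
blocked = !istaskdone(t)    # t is parked inside put!

x = take!(ch)               # frees a slot, which wakes t up
wait(t)
remaining = [take!(ch), take!(ch)]
```

With an unbuffered channel (capacity 0) every put! behaves like the third one here: it waits until some task calls take!.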

That a producer can put simultaneously items to several channels?

Not simultaneously, but one after another:

subscribers = Channel[]

function send_news_to_subscribers()
    for news in news_source
        for subscriber_channel in subscribers
            put!(subscriber_channel, news)
        end
    end
end

@async send_news_to_subscribers()

push!(subscribers, alice_channel)
push!(subscribers, bob_channel)
...

In this code a new task (created with @async) sends news to all subscribed channels. Alternatively, it could send them to only one random channel essentially making a work distribution process (note, though, that all tasks will still run on the same CPU; if you really want to distribute work, consider multiple processes and RemoteChannel).
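
A self-contained variant of the snippet above, assuming current Julia (1.x). The headlines, the buffer sizes and the fact that the broadcaster closes the subscriber channels at the end are choices made for this sketch, not part of the original code.

```julia
news_source = Channel{String}(10)
alice_channel = Channel{String}(10)
bob_channel = Channel{String}(10)
subscribers = [alice_channel, bob_channel]

# One task writes every news item to every subscribed channel, one after another.
broadcaster = @async begin
    for news in news_source
        for subscriber_channel in subscribers
            put!(subscriber_channel, news)
        end
    end
    foreach(close, subscribers)     # no more news: let the readers finish
end

put!(news_source, "first headline")
put!(news_source, "second headline")
close(news_source)
wait(broadcaster)

alice_news = collect(alice_channel)
bob_news = collect(bob_channel)
```

The subscriber channels are buffered so that one slow reader does not stall delivery to the others until its buffer fills up.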

Or that a consumer can be waiting for items in several channels?

No, as far as I’m aware, but there’s a julep for something similar.
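
One workaround, sketched here under the assumption of current Julia (1.x), is to start a small forwarding task per input channel and have the consumer read from a single merged channel. merge_channels is a hypothetical helper written for this sketch, not a function from Base.

```julia
# Forward everything from several input channels into one output channel.
function merge_channels(inputs::Channel...)
    out = Channel{Any}(0)
    @async begin
        @sync for ch in inputs
            @async for item in ch
                put!(out, item)
            end
        end
        close(out)              # every input is closed and drained
    end
    return out
end

cha1 = Channel{Int}(4)
cha2 = Channel{Int}(4)
@async begin foreach(i -> put!(cha1, i), 1:3); close(cha1) end
@async begin foreach(i -> put!(cha2, 10i), 1:3); close(cha2) end

# The consumer waits on one channel only and gets items from both sources.
received = collect(merge_channels(cha1, cha2))
```

The consumer loses the information about which channel an item came from; if that matters, the forwarders could put a (source, item) tuple instead.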

Hi @dfdx, and many thanks for your time on this.

I want to move on with the Julia manual so I did try a few things w.r.t tasks in order to be confident that I more or less understand coroutines in Julia.

In your code everything runs in the same task and thus is serial. No consumer will be able to read from the channel before the producer finishes its job. If the channel’s buffer isn’t big enough to hold everything the producer creates, execution will just hang.

I don’t think this is right? My example was basically a modification of the code in https://docs.julialang.org/en/latest/manual/control-flow/#man-tasks-1, for producer/consumer stuff, and the consumer reads fine from the unbuffered channel.

(note, though, that all tasks will still run on the same CPU; if you really want to distribute work, consider multiple processes and RemoteChannel).

yes, I understand this. Next step will be RemoteChannel, but I wanted to grasp this before moving on.

What comes below is perhaps going to be a bit too verbose and too basic if familiar with tasks, but since I was asking for more examples, perhaps this is useful to others (myself in the future included). I’m sure things can be written better/more elegantly, so any suggestions/comments are welcome.

  • First example, creating tasks “manually”. Two functions are wrapped in the tasks e1t1 and e1t2, so that when they reach the “sleep” call they yield to the scheduler and thus we can make progress in both at the same time. And we can synchronize with istaskdone. Execute by calling e1_t()
function e1_t()
    e1t1 = Task(e1_t1)
    e1t2 = Task(e1_t2)
    schedule(e1t1)
    schedule(e1t2)

    while istaskdone(e1t1) == false || istaskdone(e1t2) == false
       sleep(1)
    end
    println("All finished")
end

function e1_t1()
    for i=1:5
        println("Task 1: $i")
        sleep(2)
    end
end

function e1_t2()
    for i=1:5
        println("Task 2: $i")
        sleep(5)
    end
end
  • Second example, the example given by @dfdx to grab URLs asynchronously. I want to understand what @sync and @async do, so I give here the example with those macros, and then my version creating the tasks manually.
# With macros
function g_macros()
    urls = ["https://discourse.julialang.org/" for i=1:10]
    results = Vector(10)
    @sync for (i, url) in enumerate(urls)
        @async results[i] = Requests.get(url)
    end
    results
end


# The same idea, but manually creating the tasks. Execute by calling g()
function get(url,i,results)
    results[i] = Requests.get(url)
end

function g()
    urls = ["https://discourse.julialang.org/" for i=1:10]
    tasks = Vector(10)
    results = Vector(10)
    for (i,url) in enumerate(urls)
        tasks[i] = schedule(Task(()->get(url,i,results)))
    end
    while any(map(!istaskdone,tasks))
        sleep(0.01)
    end
    results
end
  • Third example. Consumer/producer stuff.
# Execute by calling myconsumer(), who gets items produced by myproducer as soon as they are ready
function myproducer(c::Channel)
    for n=1:10
        println("Generating a new item")
        put!(c, n)
    end
end

function myconsumer()
    cha = Channel(myproducer)
    results = Vector(10)
    ind = 1
    for i in cha
        results[ind]=i
        ind += 1
    end
    results
end
# But just in case, the task doesn't have to be the producer, it could be the consumer as well.
# Execute by calling myproducer2(), which will put items in the channel, consumed as available
# by the task myconsumer2
                                                                                                                                    
function myproducer2()
    cha = Channel(myconsumer2)
    for n=1:10
        put!(cha,n)
    end
end

function myconsumer2(c::Channel)
    for i in c
        println("Got $i")
    end
end
  • Fourth example. Bind multiple tasks to a channel. Not sure if this is what is meant in the manual, but having several tasks listening on a channel we can write master-worker style programs, where the master gives several tasks and the several workers process them as soon as they are idle. In this case I have only two workers (tasks w1 and w2), and we would execute this by calling master()
# For master-workers type problems, where I want several workers to work on the tasks given by the master
                                                                                                             
function master()
    cha = Channel(0)
    w1 = Task(()->worker(1,cha)) ; w2 = Task(()->worker(10,cha))
    schedule(w1)             ; schedule(w2)

    for n=1:20
        put!(cha,n)
    end
    close(cha)

    while istaskdone(w1) == false || istaskdone(w2) == false
        sleep(0.01)
    end
    println("All finished")
end

function worker(sleep_s,c::Channel)
    for i in c
        sleep(sleep_s)
        println("Got $i and slept $sleep_s")
    end
end
  • Fifth example. Bind multiple channels to a task. In this example a producer (there could be many producers) generates values and puts each one, at random, into one of two channels (either cha1 or cha2). The consumer waits for a value in either channel and processes it.
# Output to different channels at random, which are consumed by the consum as needed. 
# Execute by calling consum()
function produc(cha1::Channel,cha2::Channel)
    for i=1:20
        if bitrand()[1] == true
            put!(cha1,rand()*3)
        else
            put!(cha2,rand()*3)
        end
    end

    close(cha1)
    close(cha2)
end

function consum()
    cha1 = Channel(0)
    cha2 = Channel(0)

    pr = Task(()->produc(cha1,cha2))
    schedule(pr)

    while isopen(cha1) || isopen(cha2)
        sleep(0.01)
        if isready(cha1)
            v = take!(cha1)
            println("Got $v from cha1" ) ; sleep(v)
        end

        if isready(cha2)
            v = take!(cha2)
            println("Got $v from cha2") ; sleep(v)
        end
    end
end

Ah, right. I read producer(Channel()) instead of Channel(producer). As the documentation states, Channel(function()) indeed creates a new task and binds it to the channel.

There’s a number of repeated patterns in your code that already have more convenient wrappers. For example, this:

w1 = Task(()->worker(1,cha))

can be simplified to:

w1 = @task worker(1,cha)

If in doubt, you can verify it using macroexpand(:(w1 = @task worker(1,cha))).
You then schedule the task:

w1 = @task worker(1, cha)
schedule(w1)

which is equivalent to:

w1 = @schedule worker(1, cha)

I also don’t recommend using sleep() and istaskdone() for waiting on multiple tasks - instead of aligning checks to sleep intervals you can create a channel to wait for workers. Something like this:

function worker(params, i_am_done_chnl::Channel)
    # do stuff
    put!(i_am_done_chnl, true)
end

function master()
    # create a channel through which workers will notify master that they are done
    i_am_done = Channel(2)
    # schedule 2 tasks
    @schedule worker(params1, i_am_done)
    @schedule worker(params2, i_am_done)
    for i=1:2   
        # one iteration of the loop corresponds to one finished task
        take!(i_am_done)
    end   
end

Not surprisingly, Julia provides a shortcut for this use case too: @async schedules a task and adds it to a set of active tasks; @sync suspends the main task until all tasks in this set are done:

function worker(params)
    # do stuff
end

@sync begin
    @async worker(params1)
    @async worker(params2)
end
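
A runnable sketch of this for current Julia (1.x), where @schedule no longer exists and @async is used in its place. The worker body and its parameters are placeholders invented for the example.

```julia
# Hypothetical worker: pretends to work for a while, then returns a value.
function worker(params)
    sleep(0.1 * params)
    return params * 2
end

# Option 1: @sync waits for every @async started inside the block.
@sync begin
    @async worker(1)
    @async worker(2)
end

# Option 2: keep the Task objects; fetch waits for a task and returns its result.
tasks = [@async(worker(p)) for p in 1:3]
outputs = fetch.(tasks)
```

Option 2 is handy when the master needs the workers' return values and not just the fact that they finished.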

@dfdx Many thanks for the comments. I was aware of @task, @schedule, @sync and @async but I’m still in chapter 12 of the manual, didn’t get to macros yet, so I was avoiding them :) But I will use them when/if I write code using tasks.