How to replace consume and produce with channels

I got a deprecation warning while trying to upgrade to Julia v0.6, saying that the consume() and produce() functions are not available anymore, and that I should use channels.

The usage of channels is not clear to me yet. Would it be possible to provide a toy example of how instead of consuming or producing a task I could acquire the same result using channels?

1 Like

How do you use produce() and consume() currently?

The story behind it is rather complex, I define a task which runs an MCMC iteration, which when consumed and returns an MCMC state when produced. The main question I have in mind, which I didn’t manage to answer by looking into the documentation on channels, is how I would translate a simple example, in which I define a Task and then consume or produce it with the new Channel type (if not mistaken tasks where removed altogether from the language?). Thank you for your help.

Tasks aren’t deprecated, only produce() and consume() functions are. Tasks in Julia implement a very powerful idea of coroutines, making it possible to run multiple subroutines concurrently (although not necessarily in parallel).

produce() and consume() are a way to communicate between 2 tasks. Channels are a way to communicate between any number of tasks.

For example, we can create a channel with capacity 1 so that it could keep at most 1 element.

chnl = Channel{Int}(1)
# ==> Channel{Int64}(sz_max:1,sz_curr:0)

and run a separate task to fill this channel with numbers from 1 to 10:

@async for i=1:10 put!(chnl, i) end
# ==> Task (runnable) @0x00007fe67d1aa770

Local channels like this hold data in as an array that we can inspect:

chnl.data
# ==> 1-element Array{Int64,1}:
# ==>  1

Our additional task started running, put single element to the channel and stopped (switching control flow to another task) because channel is full. Now let’s take an element from the channel:

take!(chnl)
# ==> 1
chnl.data
# ==> 1-element Array{Int64,1}:
# ==> 2

As expected, take!() returns first element from the channel. What’s more interesting is that our additional task immediately fills the channel with the next element. And stops again.

Channels also expose iterator interface, so we can read all the data using a loop:

for x in chnl println(x) end
# ==> 2
# ==> 3
# ==> 4
# ==> 5
# ==> 6
# ==> 7
# ==> 8
# ==> 9
# ==> 10

Note that the previous code will hang on because channel is still open, so the loop expects new elements, but nobody adds them. To fix it, we can modify additional task as follows:

@async begin for i=1:10 put!(chnl, i) end; close(chnl) end
# ==> Task (runnable) @0x00007fe67c843490
for x in chnl println(x) end
# ==> 1
# ==> 2
# ==> 3
# ==> 4
# ==> 5
# ==> 6
# ==> 7
# ==> 8
# ==> 9
# ==> 10
# ==> returned control

Of course, you can specify larger capacity for channels to avoid to frequent switching between tasks.

To summarize:

  1. Create a channel available for both - producer and consumer tasks.
  2. Use put!(chnl, x) instead of produce() and take!(chnl) instead of consume(), or just iterate over the channel.
  3. Call close(chnl) to finish.
8 Likes

Thank you dfdx, I will try to use your mini tutorial and will let you know if I get stuck.

I have three questions dfdx.

The first one is whether (as I can see from your explanations above) this means that I don’t need to actually need to create a task myself. I simply create a channel, is this correct?

How do I now call older functions such as task_local_storage or task.storage?

The third and last question is what is the link between channels and tasks internally? I was looking at the source code in Julia’s base (task.jl and channels.jl), and I noticed that Channel has fields Channel.takers and Channel.putters, which are of type Vector{Task}. Can I operate on these fields directly if I want to use functions such as task_local_storage or there is an interface for this?

The produce-consume pattern implies that you have at least 2 tasks anyway - one for producer and another for consumer. In the example above I created a producer task using @async and a consumer runs in the main task (note that any code runs in some task, e.g. try to run current_task() from the REPL). Channels are simple means to pass data between these 2 tasks, but all in all they are orthogonal to tasks: you can use tasks without channels or channels without tasks.

How do I now call older functions such as task_local_storage or task.storage

It depends on how you used them previously. All in all, task’s local storage has nothing to do with channels or produce-consume pattern, and task_local_storage() should not be affected in any way.

The third and last question is what is the link between channels and tasks internally? […] Can I operate on these fields directly if I want to use functions such as task_local_storage or there is an interface for this?

I wouldn’t recommend using these fields directly. As any internal stuff, they aren’t the subject of public API and thus may change at any time. Whether there’s a public interface for it or not depends on your actual task.

I read the documentation of Julia on channels and on tasks, although the latter is very short. I find it difficult to use these concepts given how succinct the documentation is.

Let’s make it concrete. Let’s say that I started Julia with two workers. I have a function f(x, y) = x+y and a function g(x, y) = x-y. How do I define and run a task explicitly on worker 1 to calculate function f, define and run a task on worker 2 to calculate g, then pass the output of f from worker 1 to worker 2 and the output of g from worker 2 to worker 1 (essentially exchange the output of f and g between workers 1 and 2)?

Is it possible to do this in a way that I have actual control on the choice of worker on which each task runs and on the pairs of workers that exchange info between two tasks running on different workers? I imagine I need a RemoteChannel for this - if this simple example could be shown, it might start making sense how to make use of channels and tasks.

You are mixing up several unrelated concepts. Let’s start from the beginning.


1. Tasks are coroutines - generalization of subroutines with multiple entry points. Here’s an example:

# Pkg.add("Requests")
using Requests

url = "https://julialang.org"

# running in a single task
@time for i=1:100
    Requests.get(url)
end 
# ==> 38.621294 seconds

 # running in 101 tasks
@time @sync for i=1:100
    @async Requests.get(url)
end 
# ==> 1.062761 seconds

In the first example, 100 requests are made one after another within a single task. Each iteration has to wait for the previous to finish, so they run strictly linearly.

In the second example, we crate a separate task for each iteration, so they all run concurrently, i.e. on each iteration we create and start a new task, it runs until it meets blocking IO operation (requesting a remote URL) at which point Julia switches to the next task. When data for a particular task is available, Julia switches to that task again. This way Julia waits for 100 HTTP requests at the same time instead of waiting for them one after another.

Things to notice:

  • the same piece of code - Requests.get(url) - is started and suspended at least twice: at the beginning and after the data is available; this would be impossible if we were using just function calls (like we do in the first example);
  • all the code is running in the same process (and most likely on the same CPU core, but it’s another story) and has access to the same memory (global variables)

2. Processes are for parallelism. Examples with URLs are IO-bound, not CPU-bound, so they all can run on the same CPU core without any significant performance loss. On other hand, if you run CPU-intensive code (e.g. MCMC simulations), you may want to split the load between several CPUs or even several machines in the cluster. In Julia parlance, you need to create multiple processes (a.k.a. workers). There’s just too much of it to describe it here, but Julia already has pretty thorough docs on parallel computing.

Note how processes and tasks are orthogonal - depending on your needs, you can use only tasks, only processes, both of them or even non of them (in the later case you would run in a single process and single task).


3. Channels are for communication. Communication between what? It depends on the type of channel.

Channel{T} is used for communication between tasks. In fact, Channel is just an buffer (i.e. array) exposing a FIFO pattern - you put data into it in one task and take it in another task (or the same task, why not). You don’t have to use Channel to communicate between tasks, you could just use global variables or whatever, but channels are convenient (as any FIFO).

RemoteChannel{T} is used for communication between processes. RemoteChannel exposes the same FIFO pattern, but unlike ordinary Channel (which is a local buffer), it’s a handle to a remote buffer. You may also think of remote channels as message queues, just like RabbitMQ or queues in Redis. And again, you don’t have to use remote channels to talk between processes, you can as well use remotecall and fetch or @spawn, or even just @parallel with reducing.

So you have a number of tools, but you need to know what you want to achieve (run code concurrently or in parallel, run multiple iterations or make just a single long-running call, etc.) to select the right one.

This sounds pretty unnatural - normally you delegate part of the work to another process and read the result, not exchange results between 2 workers - but ok, let’s go this way. Assuming you have started Julia with -p 1 (i.e. 1 more worker in addition to the main, so 2 in total):

@everywhere f(x, y) = x+y
@everywhere g(x, y) = x-y

# I will use a single remote channel for reading and writing in both processes, 
# you may want to use 2 channels for better control
const q = RemoteChannel(() -> Channel(1))

# on process 2, run g(2,3) and put the result to the channel
@spawnat 2 begin r = g(2,3); put!(q, r) end

# on process 1, read the result from the channel
@spawnat 1 println(take!(q))
# ==> -1

# on process 1, run f(2,3) and put the result to the channel
@spawnat 1 begin r = f(2,3); put!(q, r) end

# on process 2, read the result from the channel
@spawnat 2 println(take!(q))
# ==> From worker 2:	5

Note that this is relatively low-level interface, I highly recommend reading through docs on parallel computing for more high-level tools.

Also note how in this code I haven’t used any Tasks. To emphasize it once again: coroutines (tasks) and parallel computing (processes / workers) are 2 distinct features.

7 Likes

The third point and example, which demonstrates how channels can be used at a low level to communicate between processes is very helpful, thank you Andrei. I agree that it is odd to swap states between 2 processes, but it is sth that happens when one wants to do parallel MCMC.

would you please give me idea what produce and consume does ? Thinking I am dumb about this.
And would you please give me small simple code that demonstrate idea of these two and how those depretiated package are replaced and please would you give me new how can i do that.

I am solving fibonacci series and got these function which is depretiated now

It may be easier to take your concrete fibinacci code and rewrite it to channels.

How can i make exactly same function as comsume and produce? Although these are depretiated now

produce() and consume() have been removed from the language years ago, so there are not many people who still remember the API :slight_smile: If I recall correctly, these functions can’t be directly translated to newer versions of the language, but some approximation using channels can be found in the docs.

2 Likes

Thank you . I think i have to look new material