How do I build a car's factory pipeline with RemoteChannel?

I’m currently playing with RemoteChannel, and trying to make a text book example of parallelism, that is, a pipeline. My toy example is a car factory with 3 stages clean_car, paint and pay_taxes.

I can build the primitives and make communication, but I don’t know how to call them inside a loop, in such a way that uses full parallelism. I was monitoring the system processes, and I see that each stages occurs at different CPU core, but they do not happen at the same time.
Here’s the example:

using Distributed
addprocs(4)

@everywhere function clean_car(x)
    println("Cleaning your $(x[2]) ...")
    sleep(1)
    new_x = "Your $(x[2]) is clean."
    println("[ Cleaning OK ]")
    return (new_x, x[1])
end

@everywhere function paint(x, color)
    println("Painting...")
    sleep(1)
    new_x = "$(x) Now is painted in $(color)."
    println("[ Painting OK ]")
    return new_x
end

@everywhere function pay_taxes(x)
    println("Paying taxes")
    sleep(1)
    new_x = "$(x) And Finally, it has all taxes paid."
    println("[Taxes OK]")
    return new_x
end


const ch1 = RemoteChannel(()->Channel{Tuple}(1), 1);
const ch2 = RemoteChannel(()->Channel{String}(1), 1);
const ch3 = RemoteChannel(()->Channel{String}(1), 1);
cars = [("Red", "Ferrari"), ("Black", "Mustang"), ("Golden", "Lamborghini")]


############################### MY EXPECTED BEHAVIOR #################################

# first time : clear first car
@spawnat 2 begin car_cleaned = clean_car( cars[1]);         put!(ch1, car_cleaned) end;


# second time : paint first car and clean second car
@spawnat 3 begin car_painted = paint(     take!(ch1)...  ); put!(ch2, car_painted) end;
@spawnat 2 begin car_cleaned = clean_car( cars[2]);         put!(ch1, car_cleaned) end;

# third time: pay taxes of first car, paint the second car, clean the third car
@spawnat 4 begin car_ready   = pay_taxes( take!(ch2));        put!(ch3, car_ready) end;
first_car = take!(ch3)
@spawnat 3 begin car_painted = paint(     take!(ch1)...  ); put!(ch2, car_painted) end;
@spawnat 2 begin car_cleaned = clean_car( cars[3]);         put!(ch1, car_cleaned) end;


# fouth time: pay taxes of second car, paint third car
@spawnat 4 begin car_ready   = pay_taxes( take!(ch2));        put!(ch3, car_ready) end;
second_car = take!(ch3)
@spawnat 3 begin car_painted = paint(     take!(ch1)...  ); put!(ch2, car_painted) end;

# fith time: pay taxes of the third car
@spawnat 4 begin car_ready   = pay_taxes( take!(ch2));        put!(ch3, car_ready) end;
third_car = take!(ch3)


##################################### MY LOOP ATEMPT ######################################

function order_a_car(car_and_color)
    @spawnat 2 begin car_cleaned = clean_car(car_and_color);    put!(ch1, car_cleaned) end;
    @spawnat 3 begin car_painted = paint(     take!(ch1)...  ); put!(ch2, car_painted) end;
    @spawnat 4 begin car_ready   = pay_taxes( take!(ch2));      put!(ch3, car_ready) end;
    return nothing
end

@async for car_and_color ∈  cars
    order_a_car(car_and_color )
end

my_cars = []
@async for _ in 1:length(cars)
    push!(my_cars,  take!(ch3))
end
println(my_cars)

To see that each stage goes to different Cores of the CPU, I created some random math operation inside each function just to force some changes in the system monitor. Because the previous code is already long, I removed them and put a sleep(1).

I would expect that just putting @async in the for you solve it, but was not the case. How to should I proceed ?

You should really setup a pipe and stages to process the items in it.
You could try:

using Distributed
addprocs(4)

@everywhere using Printf
@everywhere function stage(prc::String, input::RemoteChannel, output::RemoteChannel)
    while true
        car = take!(input)
        println(@sprintf("%s the %s %s", prc, car[1], car[2]))
        sleep(2rand())
        println(@sprintf("finished %s", prc))
        put!(output, car)
    end
end

pipe = [RemoteChannel(()->Channel(1)) for _ in 1:4]
factory = ["cleaning", "painting", "paying"]
for i in 2:4
    @spawnat i stage(factory[i-1], pipe[i-1], pipe[i])
end

put!(pipe[1], ("red", "Ferrari"))

The stage function runs in a loop, taking a car from the input channel, working on it and putting it into the output channel.

Then you setup a pipe, the stages of your factory and spawn your stage function to the workers.

When you put the red Ferrari into the pipe, you get:

From worker 2:    cleaning the red Ferrari
From worker 2:    finished cleaning
From worker 3:    painting the red Ferrari
From worker 3:    finished painting
From worker 4:    paying the red Ferrari
From worker 4:    finished paying

Try to put in other cars. Nice business, isn’t it? :slightly_smiling_face:

Thank you a lot :love_you_gesture:t2: :ok_hand:t2: :fist_right:t2: :fist_left:t2:.
Alone, I would take a huge time thinking the solution.

For completeness, here the necessary changes in your code calls for external functions

  • First, I changed the function paint to contain only one input:
@everywhere function paint(x)
    x, color = x[1], x[2]
   ....
end
  • Second, prc input becomes a Symbol, because I will use create an Expr and eval
@everywhere function stage(prc::Symbol, input::RemoteChannel, output::RemoteChannel)
    while true
        car = take!(input)
        
        ex = Expr(:call, prc, car)
	    stage_output = eval(ex)
        
        put!(output, stage_output)
    end
end

pipe = [RemoteChannel(()->Channel(1)) for _ in 1:4]
factory = [:clean_car, :paint, :pay_taxes]
for i in 2:4
    @spawnat i stage(factory[i-1], pipe[i-1], pipe[i])
end

cars = [("Red", "Ferrari"), ("Black", "Mustang"), ("Golden", "Lamborghini")]

for car in cars
    @async put!(pipe[1], car)
end
 
result = []
for i=1:3
    push!(result, take!(pipe[4]))
end
println(result)
  • Third, I put all cars into pipe[1] using @async
  • Fourth, take all cars from pipe[4]
1 Like

Nice! Why not make prc a Function instead of a Symbol? This would be far more efficient and idiomatic. You could call it then as put!(output, prc(car)). Wouldn’t that be nicer too? :wink:

No special reason about Symbols :sweat_smile:.
I only made copy+paste from other code that I was experimenting with Expr, and I got lazy to write a new code using Functions.

Improvement never ends! Try it.