I’m currently playing with RemoteChannel and trying to build a textbook example of parallelism, that is, a pipeline. My toy example is a car factory with 3 stages: clean_car, paint and pay_taxes.
I can build the primitives and make them communicate, but I don’t know how to call them inside a loop in a way that achieves full parallelism. Monitoring the system processes, I can see that each stage runs on a different CPU core, but the stages do not run at the same time.
Here’s the example:
using Distributed
addprocs(4)
@everywhere function clean_car(x)
    println("Cleaning your $(x[2]) ...")
    sleep(1)
    new_x = "Your $(x[2]) is clean."
    println("[ Cleaning OK ]")
    return (new_x, x[1])
end
@everywhere function paint(x, color)
    println("Painting...")
    sleep(1)
    new_x = "$(x) Now is painted in $(color)."
    println("[ Painting OK ]")
    return new_x
end
@everywhere function pay_taxes(x)
    println("Paying taxes")
    sleep(1)
    new_x = "$(x) And Finally, it has all taxes paid."
    println("[Taxes OK]")
    return new_x
end
const ch1 = RemoteChannel(()->Channel{Tuple}(1), 1);
const ch2 = RemoteChannel(()->Channel{String}(1), 1);
const ch3 = RemoteChannel(()->Channel{String}(1), 1);
cars = [("Red", "Ferrari"), ("Black", "Mustang"), ("Golden", "Lamborghini")]
############################### MY EXPECTED BEHAVIOR #################################
# first time: clean the first car
@spawnat 2 begin car_cleaned = clean_car( cars[1]); put!(ch1, car_cleaned) end;
# second time: paint the first car and clean the second car
@spawnat 3 begin car_painted = paint( take!(ch1)... ); put!(ch2, car_painted) end;
@spawnat 2 begin car_cleaned = clean_car( cars[2]); put!(ch1, car_cleaned) end;
# third time: pay taxes of the first car, paint the second car, clean the third car
@spawnat 4 begin car_ready = pay_taxes( take!(ch2)); put!(ch3, car_ready) end;
first_car = take!(ch3)
@spawnat 3 begin car_painted = paint( take!(ch1)... ); put!(ch2, car_painted) end;
@spawnat 2 begin car_cleaned = clean_car( cars[3]); put!(ch1, car_cleaned) end;
# fourth time: pay taxes of the second car, paint the third car
@spawnat 4 begin car_ready = pay_taxes( take!(ch2)); put!(ch3, car_ready) end;
second_car = take!(ch3)
@spawnat 3 begin car_painted = paint( take!(ch1)... ); put!(ch2, car_painted) end;
# fifth time: pay taxes of the third car
@spawnat 4 begin car_ready = pay_taxes( take!(ch2)); put!(ch3, car_ready) end;
third_car = take!(ch3)
##################################### MY LOOP ATTEMPT #####################################
function order_a_car(car_and_color)
    @spawnat 2 begin car_cleaned = clean_car(car_and_color); put!(ch1, car_cleaned) end;
    @spawnat 3 begin car_painted = paint( take!(ch1)... ); put!(ch2, car_painted) end;
    @spawnat 4 begin car_ready = pay_taxes( take!(ch2)); put!(ch3, car_ready) end;
    return nothing
end
@async for car_and_color ∈ cars
    order_a_car(car_and_color)
end
my_cars = []
@async for _ in 1:length(cars)
    push!(my_cars, take!(ch3))
end
println(my_cars)
To check that each stage goes to a different CPU core, I put some random math operations inside each function, just to force visible activity in the system monitor. Because the code above is already long, I removed them and put a sleep(1) in their place.
I expected that just putting @async in front of the for loop would solve it, but that was not the case. How should I proceed?
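One direction that might work, given here as a minimal sketch and not a definitive implementation: start each stage once, as a long-lived loop on its own worker, and let the channels carry the cars from one stage to the next. The helpers run_stage and paint_car and the extra input channel ch0 are hypothetical names introduced only for this illustration; the sketch also assumes a fresh session with three workers (one per stage) and drops the println calls for brevity. It collects the results in the main task instead of inside @async, because @async returns immediately and println(my_cars) can otherwise run before anything has been pushed.

```julia
using Distributed
addprocs(3)  # assumption: one worker per stage

# Same stages as in the question, without the println calls.
@everywhere function clean_car(x)
    sleep(1)
    return ("Your $(x[2]) is clean.", x[1])
end
@everywhere function paint(x, color)
    sleep(1)
    return "$(x) Now is painted in $(color)."
end
@everywhere function pay_taxes(x)
    sleep(1)
    return "$(x) And Finally, it has all taxes paid."
end

# Hypothetical helper: paint expects two arguments, clean_car returns a tuple.
@everywhere paint_car(x) = paint(x...)

# Hypothetical helper: one stage, taking n items from `input`,
# applying `f`, and putting each result into `output`.
@everywhere function run_stage(f, input, output, n)
    for _ in 1:n
        put!(output, f(take!(input)))
    end
end

# ch0 is an extra input channel (not in the question) that feeds the first stage.
ch0 = RemoteChannel(() -> Channel{Tuple{String,String}}(1), 1)
ch1 = RemoteChannel(() -> Channel{Tuple{String,String}}(1), 1)
ch2 = RemoteChannel(() -> Channel{String}(1), 1)
ch3 = RemoteChannel(() -> Channel{String}(1), 1)

cars = [("Red", "Ferrari"), ("Black", "Mustang"), ("Golden", "Lamborghini")]
n = length(cars)
w = workers()

# Start every stage once; each call returns a Future right away.
stages = [
    remotecall(run_stage, w[1], clean_car, ch0, ch1, n),
    remotecall(run_stage, w[2], paint_car, ch1, ch2, n),
    remotecall(run_stage, w[3], pay_taxes, ch2, ch3, n),
]

# Feed the pipeline in the background, collect the results in the main task.
@async foreach(car -> put!(ch0, car), cars)
my_cars = [take!(ch3) for _ in 1:n]

foreach(wait, stages)  # make sure every stage loop has finished
println(my_cars)
```

With this layout, while one car is being painted the next one can already be in the cleaning stage, so the stages should overlap in time. Each stage handles cars one at a time, so the results come out in the same order as cars.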