Thank you @sgaure and @Oscar_Smith. For the original question, the change with channels would be either to pass mutable values into both tasks and use an unbuffered channel for synchronization, or to use a buffered channel that passes the values back and forth between the tasks. So the modified code for the above task will be:
Note: I forgot to mention that function_require_sync is also a mutating function; it should have been named function_require_sync!, and it modifies arg1 so that it can be used by Task 1.
# Construct the multi-channel communicators.
# Create a pair of channels (one per direction) between each pair of processes.
"""
    create_communicators(Num_processes::Integer, ::Type{T}=Any; buffer_size::Integer=0)

Build the channel matrix used to synchronize `Num_processes` cooperating tasks.

The result is a `2 × binomial(Num_processes, 2)` matrix of `Channel{T}`: one column per
unordered pair of processes and one row per direction of communication. Every slot holds
its own, distinct channel.

# Arguments
- `Num_processes`: number of communicating tasks; must be non-negative.
- `T`: element type carried by the channels. Defaults to `Any` so that the one-argument
  call works without any type being defined elsewhere.

# Keywords
- `buffer_size`: capacity of each channel; `0` (the default) creates unbuffered channels.

# Throws
- `ArgumentError` if `Num_processes` is negative.
"""
function create_communicators(
    Num_processes::Integer, ::Type{T}=Any; buffer_size::Integer=0
) where {T}
    # `binomial` is also defined for negative `n`, so reject that case explicitly.
    Num_processes >= 0 || throw(
        ArgumentError("Num_processes must be non-negative, got $Num_processes")
    )
    Num_channel_pairs = binomial(Num_processes, 2)
    communicators = Array{Channel{T},2}(undef, 2, Num_channel_pairs)
    # Fill slot by slot so that no entry is left `#undef` and no two slots share a channel.
    for slot in eachindex(communicators)
        communicators[slot] = Channel{T}(buffer_size)
    end
    return communicators
end
# Method 1: Using mutable objects and simple sync messages. The channels are unbuffered, and the mutable objects are passed as arguments.
"""
    function1_loop!(arg1, arg_task1, communicators, arg_func1...)

Method 1 worker for task 1: run `function1!` once per column of `arg_task1`, handing
control back and forth with the task-2 loop through `communicators`.

Each iteration waits for a message on `communicators[1, 1]`, calls
`function1!(arg1, arg_func1..., column_view)`, and then puts a sync message on
`communicators[2, 1]`.

# Arguments
- `arg1`: object passed as first argument to `function1!` on every iteration
  (presumably the shared mutable state; `function1!` is not defined in this snippet).
- `arg_task1`: array whose columns (`view(arg_task1, :, index)`) are the per-iteration
  inputs.
- `communicators`: channel matrix; only `[1, 1]` (incoming) and `[2, 1]` (outgoing) are
  used here.
- `arg_func1...`: extra positional arguments forwarded to `function1!`.

Returns `nothing`.
"""
function function1_loop!(
    arg1, arg_task1, communicators::Array{Channel{T},2}, arg_func1...
) where {T}
    sync_message = convert(T, 1)
    # Iterate over the second axis: that is the dimension the view below indexes.
    for index in axes(arg_task1, 2)
        arg_iterable_Task1 = view(arg_task1, :, index)
        take!(communicators[1, 1])
        function1!(arg1, arg_func1..., arg_iterable_Task1)
        put!(communicators[2, 1], sync_message)
        # Here arg1 (a mutable object) is mutated and shared between the two functions.
        # If required, arg1 could itself be the sync message passed between the two
        # functions.
    end
    return nothing
end
"""
    function2_loop!(arg1, arg2, arg_task2, communicators, arg_func2...; arg_sync=())

Method 1 worker for task 2: once per column of `arg_task2`, wait for task 1, run the
step that needs synchronization, release task 1, then run `function2!`.

Each iteration waits for a message on `communicators[2, 1]`, calls
`function_require_sync!(arg1, arg2, arg_sync...)`, puts a sync message on
`communicators[1, 1]`, and finally calls `function2!(arg2, arg_func2..., column_view)`.

# Arguments
- `arg1`, `arg2`: objects passed to `function_require_sync!`; `arg2` is also the first
  argument of `function2!` (neither helper is defined in this snippet).
- `arg_task2`: array whose columns (`view(arg_task2, :, index)`) are the per-iteration
  inputs.
- `communicators`: channel matrix; only `[2, 1]` (incoming) and `[1, 1]` (outgoing) are
  used here.
- `arg_func2...`: extra positional arguments forwarded to `function2!`.

# Keywords
- `arg_sync`: iterable of extra arguments splatted into `function_require_sync!`.
  Defaults to the empty tuple.

Returns `nothing`.
"""
function function2_loop!(
    arg1, arg2, arg_task2, communicators::Array{Channel{T},2}, arg_func2...; arg_sync=()
) where {T}
    sync_message = convert(T, 1)
    # Iterate over the second axis: that is the dimension the view below indexes.
    for index in axes(arg_task2, 2)
        arg_iterable_Task2 = view(arg_task2, :, index)
        take!(communicators[2, 1])
        # Modifies arg1 in place to be used by function1! in the next iteration.
        function_require_sync!(arg1, arg2, arg_sync...)
        put!(communicators[1, 1], sync_message)
        # If required, arg1 could itself be the sync message passed between the two
        # functions.
        function2!(arg2, arg_func2..., arg_iterable_Task2)
    end
    return nothing
end
"""
    asynchronous_compute_unbuffered!(arg2, arg_task1, arg_task2)

Method 1 driver: run `function1_loop!` and `function2_loop!` as two spawned tasks that
share `arg1` and take turns through unbuffered channels.

`some_mutable_object` and `some_value` are placeholders that this snippet does not
define; they must be supplied by the surrounding code.

Both loops must perform the same, non-zero number of iterations. Otherwise one side
blocks forever on a channel that nobody services.

Returns `arg2`. All channels are closed before returning, including when a task fails.
"""
function asynchronous_compute_unbuffered!(arg2, arg_task1, arg_task2)
    arg1 = some_mutable_object()
    arg_func1 = tuple(some_value...)
    arg_func2 = tuple(some_value...)
    Num_processes = 2
    communicators = create_communicators(Num_processes)
    # Give every slot its own unbuffered channel. A broadcast assignment of a single
    # channel value would not create one independent channel per slot.
    for slot in eachindex(communicators)
        communicators[slot] = eltype(communicators)(0)
    end
    try
        # Alternately, arg1 could be the sync message being passed between the two
        # functions. In that case, the functions do not consume arg1 as argument, and
        # arg1 is no longer mutable.
        Task_1_loop = Threads.@spawn function1_loop!(
            arg1, arg_task1, communicators, arg_func1...
        )
        Task_2_loop = Threads.@spawn function2_loop!(
            arg1, arg2, arg_task2, communicators, arg_func2...
        )
        # `put!` on an unbuffered channel blocks until a matching `take!`, so the start
        # token can only be sent once task 1 exists to receive it.
        put!(communicators[1, 1], 1)
        # Wait for task 1 first, so this function never competes with it for tokens on
        # communicators[1, 1].
        wait(Task_1_loop)
        # Task 2's last put! blocks until it is taken; take it before waiting on task 2.
        take!(communicators[1, 1])
        wait(Task_2_loop)
    finally
        # Closing also wakes up a task still blocked on a channel after a failure.
        foreach(close, communicators)
    end
    return arg2
end
# Method 2: Using channels as sync messages, passing arguments as channel messages.
# Alternate version passing arg1 as a sync message.
"""
    function1_loop!(arg_task1, communicators, arg_func1...)

Method 2 worker for task 1: the data itself travels through the channels instead of
being shared.

Each iteration takes `arg1` from `communicators[1, 1]`, computes
`arg2 = function1(arg1, arg_func1..., column_view)`, and puts `arg2` on
`communicators[2, 1]`.

# Arguments
- `arg_task1`: array whose columns (`view(arg_task1, :, index)`) are the per-iteration
  inputs.
- `communicators`: channel matrix; only `[1, 1]` (incoming) and `[2, 1]` (outgoing) are
  used here.
- `arg_func1...`: extra positional arguments forwarded to `function1` (not defined in
  this snippet).

Returns `nothing`.
"""
function function1_loop!(
    arg_task1, communicators::Array{Channel{T},2}, arg_func1...
) where {T}
    # Iterate over the second axis: that is the dimension the view below indexes.
    for index in axes(arg_task1, 2)
        arg_iterable_Task1 = view(arg_task1, :, index)
        arg1 = take!(communicators[1, 1])
        arg2 = function1(arg1, arg_func1..., arg_iterable_Task1)
        put!(communicators[2, 1], arg2)
    end
    return nothing
end
"""
    function2_loop!(arg_task2, communicators, arg_func2...; arg_sync=())

Method 2 worker for task 2: the data itself travels through the channels instead of
being shared.

Each iteration takes `arg2` from `communicators[2, 1]`, computes
`arg1 = function_require_sync(arg2, arg_sync...)`, puts `arg1` on `communicators[1, 1]`,
and then calls `function2!(arg2, arg_func2..., column_view)`.

# Arguments
- `arg_task2`: array whose columns (`view(arg_task2, :, index)`) are the per-iteration
  inputs.
- `communicators`: channel matrix; only `[2, 1]` (incoming) and `[1, 1]` (outgoing) are
  used here.
- `arg_func2...`: extra positional arguments forwarded to `function2!` (not defined in
  this snippet).

# Keywords
- `arg_sync`: iterable of extra arguments splatted into `function_require_sync`.
  Defaults to the empty tuple.

Returns `nothing`.
"""
function function2_loop!(
    arg_task2, communicators::Array{Channel{T},2}, arg_func2...; arg_sync=()
) where {T}
    # Iterate over the second axis: that is the dimension the view below indexes.
    for index in axes(arg_task2, 2)
        arg_iterable_Task2 = view(arg_task2, :, index)
        arg2 = take!(communicators[2, 1])
        arg1 = function_require_sync(arg2, arg_sync...)
        # Hand arg1 back before the remaining work, so task 1 can proceed meanwhile.
        put!(communicators[1, 1], arg1)
        function2!(arg2, arg_func2..., arg_iterable_Task2)
    end
    return nothing
end
"""
    asynchronous_compute_buffered!(arg_task1, arg_task2)

Method 2 driver: run the two Method 2 loops as spawned tasks that pass `arg1` and `arg2`
to each other through buffered channels of capacity 1.

`some_object` and `some_value` are placeholders that this snippet does not define; they
must be supplied by the surrounding code.

Both loops must perform the same number of iterations. Otherwise one side blocks forever
on a channel that nobody services.

Returns the last value put on `communicators[1, 1]` (the final `arg1`). All channels are
closed before returning, including when a task fails.
"""
function asynchronous_compute_buffered!(arg_task1, arg_task2)
    arg1 = some_object()
    arg_func1 = tuple(some_value...)
    arg_func2 = tuple(some_value...)
    Num_processes = 2
    communicators = create_communicators(Num_processes)
    # Both channels live in one `Array{Channel{T},2}`, so they must share the array's
    # element type; channels with different element types cannot be stored there.
    # Capacity 1 is enough even when arg1 is an array or matrix: the capacity counts
    # messages, and a message holds a reference to the array, not a copy of its elements.
    # NOTE(review): if typed channels matter for speed, the element type has to be
    # chosen where the communicator array is created — confirm against the real types.
    for slot in eachindex(communicators)
        communicators[slot] = eltype(communicators)(1)
    end
    try
        # The channel is buffered, so this put! does not block before the tasks start.
        put!(communicators[1, 1], arg1)
        Task_1_loop = Threads.@spawn function1_loop!(
            arg_task1, communicators, arg_func1...
        )
        Task_2_loop = Threads.@spawn function2_loop!(
            arg_task2, communicators, arg_func2...
        )
        # Wait on task 1 as well, so a failure there is reported rather than leaving
        # task 2 blocked forever on an empty channel.
        wait(Task_1_loop)
        wait(Task_2_loop)
        arg1_out = take!(communicators[1, 1])
        return arg1_out
    finally
        # Closing also wakes up a task still blocked on a channel after a failure.
        foreach(close, communicators)
    end
end
Can anyone teach me how to write a package that makes these communications faster? Ideally, it should support both cases mentioned above. I am willing to delve into the deeper parts of the libraries and of Base Julia to do this. Ideally, it should be able to reuse the existing channel infrastructure, with a macro applied to the put! and take! calls.