@sync is not working

Hello,

I am a newbie to Julia. I am writing a small program that reads topics/records from Apache Kafka, processes them, and writes the results back as another topic.

My main function, in main.jl, is shown below. There are three small modules: one reads from Kafka, another writes to Kafka, and the third is the processing task, called algorithm. It is supposed to make some calculations and report the result back.

The modules communicate via channels. main.jl just initializes the modules with the appropriate channels. The init functions of the modules return tasks, each consisting of an infinite loop that reads from Kafka, writes to Kafka, or does the processing.

The init functions of these modules return tasks created with @async. I expected @sync to block on these tasks, which are listed inside the begin ... end block. However, main returns without waiting for the previously created tasks, and they die with it. I tried yieldto on one of the previously created tasks, wrapping the init calls in anonymous functions, tuples, you name it. Only the map call that is commented out below works. However, that is counterintuitive, and I would like to understand why @sync does not work as it is supposed to.

What is wrong with this code? Any help is much appreciated.

main.jl file

include("kafka_consumer.jl")
include("kafka_producer.jl")
include("algorithm.jl")

import .TPRROKafkaConsumer as cons
import .TPRROKafkaProducer as prod
import .TPRROAlgorithm     as alg

const input_channel_size  = 100;
const output_channel_size = 100;

function main()

  input_channel  = Channel{Dict}(input_channel_size)
  output_channel = Channel{Dict}(output_channel_size)

  @sync begin
    cons.init(input_channel)
    prod.init(output_channel)
    alg.init(input_channel, output_channel)
  end

#=
  map(wait,[cons.init(input_channel),
            prod.init(output_channel),
            alg.init(input_channel, output_channel)])
=#

end

main()
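The commented-out map(wait, ...) variant works because each init function returns a Task, and wait blocks until that particular task has finished. Below is a minimal sketch of that behaviour; init_worker and main_wait are hypothetical stand-ins (the real init functions loop forever over Kafka), and foreach(wait, tasks) is simply an equivalent spelling of map(wait, tasks) that does not build a result array.

```julia
# Hypothetical stand-in for an init function that, like the original ones,
# creates its task with @async and returns it to the caller.
function init_worker(results::Channel{Int}, id::Int)
    return @async begin
        sleep(0.1)          # simulate some work
        put!(results, id)   # report completion
    end
end

function main_wait()
    results = Channel{Int}(10)
    tasks = [init_worker(results, i) for i in 1:3]
    # wait blocks until each returned Task has finished; this is what
    # map(wait, ...) in the question does.
    foreach(wait, tasks)
    close(results)                  # no more values will be put
    return sort(collect(results))   # drain the closed channel
end

println(main_wait())  # → [1, 2, 3]
```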

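Hi, I agree it may not be very intuitive, but @sync only waits for tasks created by @async (or Threads.@spawn) calls that appear lexically inside the @sync block. A task created by @async inside a function that the block calls is not registered with @sync, i.e. there cannot be a function boundary between them.
So remove the @async from your init() functions and instead place it directly within the @sync block, like this: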

@sync begin
  @async …
  @async …
  …
end
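A small sketch of the lexical-scope rule, assuming a hypothetical helper start_hidden that creates its own task with @async, as the original init functions did. The helper's task sleeps longer than the lexical one, so the result shows which task @sync actually waited for.

```julia
# Creates a task with @async *inside* a function; an enclosing @sync
# in the caller cannot see it, so it is not waited on.
start_hidden(done::Ref{Bool}) = @async (sleep(1.0); done[] = true)

function demo()
    hidden  = Ref(false)
    visible = Ref(false)
    @sync begin
        start_hidden(hidden)                     # NOT registered with @sync
        @async (sleep(0.1); visible[] = true)    # registered with @sync
    end
    # @sync has waited only for the lexically enclosed @async.
    return (visible = visible[], hidden = hidden[])
end

println(demo())  # → (visible = true, hidden = false)
```

The same rule applies to Threads.@spawn: it is only tracked when it appears directly in the @sync block.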

Thank you! The code has become very clean. I removed the @async from the functions that wrap the infinite loops and changed main to the following, which works perfectly:

@sync begin
  @async cons.init(input_channel)
  @async prod.init(output_channel)
  @async alg.init(input_channel, output_channel)
end
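A self-contained sketch of this pattern, assuming hypothetical stage functions (consumer_loop, algorithm_loop, producer_loop) in place of cons.init, alg.init and prod.init, and finite inputs instead of the real infinite Kafka loops so that it terminates. Closing each channel when a stage is finished lets the downstream `for msg in channel` loop end, after which @sync returns.

```julia
# Hypothetical stand-in for cons.init: feeds records into the pipeline.
function consumer_loop(input::Channel{Dict})
    for i in 1:3
        put!(input, Dict("value" => i))   # stands in for reading a Kafka record
    end
    close(input)                          # signal "no more records"
end

# Hypothetical stand-in for alg.init: transforms each record.
function algorithm_loop(input::Channel{Dict}, output::Channel{Dict})
    for msg in input                      # ends once input is closed and drained
        put!(output, Dict("result" => msg["value"] * 10))
    end
    close(output)
end

# Hypothetical stand-in for prod.init: collects results instead of writing to Kafka.
function producer_loop(output::Channel{Dict}, sink::Vector{Int})
    for msg in output
        push!(sink, msg["result"])
    end
end

function run_pipeline()
    input_channel  = Channel{Dict}(100)
    output_channel = Channel{Dict}(100)
    written = Int[]
    @sync begin                           # waits for all three lexical @async tasks
        @async consumer_loop(input_channel)
        @async producer_loop(output_channel, written)
        @async algorithm_loop(input_channel, output_channel)
    end
    return written
end

println(run_pipeline())  # → [10, 20, 30]
```

With the real infinite loops @sync never returns, which is the intended behaviour; if any stage throws, @sync rethrows the error (wrapped in a CompositeException) once the other tasks are done.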

One last question: how can I make a multithreaded version of this code? Is the code below the right way to do it?

@sync begin
  Threads.@spawn cons.init(input_channel)
  Threads.@spawn prod.init(output_channel)
  Threads.@spawn alg.init(input_channel, output_channel)
end
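A minimal sketch of the threaded shape, with hypothetical inline stages standing in for the three init functions and finite inputs so it finishes. @sync also waits for lexically enclosed Threads.@spawn calls. Note that the tasks can only run in parallel if Julia was started with more than one thread (for example `julia --threads=auto`); otherwise they all share a single thread.

```julia
# Threaded variant of the three-stage pipeline (hypothetical stand-in stages).
function run_threaded(n::Int)
    input   = Channel{Int}(100)
    output  = Channel{Int}(100)
    results = Int[]                   # only the "producer" task writes to this
    @sync begin
        Threads.@spawn begin          # "consumer": feeds the pipeline
            for i in 1:n
                put!(input, i)
            end
            close(input)
        end
        Threads.@spawn begin          # "algorithm": transforms each record
            for x in input
                put!(output, x^2)
            end
            close(output)
        end
        Threads.@spawn begin          # "producer": drains the output channel
            for y in output
                push!(results, y)
            end
        end
    end
    return results                    # safe to read: @sync waited for all tasks
end

println("threads available: ", Threads.nthreads())
println(run_threaded(4))  # → [1, 4, 9, 16]
```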

In theory, yes. However, you have to be cautious when communicating between threads using channels. They are not thread-safe, and although the official Julia docs are not very clear on this topic, it seems that channels should be reserved for asynchronous programming with @async only.


I had automatically assumed that channels were thread-safe. Otherwise, what would their purpose be? To the best of my knowledge, Erlang-like structures have been imported into many languages precisely so that processes can communicate in a thread-safe way without needing the usual synchronization primitives such as semaphores, condition variables, and locks. I am quite unpleasantly surprised by this. Anyhow, thank you for pointing out this important issue. I really appreciate it.

Hi again. Looking at the implementation of Channel, it does indeed look thread-safe; for example, put! and take! acquire the channel's lock.
However, this is not mentioned in the docs, which initially led to my confusion.
Sorry about that. Happy multithreading!
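A quick sanity check of that reading of the implementation: many spawned tasks put! into one shared Channel at the same time with no extra lock, and afterwards nothing is missing. The function name concurrent_puts and the task counts are arbitrary choices for illustration, and the check only exercises real parallelism when Julia runs with more than one thread.

```julia
# Many tasks, possibly on different threads, put! into one shared Channel
# concurrently; no lock is taken around put! by this code.
function concurrent_puts(ntasks::Int, per_task::Int)
    ch = Channel{Int}(ntasks * per_task)   # large enough that put! never blocks
    @sync for t in 1:ntasks
        Threads.@spawn for i in 1:per_task
            put!(ch, 1)
        end
    end
    close(ch)
    return sum(ch)                         # drains the closed channel
end

println(concurrent_puts(8, 10_000))  # → 80000
```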
