Communicating across worker processes

Hello. I just got into julia and i could use some help.

I am trying to create an application that listens for instructions form the command line and sends them to a worker process. The worker process performs some computations and returns the results. The results from this worker should be processed in the main process.

Aim

  1. I want the main and worker process to be alive until the "exit" command is passed via the cli input or I termiate the main process.

  2. I want to be able to control (spawn and terminate at will) a task on the worker process from either the main or worker processes.

  3. While the worker is peforming computations, it should also be able to receieve and process other information; like terminating a workking task

Current Implementation.

    using Distributed, ...

    addprocs(1)

    @everywhere function g(ic::Channel, rc::Channel, t_ref::Ref{Union{Task, Nothing}}) 
        # ic -> instructions channel
        # rc -> results channel
        # t_ref -> worker task reference 

        while true
            try
                if !isready(t_ref[]) continue end

                cmd = take!(ic)

                if cmd == "exit"
                    # close channels
                    # ...
                    break
                end

                if cmd == "solve"  

                    t_ref[] = Threads.@spawn begin 
                        res = PeformHeavyComputations()  # function has try_catch for InterruptException
                        put!(rc, res)  # send results back to main
                    end
                end

                if cmd == "stop" 
                    if !isnothing(t_ref)
                        # stop the computations 
                        Base.throwto(t_ref[], InterruptException())
                        t_ref[] = nothing
                    end
                end

            catch ex
                println("exception")
            end
        end
    end

    function f() 

        INSTRUCTIONS_CHANNEL = Channel(1)
        RESULTS_CHANNEL = Channel(1)
        T = Ref(nothing)

        _f = @spawnat 2 g(INSTRUCTIONS_CHANNEL, RESULTS_CHANNEL, T)

        # @async begin
        while true
            # listen for the command line and send instructions via channel to the worker _f 
            l = readline()

            put!(INSTRUCTIONS_CHANNEL)

            if isready(RESULTS_CHANNEL)
                r = take!(RESULTS_CHANNEL)
                ProcessResults(r) 
            end

            if l == "exit" 
                # wait for thread to complete exiting
                wait(_f)

                # exit main loop
                break
            end
        end
        # end
    end


    f()

Problems

  • the script blocks after _f = @spawnat 2 g(INS_C, RES_C, T) I think becuase of the while true ... end loop in g()

    • to address the blocking. I wrap the main while loop in an @async begin ... main while loop end block. but this causes the application to just run to completion and does
      not respect any of the while loops.
  • I do not know if how long the PerformHeavyComputations() function would take take and i do not have access to the code.

    • if i did, i would have passed a reference to an interrupt flag down into the function.

Questions

  1. How can I address the problems i am facing?
  2. Is my approach to building this application correct?
  3. What resources can i look at to get myself better aquainted with the multithreading and multiprocessing?

@ndehngwa, welcome to Julia Discourse.

Your example cannot be executed as is, so I made some alterations in the context of your declared objectives (if I were to write this from scratch myself, the approach would be quite different):

using Distributed

addprocs(1)

@everywhere function PeformHeavyComputations()
    println("Performing heavy computations")
    sleep(15)
    println("Heavy computations done")
    return 1
end

@everywhere function RunHeavyComputations(rc)
    res = PeformHeavyComputations()
    put!(rc, res)
end

@everywhere function ProcessResults(r)
    println("Processing results: ", r)
    sleep(1)
    r
end

@everywhere function g(ic, rc)
    # ic -> instructions channel
    # rc -> results channel
    # t_ref -> worker task reference 

    t_ref = Ref{Union{Task,Nothing}}(nothing)

    while true
        try
            @info "Waiting for instructions on g..."

            # if !isready(t_ref[])
            #     continue
            # end

            cmd = take!(ic)

            if cmd == "exit"
                # close channels
                # ...
                break
            end

            if cmd == "solve"
                @info "Starting heavy computations in a new task..."
                t_ref[] = Threads.@spawn RunHeavyComputations(rc)
            end

            if cmd == "stop"
                if !isnothing(t_ref)
                    # stop the computations 
                    Base.throwto(t_ref[], InterruptException())
                    t_ref[] = nothing
                end
            end

        catch ex
            println("exception")
        end
    end
end

function f()

    INSTRUCTIONS_CHANNEL = RemoteChannel(() -> Channel(1))
    RESULTS_CHANNEL = RemoteChannel(() -> Channel(1))

    #_f = @spawnat 2 g(INSTRUCTIONS_CHANNEL, RESULTS_CHANNEL, T)
    _f = Threads.@spawn remote_do(g, 2, INSTRUCTIONS_CHANNEL, RESULTS_CHANNEL)

    # @async begin
    while true
        # listen for the command line and send instructions via channel to the worker _f 
        @info "Enter command: "
        l = readline()

        put!(INSTRUCTIONS_CHANNEL, l)

        if isready(RESULTS_CHANNEL)
            r = take!(RESULTS_CHANNEL)
            ProcessResults(r)
            @info "Results received: ", r
        end

        if l == "exit"
            # wait for thread to complete exiting
            wait(_f)

            # exit main loop
            break
        end
    end
    # end
end


const t = Threads.@spawn f()

println("Main thread is running")

println("Do something else here...")

wait(t)

Notes:

  • when using @spawnat, you actually need to fetch the result - you want to do that when you have a kind of one-time computation that you just want to pull back on your main process. The @spawnat is incompatible with your intentions about long-running processes that wait for jobs to be delivered via CLI. Take a look at the documentation for @remote_do vs. @spawnat.
  • Channels vs. RemoteChannels: this is important topic, but not complicated. In general the RemoteChannel behaves in the multiprocess context the same as the Channel behaves in the multithreading context. However, please note that you used the regular channel in your code (also, take a look at the adjustments I made).
  • MWE is important: I am glad that you started asking questions here. One important advice to engage people in answering is to provide your code in a way that we can run it as a minimum working example. People can tweak your example without having to guess what your intention was in various parts of the code. For example, you don’t need to actually provide any proprietary code - you can just create a placeholder for stuff you cannot share: take a look at what I did with your non-existent definition for PeformHeavyComputations().

I hope this helps you advance on your learning path - and feel free to add more questions (especially if any of my modifications look strange to you).

I suggest as a next step extract the following chunk and make it independent of your CLI commands:

if isready(RESULTS_CHANNEL)
    r = take!(RESULTS_CHANNEL)
    ProcessResults(r) 
end

Right now, the snippet above only executes after you send a CLI command.

Julia documentation is not perfect in all the aspects, but is the best place to get familiar with these subjects. A good start might be:

But don’t spend too much time trying to understand all the content. Start playing with the examples early: try, fail, try, ask, fail… and succeed.

Start small with toy/simple examples and make sure you understand the underlying concepts - afterward, it will feel natural to build more complex systems.

Also, if something is unclear in the documentation, feel free to ask here on Discourse. There is a good chance somebody will help.

4 Likes

Thank you very much for timely and detialed response.
Let us hope my sanity does not degrade as i embark on this journey of julia

Sorry for the rather long MWE, at the moment of submitting, I couldn’t think of a shorter way of presenting the problem i was facing.

1 Like