Hello. I just got into julia and i could use some help.
I am trying to create an application that listens for instructions form the command line and sends them to a worker process. The worker process performs some computations and returns the results. The results from this worker should be processed in the main process.
Aim
-
I want the main and worker process to be alive until the
"exit"
command is passed via the cli input or I termiate the main process. -
I want to be able to control (spawn and terminate at will) a task on the worker process from either the main or worker processes.
-
While the worker is peforming computations, it should also be able to receieve and process other information; like terminating a workking task
Current Implementation.
using Distributed, ...
addprocs(1)
@everywhere function g(ic::Channel, rc::Channel, t_ref::Ref{Union{Task, Nothing}})
# ic -> instructions channel
# rc -> results channel
# t_ref -> worker task reference
while true
try
if !isready(t_ref[]) continue end
cmd = take!(ic)
if cmd == "exit"
# close channels
# ...
break
end
if cmd == "solve"
t_ref[] = Threads.@spawn begin
res = PeformHeavyComputations() # function has try_catch for InterruptException
put!(rc, res) # send results back to main
end
end
if cmd == "stop"
if !isnothing(t_ref)
# stop the computations
Base.throwto(t_ref[], InterruptException())
t_ref[] = nothing
end
end
catch ex
println("exception")
end
end
end
function f()
INSTRUCTIONS_CHANNEL = Channel(1)
RESULTS_CHANNEL = Channel(1)
T = Ref(nothing)
_f = @spawnat 2 g(INSTRUCTIONS_CHANNEL, RESULTS_CHANNEL, T)
# @async begin
while true
# listen for the command line and send instructions via channel to the worker _f
l = readline()
put!(INSTRUCTIONS_CHANNEL)
if isready(RESULTS_CHANNEL)
r = take!(RESULTS_CHANNEL)
ProcessResults(r)
end
if l == "exit"
# wait for thread to complete exiting
wait(_f)
# exit main loop
break
end
end
# end
end
f()
Problems
-
the script blocks after
_f = @spawnat 2 g(INS_C, RES_C, T)
I think becuase of thewhile true ... end
loop ing()
- to address the blocking. I wrap the main while loop in an
@async begin ... main while loop end
block. but this causes the application to just run to completion and does
not respect any of thewhile
loops.
- to address the blocking. I wrap the main while loop in an
-
I do not know if how long the
PerformHeavyComputations()
function would take take and i do not have access to the code.- if i did, i would have passed a reference to an interrupt flag down into the function.
Questions
- How can I address the problems i am facing?
- Is my approach to building this application correct?
- What resources can i look at to get myself better aquainted with the multithreading and multiprocessing?