I am trying to create small subprocesses that would operate in parallel to my main process. In the example below I have 10 different agents that are callable and that generate some numbers. I am interested in saving these numbers in a database that my main process uses. What I want is the agents to continuously generate data while the main process runs.
mutable struct Agent
b::Int
a::Int
end
function (a::Agent)()
sum(a .* rand(300)) + b
end
function agentprocess(db::Array{Float34,1}, a::Agent)
while true
push!(db, a())
if length(db) > 1000
popfirst!(db)
end
end
end
function mainprocess()
database = Array{Float64,1}()
agents = Array{Agent,1}()
for i in 1:10
push!(agents, Agent(rand(), rand()))
end
Threads.@threads for a in agents
agentprocess(database, a)
end
#Do stuff with the database for n iterations
#Stop all threads and return main process result
end
The problem with the @threads macro is that it cannot have the main process start right after the agent processes started. What is the right framework to do this. Also, I would like to give some sort of priority to the main process so that it is not constantly interrupted by the agent threads. Maybe by reserving a core to that process and letting the agent processes share the others ?
Start the main process on thread 1 and agents on the other threads? But the difficulty will be maintaining locks for access to the shared data, I think.
You need to use RemoteChannel to get something that works like a channel but allows tasks to run tasks asynchronously on separate processes. They work like regular channels but live on worker processes. See: Parallel Computing · The Julia Language
Edit: Let me add a small example where you run a function asynchronously on a remote worker, then do some work in parallel on the main process and then wait for the result of the asynchronous computation on the main process:
# Load Distributed std lib and add 1 worker process
using Distributed
addprocs(1)
# Create a RemoteChannel on worker process 2
ch = RemoteChannel(()->Channel{Array{Float64,1}}(1),2)
# Define a function that computes a value and puts it in a remote channel
@everywhere begin
function randsleep(ch,nr,ns)
x = randn(nr)
sleep(ns)
put!(ch,x)
end
end
#Call the function on worker 2, asynchronously
#This line will return immediately
@async remotecall(randsleep,2,ch,12,12)
# Do other stuff on the main process
# ...
# Block until the results are ready then take them out of the channel
local_result = take!(ch)