How to make parallel subprocesses?

I am trying to create small subprocesses that would operate in parallel to my main process. In the example below I have 10 different agents that are callable and that generate some numbers. I am interested in saving these numbers in a database that my main process uses. What I want is the agents to continuously generate data while the main process runs.

mutable struct Agent
    b::Float64
    a::Float64
end

# Calling an agent produces one number from its two fields
function (agent::Agent)()
    sum(agent.a .* rand(300)) + agent.b
end

function agentprocess(db::Array{Float64,1}, a::Agent)
    while true
        push!(db, a())
        if length(db) > 1000
            popfirst!(db)
        end
    end
end

function mainprocess()
    database = Array{Float64,1}()
    agents = Array{Agent,1}()
    for i in 1:10
        push!(agents, Agent(rand(), rand()))
    end
    Threads.@threads for a in agents 
        agentprocess(database, a)
    end

    #Do stuff with the database for n iterations
    #Stop all threads and return main process result
end

The problem with the @threads macro is that it blocks until every iteration has finished, so the main process cannot start right after the agent processes have started. What is the right framework for this? Also, I would like to give some sort of priority to the main process so that it is not constantly interrupted by the agent threads, maybe by reserving a core for that process and letting the agent processes share the others?

https://docs.julialang.org/en/v1/manual/parallel-computing/
Would Channels be of use here?

Hmm, maybe. I'll have to read up on them to understand what they are. Can they distribute work across multiple cores?

It doesn’t seem so according to the last paragraph of the Channels section.
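For reference, here is a minimal sketch of what a plain Channel does as a shared buffer, using tasks on a single process. The name channel_demo and the numbers are made up for illustration, and the agents are replaced by a bare sum(rand(300)). One difference from the original code: a full Channel makes put! block instead of dropping the oldest value.

```julia
# Hypothetical demo: several producer tasks feed one bounded Channel,
# and the caller consumes values from it.
function channel_demo(nagents::Int = 3; ntake::Int = 50)
    ch = Channel{Float64}(1000)   # buffer of at most 1000 values

    producers = map(1:nagents) do _
        @async begin
            try
                while true
                    put!(ch, sum(rand(300)))   # blocks while the buffer is full
                end
            catch err
                # put! throws InvalidStateException once the channel is closed
                err isa InvalidStateException || rethrow()
            end
        end
    end

    # The "main process" part: take a fixed number of values
    values = [take!(ch) for _ in 1:ntake]

    close(ch)                 # tells the producers to stop
    foreach(wait, producers)  # wait for them to finish
    return values
end
```

Calling channel_demo(2; ntake = 10) should return 10 values. Since the tasks are started with @async, they all run on the thread that created them, so this gives concurrency but not multi-core parallelism.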

Start the main process on thread 1 and agents on the other threads? But the difficulty will be maintaining locks for access to the shared data, I think.
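A rough sketch of that idea, assuming Julia 1.3 or later where Threads.@spawn is available. The helper names push_bounded! and run_agents are hypothetical, and the agents are simplified to sum(rand(300)). Unlike Threads.@threads, Threads.@spawn returns immediately, so the main loop can start while the agents keep running. Note that this does not pin the main loop to a reserved core; it only shows the locking and the stop signal.

```julia
using Base.Threads

# Hypothetical helper: append under the lock and keep at most `maxlen` entries
function push_bounded!(db::Vector{Float64}, lk::ReentrantLock, x::Float64; maxlen::Int = 1000)
    lock(lk) do
        push!(db, x)
        length(db) > maxlen && popfirst!(db)
    end
    return nothing
end

function run_agents(nagents::Int = 4; nsteps::Int = 100)
    db = Float64[]
    lk = ReentrantLock()
    stop = Atomic{Bool}(false)   # flag the main loop uses to stop the agents

    # Each agent runs in its own task; @spawn does not block the caller
    tasks = map(1:nagents) do _
        Threads.@spawn begin
            while !stop[]
                push_bounded!(db, lk, sum(rand(300)))
                yield()   # lets other tasks run when only one thread is available
            end
        end
    end

    # Main loop: read the shared database under the same lock
    total = 0.0
    for _ in 1:nsteps
        total += lock(lk) do
            isempty(db) ? 0.0 : sum(db) / length(db)
        end
        sleep(0.001)
    end

    stop[] = true            # ask the agents to stop
    foreach(wait, tasks)     # and wait until they have
    return total, length(db)
end
```

Start Julia with several threads (for example by setting JULIA_NUM_THREADS) to actually spread the agents over cores; with a single thread the code still runs, just interleaved.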

You need to use a RemoteChannel. It behaves like a regular Channel but lives on a worker process, so tasks running on separate processes can use it asynchronously. See: https://docs.julialang.org/en/v1/manual/parallel-computing/index.html#Channels-and-RemoteChannels-1

Edit: Let me add a small example where you run a function asynchronously on a remote worker, then do some work in parallel on the main process and then wait for the result of the asynchronous computation on the main process:

# Load Distributed std lib and add 1 worker process
using Distributed
addprocs(1)

# Create a RemoteChannel on worker process 2
ch = RemoteChannel(()->Channel{Array{Float64,1}}(1),2)

# Define a function that computes a value and puts it in a remote channel
@everywhere begin
    function randsleep(ch,nr,ns)
        x = randn(nr)
        sleep(ns)
        put!(ch,x)
    end
end

#Call the function on worker 2, asynchronously
#This line will return immediately
@async remotecall(randsleep,2,ch,12,12)

# Do other stuff on the main process
# ...

# Block until the results are ready then take them out of the channel
local_result = take!(ch)

Have you tried using @async and @sync? Some examples here
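A small sketch of the pattern, in case it helps (sync_demo is just an illustrative name): @async starts a task and returns right away, and @sync waits for every @async inside its block.

```julia
function sync_demo(n::Int = 5)
    results = zeros(n)
    # @sync blocks until all tasks started with @async inside it are done
    @sync for i in 1:n
        @async begin
            sleep(0.01)        # stands in for some waiting or I/O
            results[i] = i^2
        end
    end
    return results
end
```

These tasks all run on the thread that created them, so this overlaps waiting time rather than spreading computation over cores.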