Multithreading error (UndefRefError) in a simple ABM with Agents.jl

Dear all, I am working on an ABM and am now trying to improve performance through in-model parallelization using @threads, since I have 10^6 agents (I am working with fish populations, the simulation is in 0D for now, and I would like to avoid super-individuals at the moment).

My agents do not interact with each other, so multithreading should ideally be possible, but I struggled with the functions that create new agents (by calling add_agent!). I therefore decided to run those separately, in series, inside a complex_step!() function. I guess remove_agent!() has the same problem.

In fact, I get this error:

*julia> show(err)*
*1-element ExceptionStack:*
*LoadError: UndefRefError: access to undefined reference*
*Stacktrace:*
*  [1] getindex*
*    @ .\essentials.jl:13 [inlined]*
*  [2] iterate*
*    @ .\array.jl:945 [inlined]*
*  [3] filter!(f::var"#8#11", a::Vector{Person})*
*    @ Base .\array.jl:2721*
*  [4] complex_step!(model::StandardABM{Nothing, Person, typeof(Agents.Schedulers.fastest), Dict{Symbol, Real}, Random.TaskLocalRNG})*
*    @ Main c:\Users\elli2\Documents\PhD\Multi-SPelAgents\#MWE.jl:65*
*  [5] step!*
*    @ C:\Users\elli2\.julia\packages\Agents\xtlGn\src\simulations\step.jl:40 [inlined]*
*  [6] run!(model::StandardABM{Nothing, Person, typeof(Agents.Schedulers.fastest), Dict{Symbol, Real}, Random.TaskLocalRNG}, agent_step!::typeof(dummystep), model_step!::typeof(complex_step!), n::Int64; when::Bool, when_model::Bool, mdata::Nothing, adata::Nothing, obtainer::Function, agents_first::Bool, showprogress::Bool)*
*    @ Agents C:\Users\elli2\.julia\packages\Agents\xtlGn\src\simulations\collect.jl:151*
*  [7] run!(model::StandardABM{Nothing, Person, typeof(Agents.Schedulers.fastest), Dict{Symbol, Real}, Random.TaskLocalRNG}, agent_step!::Function, model_step!::Function, n::Int64)*
*    @ Agents C:\Users\elli2\.julia\packages\Agents\xtlGn\src\simulations\collect.jl:114*
*  [8] top-level scope*
*    @ c:\Users\elli2\Documents\PhD\Multi-SPelAgents\#MWE.jl:82*
*  [9] eval*
*    @ .\boot.jl:385 [inlined]*
* [10] include_string(mapexpr::typeof(REPL.softscope), mod::Module, code::String, filename::String)*
*    @ Base .\loading.jl:2070*
* [11] invokelatest(::Any, ::Any, ::Vararg{Any}; kwargs::@Kwargs{})*
*    @ Base .\essentials.jl:887*
* [12] invokelatest(::Any, ::Any, ::Vararg{Any})*
*    @ Base .\essentials.jl:884*
* [13] inlineeval(m::Module, code::String, code_line::Int64, code_column::Int64, file::String; softscope::Bool)*
*    @ VSCodeServer c:\Users\elli2\.vscode\extensions\julialang.language-julia-1.65.2\scripts\packages\VSCodeServer\src\eval.jl:263*
* [14] (::VSCodeServer.var"#67#72"{Bool, Bool, Bool, Module, String, Int64, Int64, String, VSCodeServer.ReplRunCodeRequestParams})()*
*    @ VSCodeServer c:\Users\elli2\.vscode\extensions\julialang.language-julia-1.65.2\scripts\packages\VSCodeServer\src\eval.jl:181*
* [15] withpath(f::VSCodeServer.var"#67#72"{Bool, Bool, Bool, Module, String, Int64, Int64, String, VSCodeServer.ReplRunCodeRequestParams}, path::String) *
*    @ VSCodeServer c:\Users\elli2\.vscode\extensions\julialang.language-julia-1.65.2\scripts\packages\VSCodeServer\src\repl.jl:274*
* [16] (::VSCodeServer.var"#66#71"{Bool, Bool, Bool, Module, String, Int64, Int64, String, VSCodeServer.ReplRunCodeRequestParams})()*
*    @ VSCodeServer c:\Users\elli2\.vscode\extensions\julialang.language-julia-1.65.2\scripts\packages\VSCodeServer\src\eval.jl:179*
* [17] hideprompt(f::VSCodeServer.var"#66#71"{Bool, Bool, Bool, Module, String, Int64, Int64, String, VSCodeServer.ReplRunCodeRequestParams})*
*    @ VSCodeServer c:\Users\elli2\.vscode\extensions\julialang.language-julia-1.65.2\scripts\packages\VSCodeServer\src\repl.jl:38*
* [18] (::VSCodeServer.var"#65#70"{Bool, Bool, Bool, Module, String, Int64, Int64, String, VSCodeServer.ReplRunCodeRequestParams})()*
*    @ VSCodeServer c:\Users\elli2\.vscode\extensions\julialang.language-julia-1.65.2\scripts\packages\VSCodeServer\src\eval.jl:150*
* [19] with_logstate(f::Function, logstate::Any)*
*    @ Base.CoreLogging .\logging.jl:515*
* [20] with_logger*
*    @ .\logging.jl:627 [inlined]*
* [21] (::VSCodeServer.var"#64#69"{VSCodeServer.ReplRunCodeRequestParams})()*
*    @ VSCodeServer c:\Users\elli2\.vscode\extensions\julialang.language-julia-1.65.2\scripts\packages\VSCodeServer\src\eval.jl:255*
* [22] #invokelatest#2*
*    @ Base .\essentials.jl:887 [inlined]*
* [23] invokelatest(::Any)*
*    @ Base .\essentials.jl:884*
* [24] (::VSCodeServer.var"#62#63")()*
*    @ VSCodeServer c:\Users\elli2\.vscode\extensions\julialang.language-julia-1.65.2\scripts\packages\VSCodeServer\src\eval.jl:34*
*in expression starting at c:\Users\elli2\Documents\PhD\Multi-SPelAgents\#MWE.jl:82*

when running this MWE:

#MWE 

using Agents
using Distributed
@agent Person NoSpaceAgent begin
    type::Symbol
    height::Int64
end

properties = Dict(:time_sim => 0, 
                    :height_to_die => 30, 
                    :height_to_generate => 90,
                    :sum_h => 0.0)



#step functions
function random_h!(Person, model)
    Person.height = rand(1:100)
end

function die!(Person, model)
    if Person.height < model.height_to_die
        remove_agent!(Person, model)
    end
end

function add_people!(Person, model)
    #youngs generate less people
    if Person.type == :young && Person.height > model.height_to_generate
        for i in 1:10
            add_agent!(model, rand((:baby, :young, :adult)), rand(1:100))
        end
    #adults generate more people
    elseif Person.type == :adult && Person.height > model.height_to_generate
        for i in 10:20
            add_agent!(model, rand((:baby, :young, :adult)), rand(1:100))
        end
    end
end

function evolve_environment!(model)
    model.time_sim += 1
    model.sum_h = 0
    for person in allagents(model)
        model.sum_h += getproperty(person, :height)
    end
end

#complex step
function complex_step!(model)

    #first check of agents
    #The collect(values(allagents(model))) is used to create a copy of the agents in the model. 
    #This is necessary because you can't add or remove agents 
    #while iterating over them directly?

    all_agents = collect(values(allagents(model)))
    println(nagents(model))

    Threads.@threads for person in all_agents
                        random_h!(person, model)
                        die!(person, model)
                    end
    
    all_agents = collect(values(allagents(model)))
    young_adults = filter!(person -> person.type == :young || person.type == :adult, all_agents)

    println(nagents(model))
    #update of agents who survived and can reproduce; only young and adults can generate
    for person in young_adults
        add_people!(person, model)
    end
    
    evolve_environment!(model)
end

model = ABM(Person; properties)

#add agents
for i in 1:100
    add_agent!(model, rand((:baby, :young, :adult)), rand(1:100))
end
#run
run!(model, dummystep, complex_step!, 1000)

This is probably more than a minimal example, but I tried to include, in simplified form, all the functions I need and the places where I got errors.
I know multithreading is really case-dependent and complicated, but I still hope some of you can help, since this is an important step for my simulation.
Thank you :grin:


The error comes from this line (I haven’t run the code; I am assuming the stack trace you pasted is accurate):

    young_adults = filter!(person -> person.type == :young || person.type == :adult, all_agents)

which is after the multi threading.

Have you checked that all_agents is not empty after your “killing spree”? :smiley:

Yes, the model still has many agents :smiley:

My idea was to thread what could be run in parallel and then converge again, running add_people!() on a subset of agents (in the complex simulation I actually created a scheduler to do that). So after the threaded loop I thought the model was updated and ready to run the function in series. But yes, in my complex model the problem also shows up whenever there is a filtering step somewhere. Should I write separate functions: a parallel_step! and another one containing the add_agent!() and evolve_environment!() parts?

But when does it have agents? Did you check that all_agents = collect(values(allagents(model))) is not empty after the Threads.@threads for loop?

In another version I added a println(nagents(model)) after the threaded loop and after the add_agent!() part, and there were still agents in the model. I am modifying the MWE.

I don’t know if it is relevant, but in the scheduler I created I was retrieving the agent IDs with allids(model):

mutable struct scheduler_EggAdults2 end

function (sEA::scheduler_EggAdults2)(model::ABM)
    ids = [agent.id for agent in collect(allids(model))]
    # filter all ids whose agents have `w` less than some amount
    ids = filter!(id -> haskey(model.agents, id) && (model[id].type == :adult ||  model[id].type == :eggmass), ids)
    return ids
end

sEA = scheduler_EggAdults2()

While debugging to understand which created or removed agent was causing the problem, I could see that at some point, out of nowhere, there was a very large ID (like 25472829186421) when the total number of agents (including removed agents) was 50000. I tried to fix that by moving to allagents(), but I am going to try to replicate the error with an MWE and let you all know.

This is confusing. Why isn’t this just collect(allids(model))?

Given what you have said so far, I have to run your MWE and examine it, as I don’t see a trivial solution! I can’t do it now, but hopefully soon :crossed_fingers:


Yep, sorry, my fault for modifying the code on the fly. Thank you for your availability!

I tried moving the die!() function out of the threaded loop, and the simulation no longer seems to complain. I think the problem is, obviously, that remove_agent!() modifies the model object. I need to figure out a smart way to bypass this problem, since in my real simulation several of the functions I would like to run in parallel are expected to remove agents.

Oh sorry, I should have read your initial code more carefully.

Yes, you can’t run a multithreaded loop that removes elements from a container. Threading reorders the iterations and runs them concurrently, so indexing into the container may no longer work.

Although with a dictionary container it should work, hm…

CU!

George Datseris (he/him)


I didn’t follow the discussion, but note that Dict isn’t thread safe.


Yes, I thought the same, but if I am not wrong I could only have a vector container by using an UnremovableABM, right? @Datseris
That is not what I need, so I think I have to rearrange the code.

Okay, I was wrong to think it would work :smiley:

CU!

George Datseris (he/him)

The vector would have the same problems if you tried to remove entries via multithreading.

It's just that parallelization in ABMs is not possible in the majority of cases due to algorithmic limitations, which is why Agents.jl offers no parallelization out of the box.

Another way to work around non-thread-safe removal is to add an agent type :dead (or :deceased) and, instead of removing agents inside the threaded loop, just mark them as dead. Outside the threaded loop, all the dead agents can then be removed using:

remove_all!(model::ABM, f::Function)

with f being ag -> ag.type == :dead, which is probably faster than going agent by agent.
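To make the mark-then-sweep idea concrete, here is a minimal sketch in plain Julia. It does not use Agents.jl: the `Fish` struct, the `mark_and_sweep!` function, and the plain `Dict` standing in for the model's agent container are all hypothetical, chosen only for illustration. The threaded phase changes fields of individual agents, and the container itself is only modified afterwards, in serial.

```julia
# Hypothetical stand-in for an agent; this is NOT the Agents.jl `@agent` type.
mutable struct Fish
    id::Int
    type::Symbol
    height::Int
end

# `agents` is a plain Dict playing the role of the model's agent container.
function mark_and_sweep!(agents::Dict{Int,Fish}, height_to_die::Int)
    # Snapshot the agents so the threaded loop never iterates the Dict itself.
    snapshot = collect(values(agents))

    # Phase 1 (parallel): each iteration only mutates its own agent's field.
    Threads.@threads for fish in snapshot
        if fish.height < height_to_die
            fish.type = :dead   # mark instead of removing
        end
    end

    # Phase 2 (serial): sweep all marked agents out of the container in one pass.
    filter!(kv -> kv.second.type != :dead, agents)
    return agents
end

# Agents with heights 1 to 100; those below 30 are marked and then removed.
agents = Dict(i => Fish(i, :adult, i) for i in 1:100)
mark_and_sweep!(agents, 30)
println(length(agents))
```

In an actual Agents.jl model, the serial `filter!` step would presumably be replaced by the `remove_all!(model, ag -> ag.type == :dead)` call mentioned above; I have not tested that combination.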

Great idea!

But would it even be safe to access the agents from multiple threads (agents are just the values of a dictionary)?

CU!

George

Once again, a technical comment from the sidelines :slight_smile:

Reading a dict (or array) is fine. Mutation is the issue.
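As a small sketch of what "reading is fine" means in practice (plain Julia, no Agents.jl; the `Fish` struct, `update_heights!`, and the deterministic height formula are hypothetical placeholders): several threads look agents up in the same Dict at once, and each one changes only the fields of its own agent. No key is added to or removed from the Dict while the threads run.

```julia
# Hypothetical stand-in for an agent; this is NOT the Agents.jl `@agent` type.
mutable struct Fish
    id::Int
    type::Symbol
    height::Int
end

function update_heights!(agents::Dict{Int,Fish})
    ids = collect(keys(agents))          # taken serially, before threading starts
    Threads.@threads for id in ids
        fish = agents[id]                # concurrent read of the Dict: no writer exists
        # Placeholder for the real per-agent update; touches only this agent.
        fish.height = mod(37 * fish.id, 100) + 1
    end
    return agents
end

agents = Dict(i => Fish(i, :young, 0) for i in 1:1000)
update_heights!(agents)
println(length(agents))   # the container itself is unchanged in size
```

The assumption that matters here is that nothing calls `delete!` or inserts into `agents` during the loop; as soon as one thread does, even the reads in the other threads stop being safe.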

However, you could use a lock to ensure thread-safe Dict access (at the cost of extra synchronization and thus reduced parallelism of course).

julia> using Base.Threads

julia> d = Dict();

julia> lck = ReentrantLock();

julia> @threads for i in 1:15 # or @threads for ... end
           lock(lck) do
               d[i] = i # thread-safe mutation
           end
       end

julia> d
Dict{Any, Any} with 15 entries:
  5  => 5
  12 => 12
  8  => 8
  1  => 1
  6  => 6
  11 => 11
  9  => 9
  14 => 14
  3  => 3
  7  => 7
  13 => 13
  4  => 4
  15 => 15
  2  => 2
  10 => 10
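The same locking pattern can be sketched for removal, which is the case from the original question. This is plain Julia with a Dict as a stand-in for the agent container; the `Fish` struct and `remove_short!` are hypothetical names, and I have not tried wrapping Agents.jl's `remove_agent!` this way. The per-agent check runs in parallel, and only the `delete!` call is serialized by the lock.

```julia
# Hypothetical stand-in for an agent; this is NOT the Agents.jl `@agent` type.
mutable struct Fish
    id::Int
    type::Symbol
    height::Int
end

function remove_short!(agents::Dict{Int,Fish}, lck::ReentrantLock, height_to_die::Int)
    # Snapshot taken before any thread starts, so no thread iterates the Dict.
    snapshot = collect(values(agents))
    Threads.@threads for fish in snapshot
        # The check touches only the agent object, so it needs no lock.
        if fish.height < height_to_die
            lock(lck) do
                delete!(agents, fish.id)   # container mutation, one thread at a time
            end
        end
    end
    return agents
end

# Agents with heights 1 to 100; those below 30 are removed under the lock.
agents = Dict(i => Fish(i, :adult, i) for i in 1:100)
remove_short!(agents, ReentrantLock(), 30)
println(length(agents))
```

This only works because every access to the Dict inside the loop goes through the lock. If other threads also read the Dict there (for example via `agents[id]`), those reads would have to take the same lock, which reduces the benefit of threading further.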

I was thinking the same!

That’s great! I am new to these technical parts, I really appreciate all your support!

Apparently, the need for a thread-safe Dict is not new, and there is a package, ThreadSafeDicts.jl, which basically provides one. All that is needed is to use it instead of the standard Dict. This might require defining:

using ThreadSafeDicts
Agents.construct_agent_container(::Type{<:ThreadSafeDict}, A) = ThreadSafeDict{Int,A}

and maybe that is good to go, but I haven’t tested this at all.
The ABM construction call should then pass ThreadSafeDict as the value of the container parameter.
