Are Unions thread-safe in Agents.jl?

Yesterday I asked a very similar question ("Multithreading agents.jl with @threads for all agents"), and my code turned out to have a race condition.

That was a simplified example. My actual code has two types of agents, and I create the model by passing a Union of both agent types to ABM(). When I do that, I get a cryptic stack trace that looks almost identical to yesterday's.
BTW, I would prefer to use UnremovableABM() to create the model, but I don’t know how to do that with different kinds of agents.

So here is a slightly more elaborate example. First, the stack trace:

 julia demo_agents_thread.jl  
┌ Warning: Agent type is not concrete. If your agent is parametrically typed, you're probably
│ seeing this warning because you gave `Agent` instead of `Agent{Float64}`
│ (for example) to this function. You can also create an instance of your agent
│ and pass it to this function. If you want to use `Union` types for mixed agent
│ models, you can silence this warning.
└ @ Agents ~/.julia/packages/Agents/kEwy8/src/core/model_concrete.jl:139
ERROR: LoadError: TaskFailedException
Stacktrace:
 [1] wait
   @ ./task.jl:345 [inlined]
 [2] threading_run(fun::var"#6#threadsfor_fun#7"{var"#6#threadsfor_fun#5#8"{StandardABM{Nothing, Union{UserA, UserB}, typeof(Agents.Schedulers.fastest), Dict{Symbol, Int64}, Random.TaskLocalRNG}, Base.ValueIterator{Dict{Int64, Union{UserA, UserB}}}}}, static::Bool)
   @ Base.Threads ./threadingconstructs.jl:38
 [3] macro expansion
   @ ./threadingconstructs.jl:89 [inlined]
 [4] model_step!(model::StandardABM{Nothing, Union{UserA, UserB}, typeof(Agents.Schedulers.fastest), Dict{Symbol, Int64}, Random.TaskLocalRNG})
   @ Main ~/src/masiri/Initial model/demo_agents_thread.jl:68
 [5] step!
   @ ~/.julia/packages/Agents/kEwy8/src/simulations/collect.jl:0 [inlined]
 [6] run!(model::StandardABM{Nothing, Union{UserA, UserB}, typeof(Agents.Schedulers.fastest), Dict{Symbol, Int64}, Random.TaskLocalRNG}, agent_step!::typeof(dummystep), model_step!::typeof(model_step!), n::Int64; when::Bool, when_model::Bool, mdata::Vector{Symbol}, adata::Nothing, obtainer::Function, agents_first::Bool, showprogress::Bool)
   @ Agents ~/.julia/packages/Agents/kEwy8/src/simulations/collect.jl:153
 [7] top-level scope
   @ ~/src/masiri/Initial model/demo_agents_thread.jl:76

    nested task error: MethodError: no method matching firstindex(::Base.ValueIterator{Dict{Int64, Union{UserA, UserB}}})
    Closest candidates are:
      firstindex(::Any, ::Any) at abstractarray.jl:402
      firstindex(::Union{Tables.AbstractColumns, Tables.AbstractRow}) at ~/.julia/packages/Tables/AcRIE/src/Tables.jl:182
      firstindex(::LinRange) at range.jl:693
      ...
    Stacktrace:
     [1] #6#threadsfor_fun#5
       @ ./threadingconstructs.jl:69 [inlined]
     [2] #6#threadsfor_fun
       @ ./threadingconstructs.jl:51 [inlined]
     [3] (::Base.Threads.var"#1#2"{var"#6#threadsfor_fun#7"{var"#6#threadsfor_fun#5#8"{StandardABM{Nothing, Union{UserA, UserB}, typeof(Agents.Schedulers.fastest), Dict{Symbol, Int64}, Random.TaskLocalRNG}, Base.ValueIterator{Dict{Int64, Union{UserA, UserB}}}}}, Int64})()
       @ Base.Threads ./threadingconstructs.jl:30
in expression starting at /home/andreas/src/masiri/Initial model/demo_agents_thread.jl:76

And here is the code:


using Agents, Base.Threads

const FIB_NUM = 10

mutable struct UserA <: AbstractAgent
    id::Int
    fibonacci_num::Int
end

mutable struct UserB <: AbstractAgent
    id::Int
    fibonacci_num::Int
    money::Float64
end

mutable struct MyModel 
    agents::Vector{AbstractAgent}
    step::Int
    total_fib::Float64
end

function fibonacci(n::Int)
    if n <= 1
        return n
    else
        return fibonacci(n - 1) + fibonacci(n - 2)
    end
end

function userA_step!(agent::UserA, model)
    agent.fibonacci_num += fibonacci(FIB_NUM)
end

function userB_step!(agent::UserB, model)
    agent.fibonacci_num += fibonacci(FIB_NUM)
    agent.money += 1.1
end

function create_my_model()
    n_users = 100
    usersA = [UserA(i, 0) for i in 1:n_users]
    usersB = [UserB(i + n_users, 0, 0.0) for i in 1:n_users]
    agents = vcat(usersA, usersB)

    properties = Dict(
        :total_fib => 0,
        :tick => 1
    )

    model = ABM(Union{UserA, UserB}; properties=properties)
    for agent in agents
        add_agent!(agent, model)
    end

    return model
end

function agent_step!(agent, model)
    if isa(agent, UserA)
        userA_step!(agent, model)
    elseif isa(agent, UserB)
        userB_step!(agent, model)
    end
end

function model_step!(model)
    model.tick += 1
    @threads for agent in allagents(model)
        agent_step!(agent, model)
    end
    model.total_fib = sum(model[id].fibonacci_num for id in allids(model))
end

model = create_my_model()
n_steps = 356
run!(model, dummystep, model_step!, n_steps-1, mdata = [:total_fib])
println("total Fibonacci:", model.total_fib )

I would be happy if someone could explain how to pinpoint the issue from the stack trace. For me it’s a total guessing game what could be going on, and it is not obvious to me that introducing a Union could lead to a race condition.

Agents.jl returns iterators whenever possible to avoid allocations. This is more performant in most situations.

allagents is ultimately a Values iterator over a Dict. Values has internal state and is thus not thread-safe.

The solution (note: I haven’t had the chance to test it) would be to collect the iterator into a container before passing it to your threaded loop:

collect(allagents(model))
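A minimal sketch of the same fix using only Base Julia, assuming a plain `Dict{Int, Union{UserA, UserB}}` as a stand-in for the model's agent storage (this is not the Agents.jl internals, and the structs here do not subtype `AbstractAgent` to avoid the dependency). `collect` copies the values into a `Vector`, which `@threads` can split by index; because the agents are mutable structs, mutating them through that vector updates the same objects stored in the Dict.

```julia
using Base.Threads

# Hypothetical stand-ins for the agent types in the question
# (no Agents.jl dependency, so no AbstractAgent supertype here).
mutable struct UserA
    id::Int
    fibonacci_num::Int
end

mutable struct UserB
    id::Int
    fibonacci_num::Int
    money::Float64
end

# A plain Dict plays the role of the model's id => agent storage.
agents = Dict{Int, Union{UserA, UserB}}()
for i in 1:100
    agents[i] = UserA(i, 0)
    agents[i + 100] = UserB(i + 100, 0, 0.0)
end

step_agent!(a::UserA) = (a.fibonacci_num += 1; nothing)
function step_agent!(b::UserB)
    b.fibonacci_num += 1
    b.money += 1.1
    return nothing
end

# Iterating values(agents) directly under @threads raised the firstindex
# MethodError in this thread; collecting first gives an indexable Vector.
@threads for a in collect(values(agents))
    step_agent!(a)  # each iteration touches only its own agent
end

total = sum(a.fibonacci_num for a in values(agents))
println("total: ", total)  # 200: every agent was stepped once
```

Note that `collect` allocates a new vector on every call, which is the cost of this approach.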


It is possible to get the agent IDs with allids() instead of full agent instances with allagents(). Does that avoid the allocation, and would it solve the issue at hand, too?

If so, how can I rewrite the step functions to use indices instead of real agent instances?

The Dict and its value iterator with internal state are features of the ABM() model. UnremovableABM() stores its agents in a list and is faster, too. Would switching to it make the loop thread-safe? I wanted to switch anyway. How can I do that?

collect() makes the stack trace go away and allows the code to run. I would still like answers to the other questions (like how to decipher what is going on in the stack trace), because I don’t want to come here for every instance of firstindex() 🙂

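The stack trace tells you pretty clearly what’s going wrong: the nested task error is a MethodError, no method matching firstindex(::Base.ValueIterator{Dict{Int64, Union{UserA, UserB}}}), i.e. the @threads loop tried to index into the values iterator of the agent Dict, and that iterator does not support indexing.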

What @Libbum described is exactly the problem. And his recommendation to collect the value iterator first is a good solution and I would have recommended the same.

As for switching to UnremovableABM(): yes, that’s likely to work, because elements of arrays have fixed positions, and therefore the firstindex call inside the @threads macro should work as expected.
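The array argument can be illustrated with a hypothetical container that keeps agents in a `Vector{Union{UserA, UserB}}` (an assumption for illustration only, not the actual internals of `UnremovableABM`). Because a `Vector` supports `firstindex` and integer indexing, `@threads` can split it directly, with no `collect` on every step.

```julia
using Base.Threads

mutable struct UserA
    id::Int
    fibonacci_num::Int
end

mutable struct UserB
    id::Int
    fibonacci_num::Int
    money::Float64
end

# Hypothetical array-backed agent storage: agent i lives at position i.
agents = Union{UserA, UserB}[]
for i in 1:200
    push!(agents, i <= 100 ? UserA(i, 0) : UserB(i, 0, 0.0))
end

step_agent!(a::UserA) = (a.fibonacci_num += 1; nothing)
function step_agent!(b::UserB)
    b.fibonacci_num += 1
    b.money += 1.1
    return nothing
end

# A Vector is indexable, so @threads can partition its index range directly.
@threads for a in agents
    step_agent!(a)
end

println(sum(a.fibonacci_num for a in agents))  # 200
```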

Note that this has nothing to do with Agents.jl in particular:

julia> d = Dict{Int, Union{Int,Float64}}()
Dict{Int64, Union{Float64, Int64}}()

julia> d[1] = 1;

julia> d[2] = 2.0;

julia> Threads.@threads for v in d
           @show v
       end
ERROR: TaskFailedException

    nested task error: MethodError: no method matching firstindex(::Dict{Int64, Union{Float64, Int64}})

or even without the Union type:

julia> d = Dict{Int,Float64}()
Dict{Int64, Float64}()

julia> d[1] = 1.0
1.0

julia> d[2] = 2.0
2.0

julia> Threads.@threads for v in d
           @show v
       end
ERROR: TaskFailedException

    nested task error: MethodError: no method matching firstindex(::Dict{Int64, Float64})

The issue is that @threads needs an indexable collection, so that the workload can be split among threads, and a Dict is not indexable:

julia> firstindex(d)
ERROR: MethodError: no method matching firstindex(::Dict{Int64, Float64})
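One way to check this kind of error yourself, sketched with Base only: ask whether `firstindex` has a method for the collection type you pass to `@threads`. The nested `MethodError` names exactly that type (`Base.ValueIterator{Dict{...}}` in the original stack trace). On the Julia version used in this thread, the `Dict` and its `values` iterator have no such method, matching the errors above, while the collected `Vector` does.

```julia
d = Dict{Int, Float64}(1 => 1.0, 2 => 2.0)
v = collect(values(d))

# hasmethod reports whether a method matching these argument types exists
for x in (d, values(d), v)
    println(typeof(x), " => firstindex defined: ",
            hasmethod(firstindex, Tuple{typeof(x)}))
end

# The index range @threads can split among threads
println(firstindex(v), ":", lastindex(v))  # 1:2
```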

PS: On the other hand, I don’t see how this is related to thread safety; it seems to be just an incompatibility between the way @threads is implemented and the lack of indexing.


FLoops seems to deal with that, for instance:

julia> using FLoops

julia> d = Dict{Int,Float64}()
Dict{Int64, Float64}()

julia> for i in 1:100
           d[i] = 1.0
       end;

julia> @floop for v in values(d)
           @reduce s += v
       end

julia> s
100.0

julia> Threads.nthreads()
12

Replacing the threaded loop with:

julia> using FLoops

julia> function model_step!(model)
           model.tick += 1
           @floop for agent in allagents(model)
               agent_step!(agent, model)
           end
           model.total_fib = sum(model[id].fibonacci_num for id in allids(model))
       end

makes the code run (correctness not tested!).

(I don’t see any concurrency problem as long as agent_step! does not modify model.)
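The condition about agent_step! not modifying model matters: updating each agent's own fields from different threads is fine, but updating a shared model property (such as `model.total_fib`) inside the loop would be a data race. A minimal Base-only sketch, with a hypothetical `Counter` agent: the safe version mutates only per-agent state inside the threaded loop and reduces afterwards; `Threads.Atomic` is shown as one alternative when a shared accumulator is really needed.

```julia
using Base.Threads

mutable struct Counter  # hypothetical minimal agent
    id::Int
    value::Int
end

agents = [Counter(i, 0) for i in 1:1000]

# Safe: each iteration writes only to its own agent.
@threads for a in agents
    a.value += 1
end
total_safe = sum(a.value for a in agents)  # reduction done after the loop

# Unsafe (not run here): `shared[] += 1` inside @threads is a read-modify-write
# race and can lose updates when several threads hit it at once.
# shared = Ref(0); @threads for a in agents; shared[] += 1; end

# If a shared accumulator is unavoidable, an atomic counter avoids lost updates.
shared = Atomic{Int}(0)
@threads for a in agents
    atomic_add!(shared, a.value)
end

println(total_safe, " ", shared[])  # 1000 1000
```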

Another alternative is to iterate over the range 1:length(allagents(model)), if you are sure that the Dict’s keys are exactly the agent ids 1 to n:

julia> function model_step!(model)
           model.tick += 1
           @threads for id in 1:length(allagents(model))
               agent_step!(model[id], model)
           end
           model.total_fib = sum(model[id].fibonacci_num for id in allids(model))
       end

(I’m not sure how happy Agents.jl is with accessing the agents via model[id].)
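If the ids are not guaranteed to be exactly 1 to n (for example after agents have been removed), one option is to collect the ids once and loop over that vector. A minimal Base-only sketch, assuming a `Dict` as a stand-in for the model; in Agents.jl the corresponding calls would be `allids(model)` and `model[id]`, as used in the question. `collect` still allocates a `Vector{Int}`, but the Dict is never modified inside the loop, so the concurrent lookups are only reads.

```julia
using Base.Threads

mutable struct UserA
    id::Int
    fibonacci_num::Int
end

# Dict standing in for the model's id => agent storage, with non-contiguous ids
agents = Dict{Int, UserA}(id => UserA(id, 0) for id in (2, 5, 7, 11, 42))

# 1:length(agents) would be 1:5 and miss most of these ids.
ids = collect(keys(agents))  # Vector{Int}: indexable by @threads

@threads for id in ids
    agent = agents[id]         # read-only Dict access from each thread
    agent.fibonacci_num += id  # mutate only this agent
end

println(sort!([a.fibonacci_num for a in values(agents)]))  # [2, 5, 7, 11, 42]
```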


I appreciate that this is a simple example and perhaps not representative of the complexity of your real code, but in cases like these it’s generally preferable to have only a single agent type with a field specifying its “type”, i.e.:

@enum UserType::Bool A B # Could also be left untyped

mutable struct User <: AbstractAgent
    id::Int
    type::UserType
    fibonacci_num::Int
    money::Float64
end

By preferable I mean that the performance cost of having an abstractly typed container is not (typically) made up for by being able to statically dispatch on agent types. Your mileage may vary.
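A minimal sketch of that single-type approach, without the Agents.jl dependency (so, unlike the snippet above, the struct here does not subtype `AbstractAgent`; `user_step!` is a hypothetical name). The step function branches on the `type` field at runtime instead of dispatching on the agent's type, and all agents fit in a concretely typed `Vector{User}`.

```julia
using Base.Threads

@enum UserType A B  # left untyped, as the comment in the snippet above allows

mutable struct User
    id::Int
    type::UserType
    fibonacci_num::Int
    money::Float64
end

# Behaviour is selected by a runtime branch on the field, not by dispatch.
function user_step!(u::User)
    u.fibonacci_num += 1
    if u.type == B
        u.money += 1.1  # only type-B users accumulate money
    end
    return nothing
end

# Concretely typed container: no Union or abstract element type.
users = [User(i, i <= 100 ? A : B, 0, 0.0) for i in 1:200]

@threads for u in users
    user_step!(u)
end

println(count(u -> u.money > 0, users))  # 100 type-B users earned money
```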
