TaskFailedException with @threads macro with Dict

When using Julia v1.7.2, I get a TaskFailedException when trying to use the @threads macro on a for loop over a dictionary, even though a non-threaded for loop works correctly. I lock the DataFrame that is written to. Does anyone see what I’m doing wrong? Here’s my code:

using DataFrames

txt = "Let the stormy clouds chase, everyone from the place."
wds = split(txt, r"\W+", keepempty=false)
wds_dict = Dict(w=>i for (i, w) in enumerate(wds))

df = DataFrame(wd = String[], index = Int64[])
lk = ReentrantLock()
#for (w, i) in wds_dict  # this for loop work correctly
Threads.@threads for (w, i) in wds_dict
    lock(lk) do  
        push!(df, [w i])
    end
end

println(df)
println(Threads.nthreads())

And here’s the error message:

ERROR: LoadError: TaskFailedException
Stacktrace:
 [1] wait
   @ ./task.jl:334 [inlined]
 [2] threading_run(func::Function)
   @ Base.Threads ./threadingconstructs.jl:38
 [3] top-level scope
   @ ./threadingconstructs.jl:97

    nested task error: MethodError: no method matching firstindex(::Dict{SubString{String}, Int64})
    Closest candidates are:
      firstindex(::Any, ::Any) at ~/Downloads/julia-1.7.2/share/julia/base/abstractarray.jl:396
      firstindex(::Union{Tables.AbstractColumns, Tables.AbstractRow}) at ~/.julia/packages/Tables/PxO1m/src/Tables.jl:177
      firstindex(::DataFrames.DataFrameColumns) at ~/.julia/packages/DataFrames/MA4YO/src/abstractdataframe/iteration.jl:189
      ...
    Stacktrace:
     [1] (::var"#4#threadsfor_fun#4"{Dict{SubString{String}, Int64}})(onethread::Bool)
       @ Main ./threadingconstructs.jl:70
     [2] (::var"#4#threadsfor_fun#4"{Dict{SubString{String}, Int64}})()
       @ Main ./threadingconstructs.jl:52
in expression starting at /home/earl/Downloads/delete.jl:10

I think (without looking) it is because enumerating over Dicts uses an Iterator with internal state which is not threadsafe
This works

    Threads.@threads for (w,i) in collect(wds_dict)
        lock(lk) do  
            push!(df, [w i])
        end
    end

Although I prefer to just collect the keys

    k = collect(keys(wds_dict))
    Threads.@threads for i in 1:length(k)
        lock(lk) do  
            push!(df, [k[i] wds_dict[k[i]]])
        end
    end
1 Like

@lawless-m thank you much! That fixed the problem.

1 Like

There are a couple of anti-patterns:

  1. Using @threads for processing collection type other than an array type
  2. Using lock
  3. Incrementally constructing DataFrame

You can use FLoops to avoid them:

using FLoops

@floop for (w, i) in wds_dict
    @reduce wd = append!(String[], (w,))
    @reduce index = append!(Int[], (i,))
end

df = DataFrame(wd = wd, index = index)

For more information, see Data-parallel programming in Julia

1 Like

Let’s go all the way:

df = rename!(DataFrame(enumerate(wds)), ["1"=>:index, "2"=>:wd])