How to process a dictionary with multithreading?

Hello

I have a dictionary that I currently process this way:

a = pairs(tiledict)
foreach(a) do paire
    # some stuff goes here using paire[1] (the key) and paire[2] (the value)
end

I have to go through the whole dictionary, and this works without problems.

I am now trying to multithread this. I would like each thread to pick elements of the dictionary without duplication and perform the same processing on them. I have tried Threads.foreach(), but it requires some kind of Channel, and my threads don’t need to communicate. Also, I give each thread its own bucket where its results are saved, to avoid using a ReentrantLock; only at the end of thread execution do I merge the results into a common bucket under a lock.
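The pattern I have in mind looks roughly like this sketch (the dictionary contents and the `process` function are placeholders for my real data and work):

```julia
using Base.Threads

tiledict = Dict(i => 2000 + i for i in 1:200)   # placeholder data
process(k, v) = v - k                           # placeholder for the real work

# Split the pairs into one chunk per thread; each task fills its own
# bucket, so no lock is needed while processing.
allpairs = collect(pairs(tiledict))
nchunks = nthreads()
chunks = [allpairs[i:nchunks:end] for i in 1:nchunks]

results = Dict{Int,Int}()
merge_lock = ReentrantLock()

@sync for chunk in chunks
    @spawn begin
        bucket = Dict{Int,Int}()        # thread-local bucket
        for (k, v) in chunk
            bucket[k] = process(k, v)
        end
        lock(merge_lock) do             # lock only for the final merge
            merge!(results, bucket)
        end
    end
end
```
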

I have used
Threads.@threads for i in 0:somenumber
in other parts of my program, but those didn’t need to read a dictionary.

Can someone provide me with a very simple sample of a multitasked program reading a dictionary?

Thanks

Maybe: Public API · OhMyThreads.jl

Wouldn’t this work?

Threads.@threads for (k,v) in pairs(d)
    # work
end

I tried this simple case and got the following error message:

julia> dico = Dict{Int,Int}()
Dict{Int64, Int64}()

julia> for i in 1:200 merge!(dico, Dict(i => 2000+i)) end
julia> dico
Dict{Int64, Int64} with 200 entries:
  56  => 2056
  35  => 2035
  110 => 2110
  60  => 2060
  30  => 2030
  6   => 2006
  67  => 2067
  73  => 2073
  ⋮   => ⋮
julia> Threads.@threads for (k,v) in pairs(dico)
              print("thread: ", threadid(), " key: ", k, "\t")
          end
ERROR: TaskFailedException

    nested task error: MethodError: no method matching firstindex(::Dict{Int64, Int64})
    The function `firstindex` exists, but no method is defined for this combination of argument types.

    Closest candidates are:
      firstindex(::Any, ::Any)
       @ Base abstractarray.jl:450
      firstindex(::Base.JuliaSyntax.SourceFile)
       @ Base C:\workdir\base\JuliaSyntax\src\source_files.jl:131
      firstindex(::Markdown.MD)
       @ Markdown C:\Users\denys\.julia\juliaup\julia-1.11.1+0.x64.w64.mingw32\share\julia\stdlib\v1.11\Markdown\src\parse\parse.jl:36
      ...

    Stacktrace:
     [1] #23#threadsfor_fun#4
       @ .\threadingconstructs.jl:237 [inlined]
     [2] #23#threadsfor_fun
       @ .\threadingconstructs.jl:219 [inlined]
     [3] (::Base.Threads.var"#1#2"{var"#23#threadsfor_fun#5"{var"#23#threadsfor_fun#4#6"{Dict{Int64, Int64}}}, Int64})()
       @ Base.Threads .\threadingconstructs.jl:154

...and 7 more exceptions.

Stacktrace:
 [1] threading_run(fun::var"#23#threadsfor_fun#5"{var"#23#threadsfor_fun#4#6"{Dict{Int64, Int64}}}, static::Bool)
   @ Base.Threads .\threadingconstructs.jl:172
 [2] macro expansion
   @ .\threadingconstructs.jl:189 [inlined]
 [3] top-level scope
   @ .\REPL[13]:1

Looks like @threads tries to index the collection (via firstindex), which Dict does not support.

However, you gave me an idea: use pop!() instead. The following works well, at the expense of emptying my dictionary:

julia> for i in 1:200 merge!(dico, Dict(i => 2000+i)) end
julia> my_lock = Threads.ReentrantLock()
julia> Threads.@threads for i in 1:200
           Threads.lock(my_lock) do
               tile = pop!(dico)
               println("threadid: ", Threads.threadid(), "  key: ", tile[1])
           end
       end

It’s a bit silly, but I can then merge! each extracted pair into another dictionary and save my data that way.
Since the lock is expensive, in a practical case I would probably pop! several elements from the dictionary before releasing the lock to another thread.
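A sketch of that batched version (the batch size of 16 is arbitrary, and `v + 1` stands in for the real processing):

```julia
using Base.Threads

dico = Dict(i => 2000 + i for i in 1:200)
results = Dict{Int,Int}()
dict_lock = ReentrantLock()
batchsize = 16    # arbitrary; tune to amortize the locking cost

@sync for _ in 1:nthreads()
    @spawn begin
        while true
            # Take a whole batch of pairs while holding the lock...
            batch = lock(dict_lock) do
                [pop!(dico) for _ in 1:min(batchsize, length(dico))]
            end
            isempty(batch) && break
            # ...then process them without holding the lock.
            processed = Dict(k => v + 1 for (k, v) in batch)
            lock(dict_lock) do
                merge!(results, processed)
            end
        end
    end
end
```
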

Perhaps … might be relevant?

Then you could do:

ks = collect(keys(d))

Threads.@threads for k in ks
    v = d[k]
    # ...
end

To be clear, the other suggestions using packages probably offer nice conveniences, but if you want to avoid a dependency, I think this should work, even if it costs you an allocation.
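For completeness, a self-contained sketch of that approach (doubling each value stands in for the real work; each iteration writes to its own slot of a preallocated vector, so no lock is needed):

```julia
using Base.Threads

d = Dict(i => 2000 + i for i in 1:200)
ks = collect(keys(d))                  # one allocation, then indexable
out = Vector{Int}(undef, length(ks))   # one slot per key: no lock needed

Threads.@threads for i in eachindex(ks)
    k = ks[i]
    v = d[k]      # concurrent reads of an unmodified Dict are safe
    out[i] = 2v   # placeholder work
end
```
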


Thanks

Much better than my pop! solution.


Is this thread-safe?

If you’re writing to the dictionary, no. But concurrent reads should be fine, unless I’m deeply misunderstanding how Dict is implemented.


In my case I am only reading. What I do not want to happen is two threads reading the same entry in the dict and thus producing twice the same result. My count would be completely off.

For the creation of the dictionary I used multithreading as well, but with a lock before the merge!. I also used an intermediate dictionary for each thread to avoid the cost of the lock, which is taken only when I merge all the dictionaries together.
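That construction step could be sketched like this (the two-way split of the index range and the stored values are placeholders):

```julia
using Base.Threads

final = Dict{Int,Int}()
build_lock = ReentrantLock()

@sync for (lo, hi) in [(1, 100), (101, 200)]   # arbitrary split of the work
    @spawn begin
        local_dict = Dict{Int,Int}()   # per-task intermediate dictionary
        for i in lo:hi
            local_dict[i] = 2000 + i   # placeholder construction work
        end
        lock(build_lock) do            # lock only for the single merge
            merge!(final, local_dict)
        end
    end
end
```
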