Advice on smart multi-threading to minimize calling an expensive function

Hey all! :slight_smile:

I have been struggling for a few days now with how to approach this. I have a set of query arrays and a set of reference arrays, and I want to compare them. I do this via an index, which is expensive to create, so ideally I only build the index once for each reference. The caveat is that I need info from previous query-reference comparisons: I want to end up with an array (coverage) that records, for each query, the maximum number of matches across all references. Let me illustrate this with some simplified code:

using DataStructures: DefaultDict

function index(x::Array{Int64,1})
    ind = DefaultDict{Int64, Vector{Int64}}(() -> Int64[])
    for (i, val) in enumerate(x)
        push!(ind[val], i)
    end
    sleep(2) # more work..
    return ind
end

function match(q::Array{Int64}, someIndex::DefaultDict{Int64,Array{Int64,1}}, coverage::Array{Int32,1})
    for (i, val) in enumerate(q)
        # replace if we have more matches than before
        covered_by = length(get(someIndex, val, Int64[])) # typed empty default; get() does not insert missing keys
        if covered_by > coverage[i]
            println("Replaced: ", coverage[i], " by ", covered_by)
            coverage[i] = covered_by
        end
    end   
    return coverage
end


function main()
    
    println("Using  ", Threads.nthreads() ," threads")
    
    queries = [[1,2,3,1], [5,6,7,8,9]]
    refs = [[1,2,2,2], [1,1,1,1,6,1], [5,6,6]]
    
    Threads.@threads for q in queries
        println(Threads.threadid()," Q: ", q)
        # query coverage vector
        coverage = zeros(Int32, length(q))
        # Compare to all refs
        for r in refs 
            println(Threads.threadid(), " is building index for: ", r)
            r_index = index(r)
            coverage = match(q, r_index, coverage)
        end
        println(coverage) # Store "coverage" somewhere
    end
end

main()

Note

  • Both the index and match functions are much more complex in reality; I only kept the fundamentals to illustrate the threading problem
  • I work with much bigger vectors than shown: query and reference vectors hold more than 1M elements. So, for example, keeping all results in RAM and merging them afterwards is infeasible

In the code above I multi-thread over the queries, so each thread handles all comparisons for one query. As far as I have read, the threads are sticky, so this should work. However, doing it this way I have to call the expensive index function for the same reference multiple times (see the println), which I really want to avoid. I have to do 13 million query-reference comparisons, and with 600 queries I would call the index function 600x more often than necessary this way. I tried:

  • A Channel to which I “put” all results. I then have a Dict mapping each query to its coverage array and update it from there. I ran this with 70 CPUs and the Channel seems to struggle to keep up. The matching itself is fast, but the Channel becomes the bottleneck

  • Writing everything to a file while locking. I thought I could just parse this afterwards. My dataset, however, involves 13M comparisons, leaving me with over 15 trillion “coverage points”. Just reading that from our HDD takes ages. (P.S. 13M writes of >1M element arrays also wasn’t a great idea :face_with_open_eyes_and_hand_over_mouth:)

  • Now I am thinking of dedicating some CPUs to indexing the references and sharing the finished indexes with other CPUs that perform the matching. I’m not familiar enough with Julia to implement this, and maybe it is overkill, hence I came to ask for advice here.
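
To make that last idea a bit more concrete, here is a rough sketch of what I imagine, not something I have benchmarked. It assumes the coverage arrays for all queries fit in RAM at once (for 600 queries of about 1M Int32 elements that would be roughly 2.4 GB), which may or may not hold for the real data. The names `build_index`, `index_worker`, `match_worker` and `coverage_pipeline` are made up for this illustration, and `build_index` is a cheap stand-in for my real index that only counts occurrences with a plain Dict. Indexer tasks build each reference index exactly once and hand it over through a bounded Channel; matcher tasks take an index and update every query's coverage under a per-query lock.

```julia
# Hypothetical stand-in for the expensive index(): value => number of occurrences.
function build_index(r::Vector{Int})
    counts = Dict{Int,Int}()
    for val in r
        counts[val] = get(counts, val, 0) + 1
    end
    return counts
end

# Indexer task: pull references from `jobs`, push finished indexes to `indexes`.
function index_worker(jobs::Channel, indexes::Channel)
    for r in jobs
        put!(indexes, build_index(r))   # blocks while the buffer is full
    end
end

# Matcher task: take one index at a time and update every query's coverage.
function match_worker(indexes::Channel, queries, coverages, locks)
    for idx in indexes
        for (k, q) in enumerate(queries)
            lock(locks[k]) do           # only one task may touch coverages[k] at a time
                cov = coverages[k]
                for (i, val) in enumerate(q)
                    cov[i] = max(cov[i], get(idx, val, 0))
                end
            end
        end
    end
end

function coverage_pipeline(queries, refs; n_indexers = 2, n_matchers = 2, buffer = 4)
    coverages = [zeros(Int32, length(q)) for q in queries]
    locks = [ReentrantLock() for _ in queries]

    # Work queue of references; closing it lets the indexers stop once it is drained.
    jobs = Channel{Vector{Int}}(length(refs))
    foreach(r -> put!(jobs, r), refs)
    close(jobs)

    # Bounded buffer, so only a few finished indexes wait in memory at any time.
    indexes = Channel{Dict{Int,Int}}(buffer)
    indexers = [Threads.@spawn(index_worker(jobs, indexes)) for _ in 1:n_indexers]
    matchers = [Threads.@spawn(match_worker(indexes, queries, coverages, locks)) for _ in 1:n_matchers]

    foreach(wait, indexers)   # every reference has been indexed exactly once
    close(indexes)            # matchers finish after draining the buffer
    foreach(wait, matchers)
    return coverages
end

queries = [[1, 2, 3, 1], [5, 6, 7, 8, 9]]
refs = [[1, 2, 2, 2], [1, 1, 1, 1, 6, 1], [5, 6, 6]]
coverages = coverage_pipeline(queries, refs)
```

Unlike my earlier Channel attempt, only the indexes travel through the Channel here, not every comparison result, so there are far fewer `put!` calls. I am unsure how much the per-query locks would hurt at 70 CPUs, since all matchers walk the queries in the same order.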

Really curious to see what you guys think would be the best way to approach this!