Processing Rows with multiple threads

I find myself having to process a lot of data recently where a) the processing of two different rows is independent and b) processing each row is expensive. I find myself coding a version of the following every time:

using DataFrames
using Base.Threads: @spawn
myrows = rand(20)
function processmyrow(row) # => some expensive function
    sleep(5)
    return DataFrame(r = row)
end
outdfs = [DataFrame() for _ in 1:Threads.nthreads()]
myparts = Iterators.partition(myrows, cld(length(myrows), Threads.nthreads()))


@sync for part in myparts
    @spawn begin
        for row in part
            append!(outdfs[Threads.threadid()], processmyrow(row))
        end
    end
end

Is there a better way of doing this? I have started looking into a couple of parallelism exploiting packages recently its quite hard for me to comprehend. Could someone point me in the right direction here?

Edit: I do not care about the order of the result as I will usually just sort it afterwards.

Thanks!

1 Like

Instead of the above, you can do the following.

using BangBang, Transducers
foldxt(append!!, Map(processmyrow), myrow))

The doc for Transducers has a good parallelism tutorial

1 Like

Thank you that looks great!

I also found

tcollect(Map(processmyrow), myrows))

in case the rows cannot easily be appended (e.g. not all rows return the same collumns)

I think you pasted the link twice its: parallelism tutorial

If the function on a row takes sufficently long time (e.g. 1 ms or more), you should be fine to spawn a task per row and construct a DataFrame from the result:

df = DataFrame(a=1:10, b=11:20)
processmyrow(row) = (a=row.a, b=row.b, c=row.a^2 + row.b)
outdf = DataFrame(fetch.([Threads.@spawn processmyrow(row) for row in eachrow(df)]))
1 Like

Bah! That’s what I get for typing with a baby in one arm…

1 Like

Transducers has changed my life! I already added it to 2 projects I’m working on.