I find myself having to process a lot of data lately where (a) the rows can be processed independently of one another and (b) processing each row is expensive. I end up writing a version of the following every time:
using DataFrames
using Base.Threads: @spawn, nthreads

myrows = rand(20)

function processmyrow(row) # stands in for some expensive function
    sleep(5)
    return DataFrame(r = row)
end

# One output buffer per chunk; indexing by chunk number avoids relying on
# threadid(), which is unsafe here because tasks can migrate between threads
myparts = collect(Iterators.partition(myrows, cld(length(myrows), nthreads())))
outdfs = [DataFrame() for _ in eachindex(myparts)]
@sync for (i, part) in enumerate(myparts)
    @spawn for row in part
        append!(outdfs[i], processmyrow(row))
    end
end
Is there a better way of doing this? I have started looking into a couple of packages that exploit parallelism, but I find them quite hard to comprehend. Could someone point me in the right direction here?
Edit: I do not care about the order of the result as I will usually just sort it afterwards.
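Since order does not matter, one pattern worth considering (a sketch in plain Base.Threads, with a hypothetical `expensiverow` function standing in for the real per-row work) is to have each task `put!` its result onto a buffered `Channel` and concatenate at the end, which avoids per-thread buffers and `threadid()` indexing entirely:

```julia
using DataFrames
using Base.Threads: @spawn

# Hypothetical stand-in for the expensive per-row function
expensiverow(x) = DataFrame(r = x, r2 = x^2)

function processall(rows)
    # Buffered channel sized to hold every result, so workers never block on put!
    results = Channel{DataFrame}(length(rows))
    @sync for row in rows
        @spawn put!(results, expensiverow(row))
    end
    close(results)
    # Rows arrive in whatever order the tasks finished; sort afterwards if needed
    return reduce(vcat, collect(results))
end

outdf = processall(rand(20))
```

The `@sync` block waits for all spawned tasks, so by the time the channel is closed every result is buffered and `collect` drains them all.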
If the function on a row takes a sufficiently long time (e.g. 1 ms or more), you should be fine spawning a task per row and constructing a DataFrame from the results:
df = DataFrame(a = 1:10, b = 11:20)
processmyrow(row) = (a = row.a, b = row.b, c = row.a^2 + row.b) # returns a NamedTuple
# One task per row; fetch collects the NamedTuples, which the DataFrame
# constructor consumes directly
outdf = DataFrame(fetch.([Threads.@spawn processmyrow(row) for row in eachrow(df)]))
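The same idea also works when each task returns a small DataFrame rather than a NamedTuple (a sketch with a hypothetical `processmyrow_df` variant, not from the original post): concatenate with `reduce(vcat, ...)`. Since `fetch.` collects the tasks in creation order, the output rows match the input order, so no sorting step is needed:

```julia
using DataFrames

df = DataFrame(a = 1:10, b = 11:20)

# Hypothetical variant returning a one-row DataFrame per input row
processmyrow_df(row) = DataFrame(a = row.a, b = row.b, c = row.a^2 + row.b)

# Spawn one task per row, then fetch in creation order and concatenate
tasks = [Threads.@spawn processmyrow_df(row) for row in eachrow(df)]
outdf = reduce(vcat, fetch.(tasks))
```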