Correct use of @spawn with data frames

I am new to multithreading and I have the following snippet of code:

using DataFrames, Dates
using Base.Threads
memberships = table_read(db, "IndividualMemberships")
sort!(memberships, [:IndividualId, :HouseholdId, :StartDate])
gdf_memberships = groupby(memberships, [:IndividualId, :HouseholdId]; sort=false)
df = DataFrame(IndividualId=Int32[], HouseholdId=Int32[], HHRelationshipTypeId=Int16[], DayDate=Date[], StartType=Int16[], EndType=Int16[], Episode=Int16[])
task = nothing
for key in keys(gdf_memberships)
    e = gdf_memberships[key]
    # decompose membership episodes into days
    d = process_membershipdays(e.IndividualId, e.HouseholdId, e.HHRelationshipTypeId, e.StartDate, e.EndDate, e.StartType, e.EndType, e.Episode)
    append!(df, d)
    if nrow(df) > 10_000_000
        if !isnothing(task)
            wait(task)
        end
        task = @spawn(insertdataframe(df, db, "MembershipDays"))
        df = empty(df)
    end
end
if !isnothing(task)
    wait(task)
end
if nrow(df) > 0
    insertdataframe(df, db, "MembershipDays")
end

Am I using @spawn correctly? I am concerned about whether setting df to an empty data frame will impact the data frame I have submitted to the spawned task. I have reason to believe that on occasion rows are not saved to the database.

Secondly, am I using wait correctly?

Finally, is there a better (more performant) way of doing this?

I believe this is going to produce issues, because the spawned task holds a reference to the df variable, which is reassigned on the next line. So it’s possible that, by the time the task actually runs, it sees a different (empty) data frame. The easiest way around this would be to copy the data frame on insert:

insertdataframe(deepcopy(df), db, "MembershipDays")
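
One thing to watch is where the copy happens: anything inside the @spawn expression, including the deepcopy, only runs when the task itself runs, by which point df may already have been rebound. Threads.@spawn supports $ interpolation (since Julia 1.4), which captures the value of df at task-creation time instead. A rough sketch of the relevant part of your loop, keeping everything else as in your snippet:

if nrow(df) > 10_000_000
    if !isnothing(task)
        wait(task)
    end
    # $df captures the current DataFrame object when the task is created,
    # so rebinding df on the next line cannot affect the pending insert
    task = @spawn insertdataframe($df, db, "MembershipDays")
    df = empty(df)  # fresh, independent data frame for the next batch
end

Since df = empty(df) creates a new data frame rather than mutating the old one, the object handed to the task is never touched again, so no extra copy is needed in this form.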

In terms of a more performant method: I don’t know the exact constraints of your problem, but the simplest approach would be to create one task per key:

using DataFrames
using Base.Threads
memberships = table_read(db, "IndividualMemberships")
sort!(memberships, [:IndividualId, :HouseholdId, :StartDate])
gdf_memberships = groupby(memberships, [:IndividualId, :HouseholdId]; sort=false)
# pairs() iterates key => sub-data-frame pairs of the GroupedDataFrame
@sync for (key, e) in pairs(gdf_memberships)
    @spawn begin
        # decompose membership episodes into days
        d = process_membershipdays(e.IndividualId, e.HouseholdId, e.HHRelationshipTypeId, e.StartDate, e.EndDate, e.StartType, e.EndType, e.Episode)
        insertdataframe(DataFrame(d), db, "MembershipDays")
    end
end

Of course, the problem with this version is that we lose the aggregation and are opening a lot of db connections. If the memberships are roughly the same size, you could break it into chunks with Iterators.partition:

...
@sync for key_chunk in Iterators.partition(keys(gdf_memberships), 10)
    @spawn begin
        e = vcat([gdf_memberships[key] for key in key_chunk]...)
...
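
Filled in, that chunked variant might look roughly like this (a sketch that keeps table_read, process_membershipdays, insertdataframe and db exactly as in your snippet; the chunk size of 10 is just a starting point to tune):

using DataFrames
using Base.Threads

memberships = table_read(db, "IndividualMemberships")
sort!(memberships, [:IndividualId, :HouseholdId, :StartDate])
gdf_memberships = groupby(memberships, [:IndividualId, :HouseholdId]; sort=false)
@sync for key_chunk in Iterators.partition(keys(gdf_memberships), 10)
    @spawn begin
        # stitch the groups of this chunk back into a single table
        e = vcat([gdf_memberships[key] for key in key_chunk]...)
        # decompose membership episodes into days
        d = process_membershipdays(e.IndividualId, e.HouseholdId, e.HHRelationshipTypeId,
                                   e.StartDate, e.EndDate, e.StartType, e.EndType, e.Episode)
        insertdataframe(DataFrame(d), db, "MembershipDays")
    end
end

This bounds the number of concurrent inserts (and open db connections) by the number of chunks rather than the number of keys.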

If you really need to know the number of rows and insert dataframes of roughly equivalent size, then your original approach + copying is probably best.

Satvik,

Thank you for this; it is the guidance I was looking for. I don’t have time right now to test it, but as soon as I have done so I will come back to mark this as the solution.