I am new to multithreading, I have the following snippet of code:
memberships = table_read(db, "IndividualMemberships")
sort!(memberships, [:IndividualId, :HouseholdId, :StartDate])
gdf_memberships = groupby(memberships, [:IndividualId, :HouseholdId]; sort=false)
df = DataFrame(IndividualId=Int32[], HouseholdId=Int32[], HHRelationshipTypeId=Int16[], DayDate=Date[], StartType=Int16[], EndType=Int16[], Episode=Int16[])
task = nothing
for key in keys(gdf_memberships)
e = gdf_memberships[key]
# decompose membership episodes into days
d = process_membershipdays(e.IndividualId, e.HouseholdId, e.HHRelationshipTypeId, e.StartDate, e.EndDate, e.StartType, e.EndType, e.Episode)
append!(df, d)
if nrow(df) > 10_000_000
if !isnothing(task)
wait(task)
end
task = @spawn(insertdataframe(df, db, "MembershipDays"))
df = empty(df)
end
end
if !isnothing(task)
wait(task)
end
if nrow(df) > 0
insertdataframe(df, db, "MembershipDays")
end
Am I using @spawn correctly? I am concerned about whether, setting df to an empty data frame will impact the data frame I have submitted to the spawned task. I have reason to believe that on occasion rows are not saved to the database.
Secondly, am I using wait correctly?
Finally, is there a better (more performant) way of doing this?