I am trying to count, for every pair of users, how many items were selected first by user $i$ and later by user $j$.
My idea is to build a sparse $n_{\text{users}} \times n_{\text{users}}$ matrix $M$ in which rows index the user who selected first and columns the user who selected later, so that $M_{i,j}$ is the number of items that were first selected by user $i$ and later by user $j$ (the diagonal should accumulate the total number of items selected by a user).
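One way to write this down more formally, as a sketch: let $t_u(k)$ denote the date on which user $u$ first selected item $k$, and let $I_u$ be the set of items user $u$ has selected (both symbols are notation introduced here for illustration, they are not part of the data). Assuming a strict inequality, which is what the `>` comparison in the code further down uses, the intended entries would be:

$$
M_{i,j} = \left|\{\, k \in I_i \cap I_j : t_i(k) < t_j(k) \,\}\right| \quad (i \neq j), \qquad M_{i,i} = |I_i| .
$$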
My table looks like this:
User | Item | time |
---|---|---|
“abc” | “ijk” | 2020-01-01 |
“def” | “ijk” | 2020-01-02 |
$\vdots$ | $\vdots$ | $\vdots$ |
This should result in the matrix $M$ described above.
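As a small worked illustration, restricted to the two rows that are visible in the table (the elided rows are ignored here) and assuming the users are indexed in the order ("abc", "def"): "abc" selected "ijk" before "def" did, and each user selected one item, so going by the description of the diagonal this would give

$$
M = \begin{pmatrix} 1 & 1 \\ 0 & 1 \end{pmatrix},
$$

where $M_{1,2} = 1$ records that "abc" selected "ijk" first and "def" later, and $M_{2,1} = 0$ because the reverse order never occurs.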
I group a time-sorted `DataFrame` by item and then process each item separately (this allows for an associative reduction). Since a `GroupedDataFrame` does not work directly with `FLoops`, I loop over the group indices instead. Is that a problem? Any suggestions for making this faster?
Here is a minimal example of my approach:
using Random, StatsBase, Dates, DataFrames, SparseArrays, Tables, FLoops
Random.seed!(1)
items = [randstring(10) for _ in 1:300]
users = [randstring(5) for _ in 1:5000]
item_selection = sample(items, 20000, replace = true)
user_selection = sample(users, 20000, replace = true)
dates = sample(Date("2015-01-01"):Day(1):Date("2021-01-20"), 20000, replace = true)
data = DataFrame(items = item_selection, users = user_selection, dates = dates)
sort!(data, :dates)
# only look at first selection
data = combine(groupby(data, [:items, :users]), first)
uniqueusers = unique(data.users)
user_index = Dict(uniqueusers .=> eachindex(uniqueusers))
nusers = length(uniqueusers)
gitems = groupby(data, :items)
M = spzeros(nusers, nusers)
function processitem(itemdata, userdict)
    nusers = length(userdict)
    M_tmp = spzeros(Int, nusers, nusers)
    # itemdata holds the rows of a single item, already sorted by date
    dates = itemdata.dates
    users = itemdata.users
    for item in Tables.namedtupleiterator(itemdata)
        # first row with a strictly later date than the current selection
        startindex = findfirst(>(item.dates), dates)
        if !isnothing(startindex)
            later_users = users[startindex:end]
            current_index = userdict[item.users]
            later_indices = [userdict[user] for user in later_users]
            # count every later user in the current user's row
            M_tmp[current_index, later_indices] .+= 1
            # diagonal entry; only reached when a later selection exists
            M_tmp[current_index, current_index] += 1
        end
    end
    return M_tmp
end
@time @floop ThreadedEx() for i in 1:length(gitems)
    item = gitems[i]
    @reduce(Mfloop += processitem(item, user_index))
end;
@time Mmapfoldl = mapfoldl(i -> processitem(i, user_index), +, gitems);
Mfloop == Mmapfoldl