Faster counting of sequences

I am trying to count, for every ordered pair of users (i, j), how many items were selected first by user i and later by user j.

My idea was to create a sparse matrix M of size n_{users} \times n_{users} in which rows index the user who selected first and columns the user who selected later, so that M_{i,j} is the number of items that were first selected by user i and later by user j (the diagonal should accumulate the total number of items selected by each user).

My table looks like this:

User Item time
“abc” “ijk” 2020-01-01
“def” “ijk” 2020-01-02

\vdots

Which should result in the matrix M:

\begin{bmatrix} 1 & 1 \\ 0 & 1 \end{bmatrix}
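To make the target concrete, here is a minimal brute-force sketch in Base Julia that builds M for the two-row table above. The helper name `pairwise_counts` and the tuple layout are illustrative assumptions, not part of the original code, and it assumes each (user, item) pair appears only once. It is quadratic in the number of rows, so it is meant as a reference for checking faster versions rather than as a solution.

```julia
using Dates

# Hypothetical helper: `rows` holds (user, item, time) tuples,
# with at most one entry per (user, item) pair.
function pairwise_counts(rows)
    users = unique(first.(rows))
    index = Dict(u => i for (i, u) in enumerate(users))
    M = zeros(Int, length(users), length(users))
    for (u1, item1, t1) in rows
        i = index[u1]
        M[i, i] += 1                      # diagonal: items selected by this user
        for (u2, item2, t2) in rows
            if item1 == item2 && t2 > t1  # u2 picked the same item strictly later
                M[i, index[u2]] += 1
            end
        end
    end
    return M, users
end

rows = [("abc", "ijk", Date(2020, 1, 1)),
        ("def", "ijk", Date(2020, 1, 2))]
M, users = pairwise_counts(rows)
```

Here `M[1, 2]` counts the item that "abc" selected before "def", and the diagonal holds each user's own item count.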

I group a time-sorted DataFrame by item and then process each item separately (this allows for an associative reduction). Since a GroupedDataFrame does not work directly with FLoops, I loop over its indices instead. Is that a problem? Any suggestions for making this faster?
Here is a minimal example of my approach:

using Random, StatsBase, Dates, DataFrames, SparseArrays, Tables, FLoops

Random.seed!(1)

items = [randstring(10) for _ in 1:300]
users = [randstring(5) for _ in 1:5000]

item_selection = sample(items, 20000, replace = true)
user_selection = sample(users, 20000, replace = true)
dates = sample(Date("2015-01-01"):Day(1):Date("2021-01-20"), 20000, replace = true)

data = DataFrame(items = item_selection, users = user_selection, dates = dates)
sort!(data, :dates)
# only look at first selection
data = combine(groupby(data, [:items, :users]), first)

uniqueusers = unique(data.users)
user_index = Dict(uniqueusers .=> eachindex(uniqueusers))
nusers = length(uniqueusers)

gitems = groupby(data, :items)

M = spzeros(nusers, nusers)

function processitem(itemdata, userdict)
    nusers = length(userdict)
    M_tmp = spzeros(Int, nusers, nusers)
    dates = itemdata.dates
    users = itemdata.users
    for item in Tables.namedtupleiterator(itemdata)
        startindex = findfirst(>(item.dates), dates)
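        current_index = userdict[item.users]
        # Diagonal: every selection counts, including the latest one for this item.
        M_tmp[current_index, current_index] += 1
        if !isnothing(startindex)
            # Off-diagonal: users who selected this item strictly later.
            later_users = users[startindex:end]
            later_indices = [userdict[user] for user in later_users]
            M_tmp[current_index, later_indices] .+= 1
        end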
    end
    return M_tmp
end

@time @floop ThreadedEx() for i in 1:length(gitems)
    item = gitems[i]
    @reduce(Mfloop += processitem(item, user_index))
end;

@time Mmapfoldl = mapfoldl(i -> processitem(i, user_index), +, gitems);

Mfloop == Mmapfoldl

I can’t understand it.

Thanks for the reply, and sorry for being unclear. Basically, I want to determine which users always follow another user in their selections. The goal is to have, for a given user (user1):

  1. The number of unique items a user selected
  2. For all other users the number of items they have selected that were previously selected by user1.

So, for example, suppose there are apples, bananas and chocolate. If I get an apple today, I’d like to add 1 to my number of selections. If you get an apple and chocolate tomorrow, I’d like to add 2 to your number of selections and add 1 to the number of items I have selected before you. If I then select chocolate on yet another day, I’d like to add 1 to my number and 1 to the number of items you have selected before me.
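The apple and chocolate example can be written out as a small sketch. The user names "me" and "you" and the integer day numbers are made up for illustration; the loop processes one item at a time in chronological order, which is one way (not necessarily the fastest) to get the counts described above.

```julia
# Selections as (user, item, day); names and days are assumptions for this example.
selections = [("me",  "apple",     1),
              ("you", "apple",     2),
              ("you", "chocolate", 2),
              ("me",  "chocolate", 3)]

users = ["me", "you"]
index = Dict(u => i for (i, u) in enumerate(users))
M = zeros(Int, 2, 2)

# Process one item at a time, with its selections sorted by day.
for item in unique(s[2] for s in selections)
    picks = sort([(s[3], s[1]) for s in selections if s[2] == item])
    for (k, (day, user)) in enumerate(picks)
        j = index[user]
        M[j, j] += 1                  # the user's own selection count
        for (earlier_day, earlier_user) in picks[1:k-1]
            # Credit every user who selected this item on a strictly earlier day.
            earlier_day < day && (M[index[earlier_user], j] += 1)
        end
    end
end
```

Both users end up with 2 on the diagonal, `M[1, 2]` is 1 for the apple, and `M[2, 1]` is 1 for the chocolate.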

Still very confusing, to be honest. I am not sure how that description relates to the matrix you have shown.

Sorry, I’m having a hard time explaining the problem. In the data above we have two users and one item, and the matrix is my current approach to representing the solution. Since both users pick the item, the diagonal is 1 for both. Since the first user picked the item before the second user, the element M_{1,2} is 1. If I do this for every item and sum the matrices, I get a diagonal holding the number of items each user chose, and off-diagonal entries holding how many items were chosen first by the user mapped to the row and later by the user mapped to the column.

Is there a reason for using a Matrix here? Does it need to be a Matrix to work with FLoops?

The main reason is that I am not well versed in data structures but know matrices well. I guess FLoops can do any reduction. Do you have any suggestions for a better structure?
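One possible alternative structure, sketched here under assumptions, is a dictionary keyed by ordered user pairs, which only stores pairs that actually occur. The helper name `ordered_pair_counts`, the tuple layout and the integer timestamps are hypothetical and not taken from the thread.

```julia
# Hypothetical helper: counts[(a, b)] = number of items `a` selected strictly before `b`.
# `rows` holds (user, item, time) tuples with at most one entry per (user, item) pair.
function ordered_pair_counts(rows)
    counts = Dict{Tuple{String,String},Int}()
    byitem = Dict{String,Vector{Tuple{Int,String}}}()
    for (user, item, t) in rows
        push!(get!(byitem, item, Tuple{Int,String}[]), (t, user))
    end
    for picks in values(byitem)
        sort!(picks)                          # chronological order within the item
        for k in eachindex(picks), l in 1:k-1
            if picks[l][1] < picks[k][1]      # same-time selections do not count
                key = (picks[l][2], picks[k][2])
                counts[key] = get(counts, key, 0) + 1
            end
        end
    end
    return counts
end

rows = [("me", "apple", 1), ("you", "apple", 2),
        ("you", "chocolate", 2), ("me", "chocolate", 3)]
counts = ordered_pair_counts(rows)
```

Two such dictionaries can be combined with `mergewith(+, a, b)`, so the per-item results could still be reduced associatively.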

Did you try to work over the DataFrame itself?

For the number of unique items, I would do:

julia> df = DataFrame(:user => ["abc", "def"], :item => ["ijk", "ijk"], :time => Date.(["2021-01-01", "2021-01-02"]))
2×3 DataFrame
 Row │ user    item    time       
     │ String  String  Date       
─────┼────────────────────────────
   1 │ abc     ijk     2021-01-01
   2 │ def     ijk     2021-01-02

julia> combine(groupby(df, :user), :item => length ∘ unique! ∘ sort  => :qt_unique_items)
2×2 DataFrame
 Row │ user    qt_unique_items 
     │ String  Int64           
─────┼─────────────────────────
   1 │ abc                   1
   2 │ def                   1

For the second value I need more info. If a user buys the same item at two different times, which time counts when checking whether the item was previously selected by the primary user? Does it only count if the secondary user's first purchase of the item happened before the primary user's first purchase?


Yes, for the own counts that works. As for the second value, I am only looking at the very first purchase by each user. I first sort by time, then group by user and item and select the first row.
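A minimal sketch of that "first purchase only" step with DataFrames.jl, using made-up data in which "abc" buys the same item twice:

```julia
using DataFrames, Dates

df = DataFrame(user = ["abc", "abc", "def"],
               item = ["ijk", "ijk", "ijk"],
               time = Date.(["2021-01-03", "2021-01-01", "2021-01-02"]))

# Sort chronologically, then keep the first row of every (user, item) group.
firsts = combine(groupby(sort(df, :time), [:user, :item]), first)
```

After this step each (user, item) pair appears once, with the date of its earliest selection.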

I have written the code below, but I have come to the conclusion that I still do not understand your problem well. What I have written computes, for each user, the sum over their unique items of how many other people selected each item before they did. Now I am in doubt: do you want a single value per user, or a single value per pair of users? I ask because your example talks about “the number of items I have selected before you”.

So it seems like you are keeping a score for each pair of users, which would explain your idea of a matrix: m[1, 2] would give how many items user 1 selected before user 2, and m[2, 1] would give how many items user 2 selected before user 1.

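function compute_qt_selected_before(df)
    unique_users = unique!(sort(df[!, :user]))
    qt_selected_before = Dict{String, Int}(unique_users .=> 0)
    # Earliest selection time of each (user, item) pair.
    df_unique = combine(groupby(df, [:user, :item]), :time => minimum => :time_first)
    sort!(df_unique, [:item, :time_first])
    last_item = nothing  # item of the previous row; `nothing` before the first row
    c = 0                # number of users who already selected the current item
    for (row_idx, item) in pairs(df_unique[!, :item])
        item != last_item && (c = 0)
        qt_selected_before[df_unique[row_idx, :user]] += c
        c += 1
        last_item = item
    end
    return qt_selected_before
end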

Yes, exactly! Thank you for your suggestion! I went a bit overboard, but this is the final implementation:

using LinearAlgebra, Random, StatsBase, Dates, DataFrames, Tables, Chain, ProgressMeter

Random.seed!(1)
items = [randstring(10) for _ in 1:30]
users = [randstring(5) for _ in 1:50]

item_selection = sample(items, 100, replace = true)
user_selection = sample(users, 100, replace = true)
dates = sample(Date("2020-10-15"):Day(1):Date("2021-01-20"), 100, replace = true)

data = DataFrame(item = item_selection, user = user_selection, date = dates)

function proc(fulldata)
    fulldata = @chain fulldata begin
        sort!(:date)
        groupby([:item, :user])
        combine(first)
        sort!([:item, :date])
    end
    uuser = unique(fulldata.user)
    nuser = length(uuser)
    user_index = Dict(uuser .=> eachindex(uuser))
    transform!(fulldata, :user => ByRow(x-> user_index[x]) => :user_id)
    
    M = zeros(Int, nuser, nuser)
    indexer = Vector{Int}(undef, nuser)
    prev_date = Date(1971,1,1)
    prev_item = "__NOT_AN_ITEM__"
    offset = 0
    tab = Tables.rows((user_id = fulldata.user_id, item = fulldata.item, date = fulldata.date))
    M = procinner(tab, indexer, M, prev_date, prev_item, offset)
    return M, fulldata
end

function procinner(table, indexer, M, prev_date, prev_item, offset) 
    c = 0; daystartsat = 1
    @showprogress for r in table
        ruser_id = r.user_id
        ritem = r.item
        rdates = r.date
        # Accumulate number of previous listers
        same_item = (prev_item == ritem)
        same_item || (c = 0; daystartsat = 1)
        c += 1
        # Offset for same date listings (should not count as previous)
        same_date = (prev_date == rdates)
        offset = ifelse(same_item && same_date, offset + 1, 0)
        offset == 1 && (daystartsat = c-1)
        @inbounds indexer[c] = ruser_id
        for u in @inbounds(view(indexer, 1:daystartsat-1))
            @inbounds M[u, ruser_id] += 1
        end
        for u in @inbounds(view(indexer, daystartsat+offset:c))
            @inbounds M[u, ruser_id] += 1
        end
        prev_item = ritem
        prev_date = rdates
    end
    return M
end
M, data_id = proc(data)

And a version that is equivalent but uses a more DataFrame-based approach:

function getrow(data, user_id)
    user = subset(data, :user_id => ByRow(==(user_id)))
    rout = DataFrame[]
    for (item, date) in zip(user.item, user.date)
        # Rows where the same item was selected strictly later by someone else.
        rows = subset(data, :item => ByRow(==(item)), :date => ByRow(>(date)))
        push!(rout, rows)
    end
    # One row per later user, with the number of such items in column :n.
    return combine(groupby(reduce(vcat, rout), :user_id), nrow => :n)
end

function makeM(data_id)
    Mdiag = sort(combine(groupby(data_id, :user_id), :item => length), :user_id).item_length
    Mmanual = zeros(Int, length(Mdiag), length(Mdiag))
    Mmanual[diagind(Mmanual)] = Mdiag
    for i in 1:size(Mmanual,1)
        other = getrow(data_id, i)
        Mmanual[i, other.user_id] += other.n
    end
    return Mmanual
end

makeM(data_id) == M