diff --git a/pyretailscience/analysis/product_association.py b/pyretailscience/analysis/product_association.py
index 47dc8754..c49df6bf 100644
--- a/pyretailscience/analysis/product_association.py
+++ b/pyretailscience/analysis/product_association.py
@@ -35,13 +35,8 @@ operations, and drive business growth.
 """
 
-from itertools import combinations
-from typing import Literal
-
-import numpy as np
+import ibis
 import pandas as pd
-from scipy.sparse import csc_matrix
-from tqdm import tqdm
 
 from pyretailscience.options import get_option
 
@@ -83,19 +78,14 @@ class ProductAssociation:
         - support: The proportion of transactions containing both products.
         - confidence: The probability of buying product_2 given that product_1 was bought.
         - uplift: The ratio of the observed support to the expected support if the products were independent.
-
-    The class uses efficient sparse matrix operations to handle large datasets and
-    calculates associations for either pairs (2) or triples (3) of products, depending
-    on the 'number_of_combinations' parameter in _calc_association.
     """
 
     def __init__(
         self,
-        df: pd.DataFrame,
+        df: pd.DataFrame | ibis.Table,
         value_col: str,
         group_col: str = get_option("column.customer_id"),
         target_item: str | None = None,
-        number_of_combinations: Literal[2, 3] = 2,
         min_occurrences: int = 1,
         min_cooccurrences: int = 1,
         min_support: float = 0.0,
@@ -106,14 +96,12 @@ def __init__(
         """Initialize the ProductAssociation object.
 
         Args:
-            df (pandas.DataFrame): The input DataFrame containing transaction data.
+            df (pd.DataFrame | ibis.Table): The input DataFrame or ibis Table containing transaction data.
             value_col (str): The name of the column in the input DataFrame that contains the product identifiers.
             group_col (str, optional): The name of the column that identifies unique transactions or customers.
                 Defaults to option column.unit_spend.
             target_item (str or None, optional): A specific product to focus the association analysis on. If None,
                 associations for all products are calculated. Defaults to None.
-            number_of_combinations (int, optional): The number of products to consider in the association analysis. Can
-                be either 2 or 3. Defaults to 2.
             min_occurrences (int, optional): The minimum number of occurrences required for each product in the
                 association analysis. Defaults to 1. Must be at least 1.
             min_cooccurrences (int, optional): The minimum number of co-occurrences required for the product pairs in
@@ -143,7 +131,6 @@ def __init__(
             value_col=value_col,
             group_col=group_col,
             target_item=target_item,
-            number_of_combinations=number_of_combinations,
             min_occurrences=min_occurrences,
             min_cooccurrences=min_cooccurrences,
             min_support=min_support,
@@ -153,18 +140,16 @@ def __init__(
         )
 
     @staticmethod
-    def _calc_association(  # noqa: C901 (ignore complexity) - Excluded due to min_* arguments checks
-        df: pd.DataFrame,
+    def _calc_association(
+        df: pd.DataFrame | ibis.Table,
         value_col: str,
         group_col: str = get_option("column.customer_id"),
         target_item: str | None = None,
-        number_of_combinations: Literal[2, 3] = 2,
         min_occurrences: int = 1,
         min_cooccurrences: int = 1,
         min_support: float = 0.0,
         min_confidence: float = 0.0,
         min_uplift: float = 0.0,
-        show_progress: bool = False,
     ) -> pd.DataFrame:
         """Calculate product association rules based on transaction data.
 
@@ -172,14 +157,12 @@ def _calc_association(  # noqa: C901 (ignore complexity) - Excluded due to min_*
         helping to identify patterns in customer purchasing behavior.
 
         Args:
-            df (pandas.DataFrame): The input DataFrame containing transaction data.
+            df (pd.DataFrame | ibis.Table): The input DataFrame or ibis Table containing transaction data.
             value_col (str): The name of the column in the input DataFrame that contains the product identifiers.
             group_col (str, optional): The name of the column that identifies unique transactions or customers.
                 Defaults to option column.unit_spend.
             target_item (str or None, optional): A specific product to focus the association analysis on. If None,
                 associations for all products are calculated. Defaults to None.
-            number_of_combinations (int, optional): The number of products to consider in the association analysis. Can
-                be either 2 or 3. Defaults to 2.
             min_occurrences (int, optional): The minimum number of occurrences required for each product in the
                 association analysis. Defaults to 1. Must be at least 1.
             min_cooccurrences (int, optional): The minimum number of co-occurrences required for the product pairs in
@@ -208,13 +191,7 @@ def _calc_association(  # noqa: C901 (ignore complexity) - Excluded due to min_*
             - support: The proportion of transactions containing both products.
             - confidence: The probability of buying product_2 given that product_1 was bought.
             - uplift: The ratio of the observed support to the expected support if the products were independent.
-
-        The method uses efficient sparse matrix operations to handle large datasets and
-        calculates associations for either pairs (2) or triples (3) of products, depending
-        on the 'number_of_combinations' parameter.
         """
-        if number_of_combinations not in [2, 3]:
-            raise ValueError("Number of combinations must be either 2 or 3.")
         if min_occurrences < 1:
             raise ValueError("Minimum occurrences must be at least 1.")
         if min_cooccurrences < 1:
@@ -226,83 +203,125 @@ def _calc_association(  # noqa: C901 (ignore complexity) - Excluded due to min_*
         if min_uplift < 0.0:
             raise ValueError("Minimum uplift must be greater or equal to 0.")
 
-        unique_combo_df = df[[group_col, value_col]].drop_duplicates()
-        unique_combo_df[value_col] = pd.Categorical(unique_combo_df[value_col], ordered=True)
-        unique_combo_df[group_col] = pd.Categorical(unique_combo_df[group_col], ordered=True)
-
-        sparse_matrix = csc_matrix(
-            (
-                [1] * len(unique_combo_df),
-                (
-                    unique_combo_df[group_col].cat.codes,
-                    unique_combo_df[value_col].cat.codes,
-                ),
-            ),
+        if isinstance(df, pd.DataFrame):
+            df = ibis.memtable(df)
+
+        unique_transactions = df.select(df[group_col], df[value_col]).distinct()
+        total_transactions = unique_transactions.alias("t")[group_col].nunique().name("total_count")
+
+        product_occurrences = (
+            unique_transactions.group_by(value_col)
+            .aggregate(
+                occurrences=lambda t: t[group_col].nunique(),
+            )
+            .mutate(occurrence_probability=lambda t: t.occurrences / total_transactions)
+            .filter(lambda t: t.occurrences >= min_occurrences)
         )
 
-        row_count = sparse_matrix.shape[0]
+        left_table = unique_transactions.rename({"item_1": value_col})
+        right_table = unique_transactions.rename({"item_2": value_col})
+
+        join_logic = [left_table[group_col] == right_table[group_col]]
+        if target_item is None:
+            join_logic.append(left_table["item_1"] < right_table["item_2"])
+        else:
+            join_logic.extend(
+                [
+                    left_table["item_1"] != right_table["item_2"],
+                    left_table["item_1"] == target_item,
+                ],
+            )
+        merged_df = left_table.join(
+            right_table,
+            predicates=join_logic,
+            lname="",
+            rname="{name}_right",
+        )
 
-        results = []
+        product_occurrences_1 = product_occurrences.rename(
+            {"item_1": value_col, "occurrences_1": "occurrences", "occurrence_probability_1": "occurrence_probability"},
+        )
+        product_occurrences_2 = product_occurrences.rename(
+            {"item_2": value_col, "occurrences_2": "occurrences", "occurrence_probability_2": "occurrence_probability"},
+        )
 
-        occurrences = np.array(sparse_matrix.sum(axis=0)).flatten()
-        occurence_prob = occurrences / row_count
+        merged_df = merged_df.join(
+            product_occurrences_1,
+            predicates=[merged_df["item_1"] == product_occurrences_1["item_1"]],
+        )
 
-        base_items = [target_item]
-        if number_of_combinations == 2:  # noqa: PLR2004
-            if target_item is None:
-                base_items = unique_combo_df[value_col].cat.categories
-            items = [([unique_combo_df[value_col].cat.categories.get_loc(cat)], cat) for cat in base_items]
-        elif number_of_combinations == 3:  # noqa: PLR2004
-            if target_item is None:
-                base_items = sorted(combinations(unique_combo_df[value_col].cat.categories, 2))
-            items = [
-                ([unique_combo_df[value_col].cat.categories.get_loc(i) for i in cats], cats) for cats in base_items
-            ]
+        merged_df = merged_df.join(
+            product_occurrences_2,
+            predicates=[merged_df["item_2"] == product_occurrences_2["item_2"]],
        )
 
-        if show_progress:
-            items = tqdm(items)
-
-        cols_mask = np.zeros(sparse_matrix.shape[1], dtype=bool)
-
-        for target_item_loc, item_2 in items:
-            target_item_col_index = cols_mask.copy()
-            target_item_col_index[target_item_loc] = True
-            rows_with_target_item = sparse_matrix[:, target_item_col_index].getnnz(axis=1) == len(target_item_loc)
-
-            cooccurrences = np.array(sparse_matrix[rows_with_target_item, :].sum(axis=0)).flatten()
-            if (cooccurrences == 0).all():
-                continue
-
-            rows_with_target_item_sum = rows_with_target_item.sum()
-            coocurrence_prob = cooccurrences / row_count
-
-            target_prob = rows_with_target_item_sum / row_count
-            expected_prob = target_prob * occurence_prob
-
-            # TODO: Try to avoid constructing a pandas Dataframe
-            pa_df = pd.DataFrame(
-                {
-                    f"{value_col}_1": [item_2] * sparse_matrix.shape[1],
-                    f"{value_col}_2": unique_combo_df[value_col].cat.categories.values,
-                    "occurrences_1": rows_with_target_item_sum,
-                    "occurrences_2": occurrences,
-                    "cooccurrences": cooccurrences,
-                    "support": coocurrence_prob,
-                    "confidence": cooccurrences / rows_with_target_item_sum,
-                    "uplift": coocurrence_prob / expected_prob,
-                },
-            )
+        cooccurrences = merged_df.group_by(["item_1", "item_2"]).aggregate(cooccurrences=merged_df[group_col].nunique())
+        cooccurrences = cooccurrences.mutate(
+            support=cooccurrences.cooccurrences / total_transactions,
+        )
+        cooccurrences = cooccurrences.filter(
+            (cooccurrences.cooccurrences >= min_cooccurrences) & (cooccurrences.support >= min_support),
+        )
 
-            excl_pairs_idx = (
-                target_item_col_index
-                | (pa_df["occurrences_1"] < min_occurrences)
-                | (pa_df["occurrences_2"] < min_occurrences)
-                | (pa_df["cooccurrences"] < min_cooccurrences)
-                | (pa_df["support"] < min_support)
-                | (pa_df["confidence"] < min_confidence)
-                | (pa_df["uplift"] < min_uplift)
-            )
+        product_occurrences_1_rename = product_occurrences.rename(
+            {"item_1": value_col, "occurrences_1": "occurrences", "prob_1": "occurrence_probability"},
+        )
+        product_occurrences_2_rename = product_occurrences.rename(
+            {"item_2": value_col, "occurrences_2": "occurrences", "prob_2": "occurrence_probability"},
+        )
 
-            results.append(pa_df[~excl_pairs_idx])
+        product_pairs = cooccurrences.join(
+            product_occurrences_1_rename,
+            predicates=[cooccurrences["item_1"] == product_occurrences_1_rename["item_1"]],
+        )
+        product_pairs = product_pairs.join(
+            product_occurrences_2_rename,
+            predicates=[product_pairs["item_2"] == product_occurrences_2_rename["item_2"]],
+        )
+
+        product_pairs = product_pairs.mutate(
+            confidence=product_pairs["cooccurrences"] / product_pairs["occurrences_1"],
+            uplift=product_pairs["support"] / (product_pairs["prob_1"] * product_pairs["prob_2"]),
+        )
 
-        return pd.concat(results).sort_values([f"{value_col}_1", f"{value_col}_2"]).reset_index(drop=True)
+        result = product_pairs.filter(product_pairs.uplift >= min_uplift)
+
+        if target_item is None:
+            col_order = [
+                "item_1",
+                "item_2",
+                "occurrences_1",
+                "occurrences_2",
+                "cooccurrences",
+                "support",
+                "confidence",
+                "uplift",
+            ]
+            inverse_pairs = result.mutate(
+                item_1=result["item_2"],
+                item_2=result["item_1"],
+                occurrences_1=result["occurrences_2"],
+                occurrences_2=result["occurrences_1"],
+                prob_1=result["prob_2"],
+                prob_2=result["prob_1"],
+                confidence=result["cooccurrences"] / result["occurrences_2"],
+            )
+            result = result[col_order].union(inverse_pairs[col_order])
+
+        result = result.filter(result.confidence >= min_confidence)
+
+        final_result = result.execute().sort_values(by=["item_1", "item_2"]).reset_index(drop=True)
+        final_result = final_result.rename(columns={"item_1": f"{value_col}_1", "item_2": f"{value_col}_2"})
+
+        return final_result[
+            [
+                f"{value_col}_1",
+                f"{value_col}_2",
+                "occurrences_1",
+                "occurrences_2",
+                "cooccurrences",
+                "support",
+                "confidence",
+                "uplift",
+            ]
+        ]
diff --git a/tests/analysis/test_product_association.py b/tests/analysis/test_product_association.py
index 187d246f..7c0f56dd 100644
--- a/tests/analysis/test_product_association.py
+++ b/tests/analysis/test_product_association.py
@@ -132,34 +132,6 @@ def test_calc_association_target_single_items(self, transactions_df, expected_re
             ),
         )
 
-    def test_calc_association_all_pair_items(self, transactions_df, expected_results_pair_items_df):
-        """Test calculating association rules for a pairs of items versus another item for all items."""
-        calc_df = ProductAssociation._calc_association(
-            df=transactions_df,
-            value_col="product",
-            group_col=cols.transaction_id,
-            number_of_combinations=3,
-        )
-
-        pd.testing.assert_frame_equal(calc_df, expected_results_pair_items_df)
-
-    def test_calc_association_target_pair_items(self, transactions_df, expected_results_pair_items_df):
-        """Test calculating association rules for a target pairs of items versus another item."""
-        calc_df = ProductAssociation._calc_association(
-            df=transactions_df,
-            value_col="product",
-            group_col=cols.transaction_id,
-            number_of_combinations=3,
-            target_item=("bread", "butter"),
-        )
-
-        pd.testing.assert_frame_equal(
-            calc_df,
-            expected_results_pair_items_df[
-                expected_results_pair_items_df["product_1"] == ("bread", "butter")
-            ].reset_index(drop=True),
-        )
-
     def test_calc_association_min_occurrences(self, transactions_df, expected_results_single_items_df):
         """Test calculating association rules with a min occurrences level."""
         min_occurrences = 2
@@ -251,23 +223,6 @@ def test_calc_association_min_uplift(self, transactions_df, expected_results_sin
                 ),
             )
 
-    def test_calc_association_invalid_number_of_combinations(self, transactions_df):
-        """Test calculating association rules with an invalid number of combinations."""
-        with pytest.raises(ValueError, match="Number of combinations must be either 2 or 3."):
-            ProductAssociation._calc_association(
-                df=transactions_df,
-                value_col="product",
-                group_col=cols.transaction_id,
-                number_of_combinations=4,
-            )
-        with pytest.raises(ValueError, match="Number of combinations must be either 2 or 3."):
-            ProductAssociation._calc_association(
-                df=transactions_df,
-                value_col="product",
-                group_col=cols.transaction_id,
-                number_of_combinations=1,
-            )
-
     def test_calc_association_invalid_min_occurrences(self, transactions_df):
         """Test calculating association rules with an invalid minimum occurrences value."""
         with pytest.raises(ValueError, match="Minimum occurrences must be at least 1."):
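Reviewer note: below is a minimal usage sketch of the refactored method, mirroring the call pattern in the remaining tests. The toy data and the literal `"transaction_id"` string are illustrative only (the test suite resolves the column name via the `cols.transaction_id` option helper); the rest follows the diff: a pandas DataFrame is wrapped in `ibis.memtable` internally, and the result is a pandas DataFrame of pair-level association metrics.

```python
import pandas as pd

from pyretailscience.analysis.product_association import ProductAssociation

# Toy transactions (illustrative, not from the test fixtures).
transactions = pd.DataFrame(
    {
        "transaction_id": [1, 1, 2, 2, 3, 3, 3],
        "product": ["bread", "butter", "bread", "milk", "bread", "butter", "milk"],
    },
)

# _calc_association is a staticmethod; a pandas DataFrame is converted to an
# ibis memtable internally, so the same call also accepts an ibis Table
# backed by a database engine.
rules = ProductAssociation._calc_association(
    df=transactions,
    value_col="product",
    group_col="transaction_id",
)

# One row per ordered product pair, with columns product_1, product_2,
# occurrences_1, occurrences_2, cooccurrences, support, confidence, uplift.
print(rules)
```

Because the computation is now expressed as ibis joins and aggregations rather than sparse-matrix loops, passing an ibis Table instead of a DataFrame pushes the same query down to the backing engine instead of materializing everything in memory.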