|
| 1 | +"""Module for analyzing customer purchase paths from transaction data. |
| 2 | +
|
| 3 | +This module defines the `purchase_path_analysis` function that tracks |
| 4 | +customer journeys through product categories over time. |
| 5 | +""" |
| 6 | + |
| 7 | +import ibis |
| 8 | +import pandas as pd |
| 9 | + |
| 10 | +from pyretailscience.options import ColumnHelper |
| 11 | + |
| 12 | + |
def _build_category_group_df(
    first_df: pd.DataFrame,
    category_column: str,
    sort_by_metric: bool,
    multi_category_handling: str,
) -> pd.DataFrame:
    """Map each customer's basket to the categories first purchased in it.

    Args:
        first_df: One row per customer/category pair. Must contain the columns
            ``customer_id``, ``first_basket_number`` and ``category_column``,
            plus ``metric_value`` when ``sort_by_metric`` is true.
        category_column: Name of the column holding the category label.
        sort_by_metric: When true (and concatenating), categories inside a
            basket are ordered by ``metric_value`` descending; otherwise they
            are ordered by category label ascending.
        multi_category_handling: ``"concatenate"`` joins all categories of a
            basket into one comma-separated string. Any other value returns
            the rows unchanged, one row per category.

    Returns:
        A DataFrame with the columns ``customer_id``, ``first_basket_number``
        and ``categories``.
    """
    group_keys = ["customer_id", "first_basket_number"]

    if multi_category_handling != "concatenate":
        return first_df[[*group_keys, category_column]].rename(
            columns={category_column: "categories"},
        )

    if sort_by_metric:
        # The category label is a tie-breaker: without it, equal metric values
        # keep the incoming row order, so the same set of categories could be
        # joined as "A,B" for one customer and "B,A" for another.
        sort_cols = [*group_keys, "metric_value", category_column]
        ascending = [True, True, False, True]
    else:
        sort_cols = [*group_keys, category_column]
        ascending = [True, True, True]

    def _join_labels(values: pd.Series) -> str:
        # str() lets non-string labels (e.g. integer category ids) be joined;
        # null labels are skipped instead of raising inside str.join.
        return ",".join(str(value) for value in values if pd.notna(value))

    return (
        first_df.sort_values(sort_cols, ascending=ascending)
        .groupby(group_keys)[category_column]
        .apply(_join_labels)
        .reset_index()
        .rename(columns={category_column: "categories"})
    )
| 39 | + |
| 40 | + |
def _build_paths_df(category_groups_df: pd.DataFrame) -> pd.DataFrame:
    """Pivot per-basket category rows into one purchase-path row per customer.

    Each distinct ``first_basket_number`` becomes a column named
    ``basket_<position>``, where the position is the 1-based rank of that
    basket number among the values present in the input. Cells with no entry
    for a customer are filled with an empty string.
    """
    basket_numbers = []
    if not category_groups_df.empty:
        basket_numbers = sorted(category_groups_df["first_basket_number"].unique())

    wide = category_groups_df.pivot_table(
        index="customer_id",
        columns="first_basket_number",
        values="categories",
        aggfunc="first",
    )
    wide = wide.reset_index()

    # Rename only the basket numbers that survived the pivot.
    renames = {"customer_id": "customer_id"}
    renames.update(
        {
            number: f"basket_{position}"
            for position, number in enumerate(basket_numbers, 1)
            if number in wide.columns
        },
    )
    return wide.rename(columns=renames).fillna("")
| 56 | + |
| 57 | + |
def purchase_path_analysis(
    transactions_df: pd.DataFrame,
    category_column: str = "product_category",
    min_transactions: int = 3,
    min_basket_size: int = 2,
    min_basket_value: float = 10.0,
    max_depth: int = 10,
    min_customers: int = 5,
    exclude_negative_revenue: bool = True,
    multi_category_handling: str = "concatenate",
    sort_by: str = "alphabetical",
    aggregation_column: str | None = None,
    aggregation_function: str = "sum",
) -> pd.DataFrame:
    """Analyzes customer purchase paths through product categories over time.

    Qualifying baskets are numbered per customer in date order. Each category
    is attributed to the basket in which the customer first bought it, and the
    resulting per-customer sequences are counted.

    Args:
        transactions_df: Transaction line items. A pandas DataFrame is wrapped
            in an ibis memtable; any other object is used as-is. Must contain
            the customer id, transaction id and transaction date columns,
            ``category_column``, ``product_id`` and ``revenue``.
        category_column: Column holding the product category.
        min_transactions: Minimum number of qualifying baskets (after the
            basket filters and the ``max_depth`` cap) a customer must have.
        min_basket_size: Minimum number of distinct ``product_id`` values in a
            basket.
        min_basket_value: Minimum summed ``revenue`` of a basket.
        max_depth: Maximum number of qualifying baskets considered per
            customer, earliest first.
        min_customers: Minimum number of customers that must share a path for
            it to be reported.
        exclude_negative_revenue: When true, rows whose ``revenue`` is not
            strictly positive are dropped (zero-revenue rows included).
        multi_category_handling: ``"concatenate"`` joins all categories first
            bought in the same basket into one comma-separated label.
        sort_by: ``"aggregation"`` orders concatenated categories by the
            aggregated metric; this only takes effect when
            ``aggregation_column`` is given. Otherwise the order is
            alphabetical.
        aggregation_column: Column aggregated per customer/category when
            sorting by aggregation.
        aggregation_function: One of ``"sum"``, ``"max"``, ``"min"``, ``"avg"``.

    Returns:
        One row per purchase path with the ``basket_<n>`` columns,
        ``customer_count`` and ``transition_probability`` (the path's share of
        all customers in the reported paths, rounded to 3 decimals), sorted by
        ``customer_count`` descending. An empty frame is returned when nothing
        qualifies.

    Raises:
        ValueError: If a required column is missing.
        KeyError: If aggregation sorting is active and
            ``aggregation_function`` is not one of the supported names.
    """
    cols = ColumnHelper()

    # bool() because the `and` chain would otherwise yield the last operand
    # (a string) rather than True/False.
    use_agg_sort = bool(
        multi_category_handling == "concatenate"
        and sort_by == "aggregation"
        and aggregation_column
        and aggregation_function,
    )

    # NOTE(review): validation uses the ColumnHelper names while the pipeline
    # below uses the literal names "customer_id", "transaction_id" and
    # "transaction_date"; presumably these agree under default options - confirm.
    required_cols = [
        cols.customer_id,
        cols.transaction_id,
        cols.transaction_date,
        category_column,
        # Used unconditionally by the basket aggregation below.
        "product_id",
        "revenue",
    ]
    if use_agg_sort:
        required_cols.append(aggregation_column)
    missing_cols = set(required_cols) - set(transactions_df.columns)
    if missing_cols:
        msg = f"The following columns are required but missing: {missing_cols}"
        raise ValueError(msg)

    transactions_table = (
        ibis.memtable(transactions_df) if isinstance(transactions_df, pd.DataFrame) else transactions_df
    )
    if exclude_negative_revenue:
        transactions_table = transactions_table.filter(transactions_table.revenue > 0)

    customer_baskets = (
        transactions_table.group_by(["customer_id", "transaction_id", "transaction_date"])
        .aggregate(
            item_count=ibis._.product_id.nunique(),
            basket_value=ibis._.revenue.sum(),
        )
        .filter(
            (ibis._.item_count >= min_basket_size) & (ibis._.basket_value >= min_basket_value),
        )
        .mutate(
            # ibis.row_number() starts at 0; shift to 1-based so the
            # `<= max_depth` cap keeps exactly max_depth baskets. The
            # transaction id breaks ties between baskets on the same date so
            # the numbering is deterministic.
            basket_number=ibis.row_number().over(
                ibis.window(group_by="customer_id", order_by=["transaction_date", "transaction_id"]),
            )
            + 1,
        )
        .filter(ibis._.basket_number <= max_depth)
    )
    eligible_customers = (
        customer_baskets.group_by("customer_id")
        .aggregate(transaction_count=ibis._.basket_number.count())
        .filter(ibis._.transaction_count >= min_transactions)
        .select("customer_id")
    )

    # Attach the basket number to every line item of a qualifying basket that
    # belongs to an eligible customer.
    transactions_with_baskets = transactions_table.inner_join(
        customer_baskets.inner_join(eligible_customers, "customer_id").select(
            ["customer_id", "transaction_id", "basket_number"],
        ),
        ["customer_id", "transaction_id"],
    )

    # For each customer/category pair, find the basket of first purchase and,
    # when sorting by aggregation, the metric used for ordering.
    first_purchase_aggs = {"first_basket_number": ibis._.basket_number.min()}
    if use_agg_sort:
        agg_method = {"sum": "sum", "max": "max", "min": "min", "avg": "mean"}[aggregation_function]
        first_purchase_aggs["metric_value"] = getattr(transactions_with_baskets[aggregation_column], agg_method)()
    first_df = (
        transactions_with_baskets.group_by(["customer_id", category_column])
        .aggregate(**first_purchase_aggs)
        .execute()
    )

    if first_df.empty:
        return pd.DataFrame(columns=["customer_count", "transition_probability"])

    category_groups_df = _build_category_group_df(first_df, category_column, use_agg_sort, multi_category_handling)
    paths_df = _build_paths_df(category_groups_df)

    basket_cols = sorted(
        [col for col in paths_df.columns if isinstance(col, str) and col.startswith("basket_")],
        key=lambda x: int(x.split("_")[1]),
    )
    # Drop customers whose path is empty in every basket column.
    paths_df = paths_df[paths_df[basket_cols].ne("").any(axis=1)]

    if paths_df.empty:
        return pd.DataFrame(columns=["customer_count", "transition_probability"])

    pattern_counts = paths_df.groupby(basket_cols).size().reset_index(name="customer_count")
    pattern_counts = pattern_counts[pattern_counts["customer_count"] >= min_customers]

    if pattern_counts.empty:
        return pd.DataFrame(columns=[*basket_cols, "customer_count", "transition_probability"])

    total_customers = pattern_counts["customer_count"].sum()
    # assign() returns a new frame, avoiding a column write on a filtered slice.
    return (
        pattern_counts.assign(
            transition_probability=(pattern_counts["customer_count"] / total_customers).round(3),
        )
        .sort_values("customer_count", ascending=False)
        .reset_index(drop=True)
    )
0 commit comments