Setup GCP BigQuery Integration Tests for Analysis Modules #244

Merged · merged 6 commits on May 21, 2025
1 change: 1 addition & 0 deletions .env_sample
@@ -0,0 +1 @@
GCP_PROJECT_ID =
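
Used with `python-dotenv`, a filled-in local `.env` would look like this (the value is a placeholder, not part of this PR):

```bash
# .env (not committed); hypothetical project ID
GCP_PROJECT_ID=your-project-id
```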
63 changes: 63 additions & 0 deletions .github/workflows/bigquery-integration.yml
@@ -0,0 +1,63 @@
name: BigQuery Integration Tests

on:
  workflow_dispatch:
    inputs:
      test_suite:
        type: choice
        description: Test Suite to Run
        default: "all"
        options:
          - all
          - cohort_analysis
          - composite_rank
          - cross_shop
          - customer_decision_hierarchy
          - haversine
          - hml_segmentation
          - product_association
          - revenue_tree
          - rfm_segmentation
          - segstats_segmentation
          - threshold_segmentation

permissions:
  contents: read

concurrency:
  group: "bigquery-tests"
  cancel-in-progress: true

jobs:
  integration-tests:
    name: Run BigQuery Integration Tests
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install uv Package
        run: |
          pip install --upgrade pip
          pip install uv==0.5.30

      - name: Install Dependencies
        run: |
          uv sync

      - name: Set up GCP Authentication
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}

      - name: Run Integration Tests
        env:
          TEST_SUITE: ${{ inputs.test_suite }}
        run: |
          uv run pytest tests/integration/bigquery -v \
            $(if [ "$TEST_SUITE" != "all" ]; then echo "-k $TEST_SUITE"; fi)
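
When any suite other than `all` is selected, the final step passes it to pytest as a `-k` keyword filter. For example, choosing `cohort_analysis` makes the run step equivalent to:

```bash
uv run pytest tests/integration/bigquery -v -k cohort_analysis
```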
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -25,8 +25,8 @@ repos:
    hooks:
      - id: pytest
        name: pytest
-       entry: uv run pytest --cov=pyretailscience --cov-report=xml --cov-branch tests
+       entry: uv run pytest --cov=pyretailscience --cov-report=xml --cov-branch tests --ignore=tests/integration
        language: system
        types: [python]
        pass_filenames: false
        always_run: true
66 changes: 66 additions & 0 deletions README.md
@@ -1,3 +1,4 @@
<!-- README.md -->
![PyRetailScience Logo](https://raw.githubusercontent.com/Data-Simply/pyretailscience/main/readme_assets/logo.png)

# PyRetailScience
@@ -208,3 +209,68 @@ Built with expertise doing analytics and data science for scale-ups to multi-nationals
## License

This project is licensed under the Elastic License 2.0 - see the [LICENSE](LICENSE) file for details.

# BigQuery Integration Tests

## Overview

This directory contains integration tests that verify all PyRetailScience analysis modules
work correctly with Google BigQuery as a backend. These tests confirm that the Ibis-based
code paths function correctly when connected to BigQuery.

## Test Coverage

The integration tests cover all of the analysis modules exposed as workflow options: cohort analysis, composite rank, cross shop, customer decision hierarchy, haversine, HML segmentation, product association, revenue tree, RFM segmentation, segstats segmentation, and threshold segmentation.

## Prerequisites

To run these tests, you need:

1. Access to a Google Cloud Platform account
2. A service account with BigQuery permissions
3. The service account key JSON file
4. The test dataset loaded in BigQuery (dataset: `test_data`, table: `transactions`); one way to load it is sketched below
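
If the table does not exist yet, it can be created with the `bq` CLI. This is a sketch, not part of this PR: the project ID, source file, and schema autodetection are placeholder assumptions.

```bash
# Hypothetical load of the test dataset; file name and flags are placeholders.
bq mk --dataset your-project-id:test_data
bq load --autodetect --source_format=CSV \
  your-project-id:test_data.transactions \
  ./transactions.csv
```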

## Running the Tests

### Manual Setup

- Set up authentication:

```bash
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/service-account-key.json
export GCP_PROJECT_ID=your-project-id
```

- Install dependencies:

```bash
uv sync
```

- Run the tests:

```bash
# Run all tests
uv run pytest tests/integration/bigquery -v

# Run specific test module
uv run pytest tests/integration/bigquery/test_cohort_analysis.py -v
```

## Using GitHub Actions

These tests can be run manually in GitHub Actions via the "BigQuery Integration Tests" workflow. To run:

1. Go to the "Actions" tab in the GitHub repository
2. Select the "BigQuery Integration Tests" workflow
3. Click "Run workflow"
4. Select the test suite to run from the dropdown (e.g., "cohort_analysis"), or leave it as "all"
5. Click "Run workflow"

### Required Secrets

To run the workflow in GitHub Actions, add these secrets to your repository:

- `GCP_SA_KEY`: The entire JSON content of your GCP service account key file (one way to create a key is sketched below)
- `GCP_PROJECT_ID`: Your GCP project ID
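
As a sketch (not part of this PR), a key file for an existing service account can be generated with the `gcloud` CLI; the service account name and project are placeholders:

```bash
# Hypothetical service account; replace with your own.
gcloud iam service-accounts keys create sa-key.json \
  --iam-account=bigquery-tester@your-project-id.iam.gserviceaccount.com
```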
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -27,11 +27,13 @@ name = "Murray Vanwyk"
[dependency-groups]
dev = [
    "freezegun>=1.5.1,<2",
+   "ibis-framework[bigquery]>=10.0.0,<11",
    "nbstripout>=0.7.1,<0.8",
    "pre-commit>=3.6.2,<4",
    "pytest-cov>=4.1.0,<5",
    "pytest-mock>=3.14.0,<4",
    "pytest>=8.0.0,<9",
+   "python-dotenv>=1.0.0,<2",
    "ruff>=0.9,<0.10",
    "tomlkit>=0.12,<1",
]
4 changes: 3 additions & 1 deletion pyretailscience/segmentation/rfm.py
@@ -104,7 +104,9 @@ def _compute_rfm(self, df: ibis.Table, current_date: datetime.date) -> ibis.Table:
        current_date_expr = ibis.literal(current_date)

        customer_metrics = df.group_by(cols.customer_id).aggregate(
-           recency_days=(current_date_expr - df[cols.transaction_date].max().cast("date")).cast("int32"),
+           recency_days=current_date_expr.delta(df[cols.transaction_date].max().cast("date"), unit="day").cast(
+               "int32",
+           ),
            frequency=df[cols.transaction_id].nunique(),
            monetary=df[cols.unit_spend].sum(),
        )
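For context, `delta(..., unit="day")` computes a whole-day difference between two date expressions, which compiles to a backend-native date difference; subtracting dates directly yields an interval that not every backend will cast to `int32`. A minimal sketch of the same pattern on a hypothetical in-memory table (not the PR's code):

```python
import datetime

import ibis

# Hypothetical stand-in for the transactions table.
t = ibis.memtable({"transaction_date": [datetime.date(2025, 1, 5), datetime.date(2025, 2, 1)]})
current_date_expr = ibis.literal(datetime.date(2025, 3, 1))

# Whole days between the reference date and the latest transaction.
recency_days = current_date_expr.delta(t.transaction_date.max().cast("date"), unit="day").cast("int32")
```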
9 changes: 2 additions & 7 deletions pyretailscience/segmentation/threshold.py
@@ -83,14 +83,9 @@ def __init__(
        window = ibis.window(order_by=ibis.asc(df[value_col]))
        df = df.mutate(ptile=ibis.percent_rank().over(window))

-       case = ibis.case()
-
-       for quantile, segment in zip(thresholds, segments, strict=True):
-           case = case.when(df["ptile"] <= quantile, segment)
-
-       case = case.end()
-
-       df = df.mutate(segment_name=case).drop(["ptile"])
+       case_args = [(df["ptile"] <= quantile, segment) for quantile, segment in zip(thresholds, segments, strict=True)]
+
+       df = df.mutate(segment_name=ibis.cases(*case_args)).drop(["ptile"])

        if zero_value_customers == "separate_segment":
            df = ibis.union(df, zero_df)
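This swaps the deprecated `ibis.case()` builder chain for `ibis.cases`, which accepts `(condition, result)` pairs directly and evaluates them in order. A minimal sketch on a hypothetical table (not the PR's code):

```python
import ibis

# Hypothetical percentile column standing in for the computed ptile.
t = ibis.memtable({"ptile": [0.05, 0.45, 0.95]})
thresholds = [0.1, 0.5, 1.0]
segments = ["Light", "Medium", "Heavy"]

# Each (condition, result) pair becomes one branch of the CASE expression.
case_args = [(t["ptile"] <= quantile, segment) for quantile, segment in zip(thresholds, segments, strict=True)]
t = t.mutate(segment_name=ibis.cases(*case_args))
```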
33 changes: 33 additions & 0 deletions tests/integration/bigquery/conftest.py
@@ -0,0 +1,33 @@
"""BigQuery integration test fixtures."""

import os

import ibis
import pytest
from dotenv import load_dotenv
from google.cloud import bigquery
from loguru import logger

load_dotenv()
client = bigquery.Client(project="pyretailscience-infra")


@pytest.fixture(scope="session")
def bigquery_connection():
"""Connect to BigQuery for integration tests."""
try:
conn = ibis.bigquery.connect(
project_id=os.environ.get("GCP_PROJECT_ID"),
)
logger.info("Connected to BigQuery")
except Exception as e:
logger.error(f"Failed to connect to BigQuery: {e}")
raise
else:
return conn


@pytest.fixture(scope="session")
def transactions_table(bigquery_connection):
"""Get the transactions table for testing."""
return bigquery_connection.table("test_data.transactions")
20 changes: 20 additions & 0 deletions tests/integration/bigquery/test_cohort_analysis.py
@@ -0,0 +1,20 @@
"""Integration tests for Cohort Analysis with BigQuery."""

from pyretailscience.analysis.cohort import CohortAnalysis


def test_cohort_analysis_with_bigquery(transactions_table):
"""Integration test for CohortAnalysis using BigQuery backend and Ibis table.

This test ensures that the CohortAnalysis class initializes and executes successfully
using BigQuery data with various combinations of aggregation parameters.
"""
limited_table = transactions_table.limit(5000)

CohortAnalysis(
df=limited_table,
aggregation_column="unit_spend",
agg_func="sum",
period="week",
percentage=True,
)
23 changes: 23 additions & 0 deletions tests/integration/bigquery/test_composite_rank.py
@@ -0,0 +1,23 @@
"""Integration tests for Composite Rank Analysis with BigQuery."""

import pytest

from pyretailscience.analysis.composite_rank import CompositeRank


@pytest.mark.parametrize("ignore_ties", [False, True])
def test_tie_handling(transactions_table, ignore_ties):
"""Test handling of ties during rank calculation."""
rank_cols = [
("unit_spend", "desc"),
("customer_id", "desc"),
]
result = CompositeRank(
df=transactions_table,
rank_cols=rank_cols,
agg_func="mean",
ignore_ties=ignore_ties,
)
assert result is not None
executed_result = result.df
assert executed_result is not None
51 changes: 51 additions & 0 deletions tests/integration/bigquery/test_cross_shop.py
@@ -0,0 +1,51 @@
"""Integration tests for Cross Shop Analysis with BigQuery."""

import pytest

from pyretailscience.analysis.cross_shop import CrossShop


@pytest.mark.parametrize(
"group_3_col",
[
"category_1_name",
None,
],
)
def test_cross_shop_with_bigquery(transactions_table, group_3_col):
"""Test CrossShop with data fetched from BigQuery.

This parameterized test verifies that CrossShop can be initialized
and run with data from BigQuery using different combinations of group columns,
value columns, and aggregation functions without throwing exceptions.
"""
transactions_df = transactions_table.limit(5000)
group_1_col = "brand_name"
group_2_col = "category_0_name"
group_1_vals = transactions_df[group_1_col].execute().dropna().unique()
group_2_vals = transactions_df[group_2_col].execute().dropna().unique()

group_1_val = group_1_vals[0]
group_2_val = group_2_vals[0]

group_3_val = None
if group_3_col is not None:
group_3_vals = transactions_df[group_3_col].execute().dropna().unique()
if len(group_3_vals) == 0:
pytest.skip(f"Not enough unique values for {group_3_col}")
group_3_val = group_3_vals[0]

labels = ["Group 1", "Group 2"] if group_3_col is None else ["Group 1", "Group 2", "Group 3"]

CrossShop(
df=transactions_table,
group_1_col=group_1_col,
group_1_val=group_1_val,
group_2_col=group_2_col,
group_2_val=group_2_val,
group_3_col=group_3_col,
group_3_val=group_3_val,
labels=labels,
value_col="unit_quantity",
agg_func="count",
)
35 changes: 35 additions & 0 deletions tests/integration/bigquery/test_customer_decision_hierarchy.py
@@ -0,0 +1,35 @@
"""Integration tests for Customer Decision Hierarchy Analysis with BigQuery."""

import pytest

from pyretailscience.analysis.customer_decision_hierarchy import CustomerDecisionHierarchy


@pytest.mark.parametrize(
("method", "exclude_same_transaction"),
[
("truncated_svd", False),
("truncated_svd", None),
("yules_q", False),
("yules_q", None),
],
)
def test_customer_decision_hierarchy_with_bigquery(
transactions_table,
method,
exclude_same_transaction,
):
"""Test CustomerDecisionHierarchy with data fetched from BigQuery.

This parameterized test verifies that CustomerDecisionHierarchy can be initialized
and run with data from BigQuery using different combinations of product columns
and methods without throwing exceptions.
"""
transactions_df = transactions_table.limit(5000).execute()

CustomerDecisionHierarchy(
df=transactions_df,
product_col="product_name",
exclude_same_transaction_products=exclude_same_transaction,
method=method,
)
24 changes: 24 additions & 0 deletions tests/integration/bigquery/test_date.py
@@ -0,0 +1,24 @@
"""Tests for the date utility functions with BigQuery integration."""

from datetime import UTC, datetime

from pyretailscience.utils.date import filter_and_label_by_periods


def test_filter_and_label_by_periods_with_bigquery(transactions_table):
"""Test filter_and_label_by_periods with data using Ibis.

This test verifies that filter_and_label_by_periods can process data
through an Ibis without throwing exceptions.
"""
limited_table = transactions_table.limit(1000)
period_ranges = {
"Q1": (datetime(2023, 1, 1, tzinfo=UTC), datetime(2023, 3, 31, tzinfo=UTC)),
"Q2": (datetime(2023, 4, 1, tzinfo=UTC), datetime(2023, 6, 30, tzinfo=UTC)),
}
result = filter_and_label_by_periods(limited_table, period_ranges)

assert result is not None

df = result.execute()
assert df is not None