Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions dlt/destinations/impl/bigquery/bigquery_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def bigquery_adapter(
column.pop(PARTITION_HINT, None) # type: ignore[typeddict-item]

if isinstance(partition, str):
column_hints[partition] = {"name": partition, PARTITION_HINT: True} # type: ignore[typeddict-unknown-key]
column_hints.setdefault(partition, {"name": partition})[PARTITION_HINT] = True # type: ignore[typeddict-unknown-key]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fix is correct.
Here's also a chance to improve code quality: this pattern of setting column hints repeats 4 times. Let's extract it into a helper function (e.g. _set_column_hint()) for maintainability. Most likely it could be used in other adapters too, but that is for a follow-up PR. For now I'd keep _set_column_hint in this module.


if isinstance(partition, PartitionTransformation):
partition_hint: Dict[str, str] = {}
Expand All @@ -156,7 +156,7 @@ def bigquery_adapter(
"`cluster` must be a list of column names or a single column name as a string."
)
for column_name in cluster:
column_hints[column_name] = {"name": column_name, CLUSTER_HINT: True} # type: ignore[typeddict-unknown-key]
column_hints.setdefault(column_name, {"name": column_name})[CLUSTER_HINT] = True # type: ignore[typeddict-unknown-key]
additional_table_hints[CLUSTER_COLUMNS_HINT] = cluster

# Implementing rounding logic flags
Expand All @@ -169,7 +169,7 @@ def bigquery_adapter(
" name."
)
for column_name in round_half_away_from_zero:
column_hints[column_name] = {"name": column_name, ROUND_HALF_AWAY_FROM_ZERO_HINT: True} # type: ignore[typeddict-unknown-key]
column_hints.setdefault(column_name, {"name": column_name})[ROUND_HALF_AWAY_FROM_ZERO_HINT] = True # type: ignore[typeddict-unknown-key]

if round_half_even:
if isinstance(round_half_even, str):
Expand All @@ -179,7 +179,7 @@ def bigquery_adapter(
"`round_half_even` must be a list of column names or a single column name."
)
for column_name in round_half_even:
column_hints[column_name] = {"name": column_name, ROUND_HALF_EVEN_HINT: True} # type: ignore[typeddict-unknown-key]
column_hints.setdefault(column_name, {"name": column_name})[ROUND_HALF_EVEN_HINT] = True # type: ignore[typeddict-unknown-key]

if round_half_away_from_zero and round_half_even:
if intersection_columns := set(round_half_away_from_zero).intersection(
Expand Down
49 changes: 49 additions & 0 deletions tests/load/bigquery/test_bigquery_table_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,25 @@ def partitioned_table():
assert expected_clause in sql_partitioned


def test_adapter_hints_comprehensive_single_column() -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A unit test is good. But this particular scenario likely already worked before the fix because bigquery_adapter() below creates a new column_hints = {} each time.
It's best to also add a unit test for the actual bug: a single call with both partition and cluster on the same column, where the second hint used to overwrite the first.

@dlt.resource
def partitioned_table():
yield {
"user_id": 10000,
"name": "user 1",
"created_at": "2021-01-01T00:00:00Z",
"category": "category 1",
"score": 100.0,
}

# hints should merge even across multiple adapter calls
bigquery_adapter(partitioned_table, partition="user_id")
bigquery_adapter(partitioned_table, cluster="user_id")
assert partitioned_table.columns == {
"user_id": {"name": "user_id", PARTITION_HINT: True, CLUSTER_HINT: True},
}


@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_sql_configs=True, subset=["bigquery"]),
Expand Down Expand Up @@ -1001,6 +1020,36 @@ def sources() -> List[DltResource]:
assert ["col1"] == hints_cluster_fields, "`hints` table IS NOT clustered by `col1`."


@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_sql_configs=True, subset=["bigquery"]),
ids=lambda x: x.name,
)
def test_adapter_hints_clustering_and_partitioning(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good!

destination_config: DestinationTestConfiguration,
) -> None:
@dlt.resource(columns=[{"name": "col1", "data_type": "bigint"}])
def data() -> Iterator[Dict[str, str]]:
yield from [{"col1": str(i)} for i in range(10)]

bigquery_adapter(data, partition="col1", cluster="col1")

pipeline = destination_config.setup_pipeline(f"bigquery_{uniq_id()}", dev_mode=True)
pipeline.run(data)

with pipeline.sql_client() as c:
nc: google.cloud.bigquery.client.Client = c.native_connection
fqtn = c.make_qualified_table_name("data", quote=False)
table = nc.get_table(fqtn)

assert table.clustering_fields == [
"col1"
], f"Expected clustering fields ['col1'], got {table.clustering_fields}"
assert (
table.range_partitioning is not None and table.range_partitioning.field == "col1"
), f"Expected partition field 'col1', got {table.range_partitioning}"


def test_adapter_hints_empty() -> None:
@dlt.resource(columns=[{"name": "int_col", "data_type": "bigint"}])
def some_data() -> Iterator[Dict[str, str]]:
Expand Down
Loading