Commit 3243025

moves pipeline tests to pipelines
1 parent b8e5eba commit 3243025


2 files changed: +216 -204 lines


tests/load/bigquery/test_bigquery_table_builder.py

Lines changed: 0 additions & 203 deletions
@@ -5,7 +5,6 @@
 import google
 import pytest
 import sqlfluff
-from google.cloud.bigquery import Table
 
 import dlt
 from dlt.common.configuration import resolve_configuration
@@ -1040,205 +1039,3 @@ def some_data() -> Iterator[Dict[str, str]]:
     bigquery_adapter(some_data, table_expiration_datetime="2030-01-01")
 
     assert some_data._hints["x-bigquery-table-expiration"] == pendulum.datetime(2030, 1, 1)  # type: ignore
-
-
-@pytest.mark.parametrize(
-    "destination_config",
-    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
-    ids=lambda x: x.name,
-)
-def test_adapter_additional_table_hints_table_expiration(
-    destination_config: DestinationTestConfiguration,
-) -> None:
-    @dlt.resource(columns=[{"name": "col1", "data_type": "text"}])
-    def no_hints() -> Iterator[Dict[str, str]]:
-        yield from [{"col1": str(i)} for i in range(10)]
-
-    hints = bigquery_adapter(
-        no_hints.with_name(new_name="hints"), table_expiration_datetime="2030-01-01"
-    )
-
-    @dlt.source(max_table_nesting=0)
-    def sources() -> List[DltResource]:
-        return [no_hints, hints]
-
-    pipeline = destination_config.setup_pipeline(
-        f"bigquery_{uniq_id()}",
-        dev_mode=True,
-    )
-
-    pipeline.run(sources())
-
-    with pipeline.sql_client() as c:
-        nc: google.cloud.bigquery.client.Client = c.native_connection
-
-        fqtn_no_hints = c.make_qualified_table_name("no_hints", escape=False)
-        fqtn_hints = c.make_qualified_table_name("hints", escape=False)
-
-        no_hints_table = nc.get_table(fqtn_no_hints)
-        hints_table = nc.get_table(fqtn_hints)
-
-        assert not no_hints_table.expires
-        assert hints_table.expires == pendulum.datetime(2030, 1, 1, 0)
-
-
-@pytest.mark.parametrize(
-    "destination_config",
-    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
-    ids=lambda x: x.name,
-)
-def test_adapter_merge_behaviour(
-    destination_config: DestinationTestConfiguration,
-) -> None:
-    @dlt.resource(
-        columns=[
-            {"name": "col1", "data_type": "text"},
-            {"name": "col2", "data_type": "bigint"},
-            {"name": "col3", "data_type": "double"},
-        ]
-    )
-    def hints() -> Iterator[Dict[str, Any]]:
-        yield from [{"col1": str(i), "col2": i, "col3": float(i)} for i in range(10)]
-
-    bigquery_adapter(hints, table_expiration_datetime="2030-01-01", cluster=["col1"])
-    bigquery_adapter(
-        hints,
-        table_description="A small table somewhere in the cosmos...",
-        partition="col2",
-    )
-
-    pipeline = destination_config.setup_pipeline(
-        f"bigquery_{uniq_id()}",
-        dev_mode=True,
-    )
-
-    pipeline.run(hints)
-
-    with pipeline.sql_client() as c:
-        nc: google.cloud.bigquery.client.Client = c.native_connection
-
-        table_fqtn = c.make_qualified_table_name("hints", escape=False)
-
-        table: Table = nc.get_table(table_fqtn)
-
-        table_cluster_fields = [] if table.clustering_fields is None else table.clustering_fields
-
-        # Test merging behaviour.
-        assert table.expires == pendulum.datetime(2030, 1, 1, 0)
-        assert ["col1"] == table_cluster_fields, "`hints` table IS NOT clustered by `col1`."
-        assert table.description == "A small table somewhere in the cosmos..."
-
-        if not table.range_partitioning:
-            raise ValueError("`hints` table IS NOT partitioned on a column.")
-        else:
-            assert (
-                table.range_partitioning.field == "col2"
-            ), "`hints` table IS NOT partitioned on column `col2`."
-
-
-@pytest.mark.parametrize(
-    "destination_config",
-    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
-    ids=lambda x: x.name,
-)
-def test_adapter_autodetect_schema_with_hints(
-    destination_config: DestinationTestConfiguration,
-) -> None:
-    @dlt.resource(
-        columns=[
-            {"name": "col1", "data_type": "text"},
-            {"name": "col2", "data_type": "bigint"},
-            {"name": "col3", "data_type": "double"},
-        ]
-    )
-    def general_types() -> Iterator[Dict[str, Any]]:
-        yield from [{"col1": str(i), "col2": i, "col3": float(i)} for i in range(10)]
-
-    @dlt.resource(
-        columns=[
-            {"name": "my_time_column", "data_type": "timestamp"},
-        ]
-    )
-    def partition_time() -> Iterator[Dict[str, Any]]:
-        for i in range(10):
-            yield {
-                "my_time_column": pendulum.from_timestamp(1700784000 + i * 50_000),
-            }
-
-    @dlt.resource(
-        columns=[
-            {"name": "my_date_column", "data_type": "date"},
-        ]
-    )
-    def partition_date() -> Iterator[Dict[str, Any]]:
-        for i in range(10):
-            yield {
-                "my_date_column": pendulum.from_timestamp(1700784000 + i * 50_000).date(),
-            }
-
-    bigquery_adapter(
-        general_types,
-        table_description="A small table somewhere in the cosmos...",
-        partition="col2",
-        cluster=["col1"],
-        autodetect_schema=True,
-    )
-
-    pipeline = destination_config.setup_pipeline(
-        f"bigquery_{uniq_id()}",
-        dev_mode=True,
-    )
-
-    pipeline.run(general_types)
-
-    bigquery_adapter(
-        partition_time,
-        partition="my_time_column",
-        autodetect_schema=True,
-    )
-
-    pipeline_time = destination_config.setup_pipeline(
-        f"bigquery_{uniq_id()}",
-        dev_mode=True,
-    )
-
-    pipeline_time.run(partition_time)
-
-    bigquery_adapter(
-        partition_date,
-        partition="my_date_column",
-        autodetect_schema=True,
-    )
-
-    pipeline_date = destination_config.setup_pipeline(
-        f"bigquery_{uniq_id()}",
-        dev_mode=True,
-    )
-
-    pipeline_date.run(partition_date)
-
-    with pipeline.sql_client() as c:
-        nc: google.cloud.bigquery.client.Client = c.native_connection
-
-        table_fqtn = c.make_qualified_table_name("general_types", escape=False)
-
-        table: Table = nc.get_table(table_fqtn)
-
-        table_cluster_fields = [] if table.clustering_fields is None else table.clustering_fields
-        assert ["col1"] == table_cluster_fields, "NOT clustered by `col1`."
-
-        assert table.description == "A small table somewhere in the cosmos..."
-        assert table.range_partitioning.field == "col2", "NOT partitioned on column `col2`."
-
-    with pipeline_time.sql_client() as c:
-        nc: google.cloud.bigquery.client.Client = c.native_connection  # type: ignore[no-redef]
-        table_fqtn = c.make_qualified_table_name("partition_time", escape=False)
-        table: Table = nc.get_table(table_fqtn)  # type: ignore[no-redef]
-        assert table.time_partitioning.field == "my_time_column"
-
-    with pipeline_date.sql_client() as c:
-        nc: google.cloud.bigquery.client.Client = c.native_connection  # type: ignore[no-redef]
-        table_fqtn = c.make_qualified_table_name("partition_date", escape=False)
-        table: Table = nc.get_table(table_fqtn)  # type: ignore[no-redef]
-        assert table.time_partitioning.field == "my_date_column"
-        assert table.time_partitioning.type_ == "DAY"
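
For context, the deleted tests exercise dlt's BigQuery adapter hints end to end. A minimal sketch of the same pattern outside the test harness follows; it assumes the public `from dlt.destinations.adapters import bigquery_adapter` import path and BigQuery credentials already configured for the pipeline, and the resource, pipeline, and dataset names are illustrative placeholders, not taken from the tests.

import dlt
from dlt.destinations.adapters import bigquery_adapter

@dlt.resource(columns=[{"name": "col1", "data_type": "text"}])
def demo_rows():
    # Same row shape as the resources generated in the moved tests.
    yield from [{"col1": str(i)} for i in range(10)]

# Attach BigQuery-specific hints to the resource. Applying the adapter twice
# merges the hints rather than overwriting them, which is the behaviour
# test_adapter_merge_behaviour asserts above.
bigquery_adapter(demo_rows, table_expiration_datetime="2030-01-01", cluster=["col1"])
bigquery_adapter(demo_rows, table_description="A small table somewhere in the cosmos...")

# Illustrative pipeline and dataset names; any configured BigQuery project works.
pipeline = dlt.pipeline(
    pipeline_name="bigquery_hints_demo",
    destination="bigquery",
    dataset_name="hints_demo",
)
pipeline.run(demo_rows)

After the run, the created table carries the expiration, clustering, and description hints together, which is what the `nc.get_table(...)` assertions in the tests verify against the live destination.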
