5 changes: 4 additions & 1 deletion dlt/common/data_writers/escape.py
@@ -210,7 +210,10 @@ def format_datetime_literal(v: pendulum.DateTime, precision: int = 6, no_tz: boo
def format_bigquery_datetime_literal(
v: pendulum.DateTime, precision: int = 6, no_tz: bool = False
) -> str:
"""Returns BigQuery-adjusted datetime literal by prefixing required `TIMESTAMP` indicator."""
"""Returns BigQuery-adjusted datetime literal by prefixing required `TIMESTAMP` indicator.

Also works for Presto-based engines.
"""
# https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#timestamp_literals
return "TIMESTAMP " + format_datetime_literal(v, precision, no_tz)

6 changes: 5 additions & 1 deletion dlt/destinations/impl/athena/__init__.py
@@ -1,5 +1,8 @@
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.data_writers.escape import escape_athena_identifier
from dlt.common.data_writers.escape import (
escape_athena_identifier,
format_bigquery_datetime_literal,
)
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE


@@ -11,6 +14,7 @@ def capabilities() -> DestinationCapabilitiesContext:
caps.preferred_staging_file_format = "parquet"
caps.supported_staging_file_formats = ["parquet", "jsonl"]
caps.escape_identifier = escape_athena_identifier
caps.format_datetime_literal = format_bigquery_datetime_literal
caps.decimal_precision = (DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE)
caps.wei_precision = (DEFAULT_NUMERIC_PRECISION, 0)
caps.max_identifier_length = 255
71 changes: 67 additions & 4 deletions dlt/destinations/impl/athena/athena.py
@@ -35,14 +35,20 @@
from dlt.common.utils import without_none
from dlt.common.data_types import TDataType
from dlt.common.schema import TColumnSchema, Schema, TSchemaTables, TTableSchema
from dlt.common.schema.typing import TTableSchema, TColumnType, TWriteDisposition, TTableFormat
from dlt.common.schema.typing import (
TTableSchema,
TColumnType,
TWriteDisposition,
TTableFormat,
TSortOrder,
)
from dlt.common.schema.utils import table_schema_has_type, get_table_format
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import LoadJob, DoNothingFollowupJob, DoNothingJob
from dlt.common.destination.reference import TLoadJobState, NewLoadJob, SupportsStagingDestination
from dlt.common.storages import FileStorage
from dlt.common.data_writers.escape import escape_bigquery_identifier
from dlt.destinations.sql_jobs import SqlStagingCopyJob
from dlt.destinations.sql_jobs import SqlStagingCopyJob, SqlMergeJob

from dlt.destinations.typing import DBApi, DBTransaction
from dlt.destinations.exceptions import (
@@ -154,6 +160,64 @@ def __init__(self) -> None:
DLTAthenaFormatter._INSTANCE = self


class AthenaMergeJob(SqlMergeJob):
@classmethod
def _new_temp_table_name(cls, name_prefix: str, sql_client: SqlClientBase[Any]) -> str:
# reproducible name so we know which table to drop
with sql_client.with_staging_dataset(staging=True):
return sql_client.make_qualified_table_name(name_prefix)

@classmethod
def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
# regular table because Athena does not support temporary tables
return f"CREATE TABLE {temp_table_name} AS {select_sql};"

@classmethod
def gen_insert_temp_table_sql(
cls,
table_name: str,
staging_root_table_name: str,
sql_client: SqlClientBase[Any],
primary_keys: Sequence[str],
unique_column: str,
dedup_sort: Tuple[str, TSortOrder] = None,
condition: str = None,
condition_columns: Sequence[str] = None,
) -> Tuple[List[str], str]:
sql, temp_table_name = super().gen_insert_temp_table_sql(
table_name,
staging_root_table_name,
sql_client,
primary_keys,
unique_column,
dedup_sort,
condition,
condition_columns,
)
# DROP needs backtick as escape identifier
sql.insert(0, f"""DROP TABLE IF EXISTS {temp_table_name.replace('"', '`')};""")
Collaborator comment: yes, and there's a PR (#998) that allows escaping for DDL and DML separately.

return sql, temp_table_name

@classmethod
def gen_delete_temp_table_sql(
cls,
table_name: str,
unique_column: str,
key_table_clauses: Sequence[str],
sql_client: SqlClientBase[Any],
) -> Tuple[List[str], str]:
sql, temp_table_name = super().gen_delete_temp_table_sql(
table_name, unique_column, key_table_clauses, sql_client
)
# DROP needs backtick as escape identifier
sql.insert(0, f"""DROP TABLE IF EXISTS {temp_table_name.replace('"', '`')};""")
return sql, temp_table_name

@classmethod
def requires_temp_table_for_delete(cls) -> bool:
return True


class AthenaSQLClient(SqlClientBase[Connection]):
capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
dbapi: ClassVar[DBApi] = pyathena
@@ -417,8 +481,7 @@ def _create_replace_followup_jobs(
return super()._create_replace_followup_jobs(table_chain)

def _create_merge_followup_jobs(self, table_chain: Sequence[TTableSchema]) -> List[NewLoadJob]:
# fall back to append jobs for merge
return self._create_append_followup_jobs(table_chain)
return [AthenaMergeJob.from_table_chain(table_chain, self.sql_client)]

def _is_iceberg_table(self, table: TTableSchema) -> bool:
table_format = table.get("table_format")
22 changes: 15 additions & 7 deletions dlt/destinations/sql_jobs.py
@@ -197,14 +197,18 @@ def gen_key_table_clauses(

@classmethod
def gen_delete_temp_table_sql(
cls, unique_column: str, key_table_clauses: Sequence[str], sql_client: SqlClientBase[Any]
cls,
table_name: str,
unique_column: str,
key_table_clauses: Sequence[str],
sql_client: SqlClientBase[Any],
) -> Tuple[List[str], str]:
"""Generate sql that creates delete temp table and inserts `unique_column` from root table for all records to delete. May return several statements.

Returns temp table name for cases where special names are required like SQLServer.
"""
sql: List[str] = []
temp_table_name = cls._new_temp_table_name("delete", sql_client)
temp_table_name = cls._new_temp_table_name("delete_" + table_name, sql_client)
select_statement = f"SELECT d.{unique_column} {key_table_clauses[0]}"
sql.append(cls._to_temp_table(select_statement, temp_table_name))
for clause in key_table_clauses[1:]:
@@ -281,6 +285,7 @@ def default_order_by(cls) -> str:
@classmethod
def gen_insert_temp_table_sql(
cls,
table_name: str,
staging_root_table_name: str,
sql_client: SqlClientBase[Any],
primary_keys: Sequence[str],
@@ -289,7 +294,7 @@ def gen_insert_temp_table_sql(
condition: str = None,
condition_columns: Sequence[str] = None,
) -> Tuple[List[str], str]:
temp_table_name = cls._new_temp_table_name("insert", sql_client)
temp_table_name = cls._new_temp_table_name("insert_" + table_name, sql_client)
if len(primary_keys) > 0:
# deduplicate
select_sql = cls.gen_select_from_dedup_sql(
@@ -417,7 +422,9 @@ def gen_merge_sql(
unique_column = escape_id(unique_columns[0])
# create temp table with unique identifier
create_delete_temp_table_sql, delete_temp_table_name = (
cls.gen_delete_temp_table_sql(unique_column, key_table_clauses, sql_client)
cls.gen_delete_temp_table_sql(
root_table["name"], unique_column, key_table_clauses, sql_client
)
)
sql.extend(create_delete_temp_table_sql)

@@ -470,6 +477,7 @@ def gen_merge_sql(
create_insert_temp_table_sql,
insert_temp_table_name,
) = cls.gen_insert_temp_table_sql(
root_table["name"],
staging_root_table_name,
sql_client,
primary_keys,
@@ -608,8 +616,8 @@ def gen_update_table_prefix(cls, table_name: str) -> str:

@classmethod
def requires_temp_table_for_delete(cls) -> bool:
"""this could also be a capabitiy, but probably it is better stored here
this identifies destinations that can have a simplified method for merging single
table table chains
"""Whether a temporary table is required to delete records.

Must be `True` for destinations that don't support correlated subqueries.
"""
return False
9 changes: 8 additions & 1 deletion docs/website/docs/dlt-ecosystem/destinations/athena.md
@@ -87,7 +87,7 @@ athena_work_group="my_workgroup"
The `athena` destination handles the write dispositions as follows:
- `append` - files belonging to such tables are added to the dataset folder.
- `replace` - all files that belong to such tables are deleted from the dataset folder, and then the current set of files is added.
- `merge` - falls back to `append`.
- `merge` - falls back to `append` (unless you're using [iceberg](#iceberg-data-tables) tables).

## Data loading

@@ -137,6 +137,13 @@ force_iceberg = "True"

For every table created as an iceberg table, the Athena destination will create a regular Athena table in the staging dataset of both the filesystem and the Athena glue catalog, and then copy all data into the final iceberg table that lives with the non-iceberg tables in the same dataset on both the filesystem and the glue catalog. Switching from iceberg to regular table or vice versa is not supported.

#### `merge` support
The `merge` write disposition is supported for Athena when using iceberg tables.

> Note that:
> 1. there is a risk of tables ending up in an inconsistent state if a pipeline run fails mid-flight, because Athena doesn't support transactions and `dlt` uses multiple DELETE/UPDATE/INSERT statements to implement `merge`,
Reviewer comment: Why not use the merge syntax that is available in Athena?

Collaborator (author) reply: Full discussion is here: #1294.

TL;DR: we're planning to add a new merge strategy based on the MERGE statement, but it isn't properly specced yet. We want consistent behavior across different destinations, hence the delete-insert implementation for Athena for now. MERGE-based merge for Athena will come together with the other destinations.

> 2. `dlt` creates additional helper tables called `insert_<table name>` and `delete_<table name>` in the staging schema to work around Athena's lack of temporary tables.
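
For reference, here is a minimal sketch of a merge load into an Athena iceberg table; the resource, data, and dataset names are illustrative placeholders, and the `table_format="iceberg"` hint on the resource is assumed:

```py
import dlt

@dlt.resource(primary_key="id", write_disposition="merge", table_format="iceberg")
def users():
    # placeholder rows -- in practice these come from your source
    yield [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]

pipeline = dlt.pipeline(
    pipeline_name="athena_iceberg_merge",
    destination="athena",
    staging="filesystem",  # Athena loads data via a staging bucket
    dataset_name="demo_dataset",
)
# repeated runs insert new rows and update existing ones by primary key
pipeline.run(users())
```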

### dbt support

Athena is supported via `dbt-athena-community`. Credentials are passed into `aws_access_key_id` and `aws_secret_access_key` of the generated dbt profile. Iceberg tables are supported, but you need to make sure that you materialize your models as iceberg tables if your source table is iceberg. We encountered problems with materializing date time columns due to different precision on iceberg (nanosecond) and regular Athena tables (millisecond).
2 changes: 1 addition & 1 deletion tests/load/pipeline/test_pipelines.py
@@ -852,7 +852,7 @@ def some_source():
expected_completed_jobs += 1
# add iceberg copy jobs
if destination_config.force_iceberg:
expected_completed_jobs += 4
expected_completed_jobs += 3 if destination_config.supports_merge else 4
assert len(package_info.jobs["completed_jobs"]) == expected_completed_jobs

with pipeline.sql_client() as sql_client:
2 changes: 1 addition & 1 deletion tests/load/utils.py
@@ -217,7 +217,7 @@ def destinations_configs(
file_format="parquet",
bucket_url=AWS_BUCKET,
force_iceberg=True,
supports_merge=False,
supports_merge=True,
supports_dbt=False,
extra_info="iceberg",
)