Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion dlt/destinations/impl/athena/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,10 @@ def update_stored_schema(
applied_update = super().update_stored_schema(only_tables, expected_update=expected_update)
# here we could apply tags only if any migration happened, right now we do it on each run
# NOTE: tags are applied before any data is loaded
if self.config.lakeformation_config is not None:
if (
self.config.lakeformation_config is not None
and self.config.lakeformation_config.enabled
):
self.manage_lf_tags()
return applied_update

Expand Down
35 changes: 35 additions & 0 deletions tests/load/pipeline/test_athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,41 @@
pytestmark = pytest.mark.essential


@pytest.mark.parametrize(
"destination_config",
destinations_configs(
default_sql_configs=True,
subset=["athena"],
with_table_format="iceberg",
),
ids=lambda x: x.name,
)
@pytest.mark.parametrize("lf_enabled", [True, False], ids=["lf-on", "lf-off"])
def test_athena_lakeformation_config_gating(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these kind of tests are "more secure" if you parametrize them to have one run with lf enabled and check that the mocked function is called and then one run where it is not and thus the mocked function is not called

destination_config: DestinationTestConfiguration, lf_enabled: bool, mocker, monkeypatch
) -> None:
# Configure Lake Formation gating via env (read by client config)
monkeypatch.setenv("DESTINATION__LAKEFORMATION_CONFIG__ENABLED", str(lf_enabled))

pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), dev_mode=True)

with pipeline.destination_client() as client:
mocked_manage = mocker.patch(
"dlt.destinations.impl.athena.athena.AthenaClient.manage_lf_tags"
)
# avoid side effects in parent update_stored_schema; gating still runs in AthenaClient
mocker.patch(
"dlt.destinations.job_client_impl.SqlJobClientWithStagingDataset.update_stored_schema",
return_value=None,
)

client.update_stored_schema()
if lf_enabled:
mocked_manage.assert_called()
else:
mocked_manage.assert_not_called()


@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_sql_configs=True, subset=["athena"]),
Expand Down
Loading