From f86c90d5e9dba06a09971677af2847ebedcf53b8 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 29 Jul 2024 16:40:59 +0200 Subject: [PATCH 1/8] prevent accidental wrapping of sources in resources when using adapters --- dlt/destinations/utils.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/dlt/destinations/utils.py b/dlt/destinations/utils.py index 9dd8b83509..54a0d67d98 100644 --- a/dlt/destinations/utils.py +++ b/dlt/destinations/utils.py @@ -14,7 +14,7 @@ from typing import Any, cast, Tuple, Dict, Type from dlt.destinations.exceptions import DatabaseTransientException -from dlt.extract import DltResource, resource as make_resource +from dlt.extract import DltResource, resource as make_resource, DltSource RE_DATA_TYPE = re.compile(r"([A-Z]+)\((\d+)(?:,\s?(\d+))?\)") @@ -23,6 +23,12 @@ def ensure_resource(data: Any) -> DltResource: """Wraps `data` in a DltResource if it's not a DltResource already.""" if isinstance(data, DltResource): return data + # prevent accidental wrapping sources with adapters + if isinstance(data, DltSource): + raise Exception( + "You are trying to use an adapter on a dlt source. You can only use adapters on pure" + " data or dlt resources." 
+ ) resource_name = None if hasattr(data, "__name__") else "content" return cast(DltResource, make_resource(data, name=resource_name)) From bc3030af5221271832d325f8c6cadf1b7b5a9b81 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 29 Jul 2024 16:42:36 +0200 Subject: [PATCH 2/8] fix typo --- dlt/destinations/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlt/destinations/utils.py b/dlt/destinations/utils.py index 54a0d67d98..c3caa1b93f 100644 --- a/dlt/destinations/utils.py +++ b/dlt/destinations/utils.py @@ -23,7 +23,7 @@ def ensure_resource(data: Any) -> DltResource: """Wraps `data` in a DltResource if it's not a DltResource already.""" if isinstance(data, DltResource): return data - # prevent accidental wrapping sources with adapters + # prevent accidentally wrapping sources with adapters if isinstance(data, DltSource): raise Exception( "You are trying to use an adapter on a dlt source. You can only use adapters on pure" From fd45350e87885c4871cb414cdbc7c3c16bd811f0 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 29 Jul 2024 17:18:32 +0200 Subject: [PATCH 3/8] fix qdrant zendesk example --- docs/examples/qdrant_zendesk/qdrant_zendesk.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/docs/examples/qdrant_zendesk/qdrant_zendesk.py b/docs/examples/qdrant_zendesk/qdrant_zendesk.py index 5416f2f2d0..4e90e84ee7 100644 --- a/docs/examples/qdrant_zendesk/qdrant_zendesk.py +++ b/docs/examples/qdrant_zendesk/qdrant_zendesk.py @@ -165,14 +165,13 @@ def get_pages( dataset_name="zendesk_data", ) - # run the dlt pipeline and save info about the load process - load_info = pipeline.run( - # here we use a special function to tell Qdrant which fields to embed - qdrant_adapter( - zendesk_support(), # retrieve tickets data - embed=["subject", "description"], - ) - ) + # here we instantiate the source + source = zendesk_support() + # ...and apply special hints on the ticket resource to tell qdrant which fields to embed + 
qdrant_adapter(source.tickets_data, embed=["subject", "description"]) + + # run the dlt pipeline and print info about the load process + load_info = pipeline.run(source) print(load_info) From 1a91659090746fc2c3ddb3e71b72ba431752a6f1 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 29 Jul 2024 17:26:16 +0200 Subject: [PATCH 4/8] another fix to the qdrant example --- docs/examples/qdrant_zendesk/qdrant_zendesk.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/examples/qdrant_zendesk/qdrant_zendesk.py b/docs/examples/qdrant_zendesk/qdrant_zendesk.py index 4e90e84ee7..9b6fbee150 100644 --- a/docs/examples/qdrant_zendesk/qdrant_zendesk.py +++ b/docs/examples/qdrant_zendesk/qdrant_zendesk.py @@ -188,7 +188,7 @@ def get_pages( # query Qdrant with prompt: getting tickets info close to "cancellation" response = qdrant_client.query( - "zendesk_data_content", # collection/dataset name with the 'content' suffix -> tickets content table + "zendesk_data_tickets_data", # tickets_data collection query_text="cancel subscription", # prompt to search limit=3, # limit the number of results to the nearest 3 embeddings ) From 1d6a18fd137867028135cff0590f45b05394f8d0 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 30 Jul 2024 11:16:47 +0200 Subject: [PATCH 5/8] rename utils function, add support for source with single resource, add tests --- .../impl/athena/athena_adapter.py | 4 +- .../impl/bigquery/bigquery_adapter.py | 4 +- .../impl/clickhouse/clickhouse_adapter.py | 4 +- .../impl/lancedb/lancedb_adapter.py | 4 +- .../impl/qdrant/qdrant_adapter.py | 4 +- .../impl/synapse/synapse_adapter.py | 4 +- .../impl/weaviate/weaviate_adapter.py | 4 +- dlt/destinations/utils.py | 22 +++++++--- tests/destinations/test_utils.py | 41 +++++++++++++++++++ 9 files changed, 71 insertions(+), 20 deletions(-) create mode 100644 tests/destinations/test_utils.py diff --git a/dlt/destinations/impl/athena/athena_adapter.py b/dlt/destinations/impl/athena/athena_adapter.py index 
cb600335c0..50f7abc54a 100644 --- a/dlt/destinations/impl/athena/athena_adapter.py +++ b/dlt/destinations/impl/athena/athena_adapter.py @@ -4,7 +4,7 @@ from dlt.common.pendulum import timezone from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns, TColumnSchema -from dlt.destinations.utils import ensure_resource +from dlt.destinations.utils import get_resource_for_adapter from dlt.extract import DltResource from dlt.extract.items import TTableHintTemplate @@ -89,7 +89,7 @@ def athena_adapter( >>> athena_adapter(data, partition=["department", athena_partition.year("date_hired"), athena_partition.bucket(8, "name")]) [DltResource with hints applied] """ - resource = ensure_resource(data) + resource = get_resource_for_adapter(data) additional_table_hints: Dict[str, TTableHintTemplate[Any]] = {} if partition: diff --git a/dlt/destinations/impl/bigquery/bigquery_adapter.py b/dlt/destinations/impl/bigquery/bigquery_adapter.py index 4dee572f57..55fe1b6b74 100644 --- a/dlt/destinations/impl/bigquery/bigquery_adapter.py +++ b/dlt/destinations/impl/bigquery/bigquery_adapter.py @@ -7,7 +7,7 @@ TColumnNames, TTableSchemaColumns, ) -from dlt.destinations.utils import ensure_resource +from dlt.destinations.utils import get_resource_for_adapter from dlt.extract import DltResource from dlt.extract.items import TTableHintTemplate @@ -78,7 +78,7 @@ def bigquery_adapter( >>> bigquery_adapter(data, partition="date_hired", table_expiration_datetime="2024-01-30", table_description="Employee Data") [DltResource with hints applied] """ - resource = ensure_resource(data) + resource = get_resource_for_adapter(data) additional_table_hints: Dict[str, TTableHintTemplate[Any]] = {} column_hints: TTableSchemaColumns = {} diff --git a/dlt/destinations/impl/clickhouse/clickhouse_adapter.py b/dlt/destinations/impl/clickhouse/clickhouse_adapter.py index dc030ef88c..41be531b71 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse_adapter.py +++ 
b/dlt/destinations/impl/clickhouse/clickhouse_adapter.py @@ -5,7 +5,7 @@ TABLE_ENGINE_TYPES, TABLE_ENGINE_TYPE_HINT, ) -from dlt.destinations.utils import ensure_resource +from dlt.destinations.utils import get_resource_for_adapter from dlt.extract import DltResource from dlt.extract.items import TTableHintTemplate @@ -46,7 +46,7 @@ def clickhouse_adapter(data: Any, table_engine_type: TTableEngineType = None) -> >>> clickhouse_adapter(data, table_engine_type="merge_tree") [DltResource with hints applied] """ - resource = ensure_resource(data) + resource = get_resource_for_adapter(data) additional_table_hints: Dict[str, TTableHintTemplate[Any]] = {} if table_engine_type is not None: diff --git a/dlt/destinations/impl/lancedb/lancedb_adapter.py b/dlt/destinations/impl/lancedb/lancedb_adapter.py index bb33632b48..99d5ef43c6 100644 --- a/dlt/destinations/impl/lancedb/lancedb_adapter.py +++ b/dlt/destinations/impl/lancedb/lancedb_adapter.py @@ -1,7 +1,7 @@ from typing import Any from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns -from dlt.destinations.utils import ensure_resource +from dlt.destinations.utils import get_resource_for_adapter from dlt.extract import DltResource @@ -32,7 +32,7 @@ def lancedb_adapter( >>> lancedb_adapter(data, embed="description") [DltResource with hints applied] """ - resource = ensure_resource(data) + resource = get_resource_for_adapter(data) column_hints: TTableSchemaColumns = {} diff --git a/dlt/destinations/impl/qdrant/qdrant_adapter.py b/dlt/destinations/impl/qdrant/qdrant_adapter.py index 215d87a920..e39d3e3644 100644 --- a/dlt/destinations/impl/qdrant/qdrant_adapter.py +++ b/dlt/destinations/impl/qdrant/qdrant_adapter.py @@ -2,7 +2,7 @@ from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns from dlt.extract import DltResource, resource as make_resource -from dlt.destinations.utils import ensure_resource +from dlt.destinations.utils import get_resource_for_adapter VECTORIZE_HINT = "x-qdrant-embed" 
@@ -32,7 +32,7 @@ def qdrant_adapter( >>> qdrant_adapter(data, embed="description") [DltResource with hints applied] """ - resource = ensure_resource(data) + resource = get_resource_for_adapter(data) column_hints: TTableSchemaColumns = {} diff --git a/dlt/destinations/impl/synapse/synapse_adapter.py b/dlt/destinations/impl/synapse/synapse_adapter.py index 8b262f3621..e12823c7bf 100644 --- a/dlt/destinations/impl/synapse/synapse_adapter.py +++ b/dlt/destinations/impl/synapse/synapse_adapter.py @@ -3,7 +3,7 @@ from dlt.extract import DltResource, resource as make_resource from dlt.extract.items import TTableHintTemplate from dlt.extract.hints import TResourceHints -from dlt.destinations.utils import ensure_resource +from dlt.destinations.utils import get_resource_for_adapter TTableIndexType = Literal["heap", "clustered_columnstore_index"] """ @@ -37,7 +37,7 @@ def synapse_adapter(data: Any, table_index_type: TTableIndexType = None) -> DltR >>> synapse_adapter(data, table_index_type="clustered_columnstore_index") [DltResource with hints applied] """ - resource = ensure_resource(data) + resource = get_resource_for_adapter(data) additional_table_hints: Dict[str, TTableHintTemplate[Any]] = {} if table_index_type is not None: diff --git a/dlt/destinations/impl/weaviate/weaviate_adapter.py b/dlt/destinations/impl/weaviate/weaviate_adapter.py index a290ac65b4..9bd0b41783 100644 --- a/dlt/destinations/impl/weaviate/weaviate_adapter.py +++ b/dlt/destinations/impl/weaviate/weaviate_adapter.py @@ -2,7 +2,7 @@ from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns from dlt.extract import DltResource, resource as make_resource -from dlt.destinations.utils import ensure_resource +from dlt.destinations.utils import get_resource_for_adapter TTokenizationTMethod = Literal["word", "lowercase", "whitespace", "field"] TOKENIZATION_METHODS: Set[TTokenizationTMethod] = set(get_args(TTokenizationTMethod)) @@ -54,7 +54,7 @@ def weaviate_adapter( >>> weaviate_adapter(data, 
vectorize="description", tokenization={"description": "word"}) [DltResource with hints applied] """ - resource = ensure_resource(data) + resource = get_resource_for_adapter(data) column_hints: TTableSchemaColumns = {} if vectorize: diff --git a/dlt/destinations/utils.py b/dlt/destinations/utils.py index c3caa1b93f..7ea5cf3a71 100644 --- a/dlt/destinations/utils.py +++ b/dlt/destinations/utils.py @@ -1,4 +1,6 @@ import re +import inspect + from typing import Any, List, Optional, Tuple from dlt.common import logger @@ -19,16 +21,24 @@ RE_DATA_TYPE = re.compile(r"([A-Z]+)\((\d+)(?:,\s?(\d+))?\)") -def ensure_resource(data: Any) -> DltResource: - """Wraps `data` in a DltResource if it's not a DltResource already.""" +def get_resource_for_adapter(data: Any) -> DltResource: + """ + Helper function for adapters. Wraps `data` in a DltResource if it's not a DltResource already. + Alternatively, if `data` is a DltSource, raises an error if there are multiple resources in the source + or returns the single resource if available. + """ if isinstance(data, DltResource): return data # prevent accidentally wrapping sources with adapters if isinstance(data, DltSource): - raise Exception( - "You are trying to use an adapter on a dlt source. You can only use adapters on pure" - " data or dlt resources." - ) + if len(data.resources.keys()) == 1: + return list(data.resources.values())[0] + else: + raise Exception( + "You are trying to use an adapter on a DltSource with multiple resources. You can" + " only use adapters on pure data, direclty on a DltResouce or a DltSource" + " containing a single DltResource." 
+ ) resource_name = None if hasattr(data, "__name__") else "content" return cast(DltResource, make_resource(data, name=resource_name)) diff --git a/tests/destinations/test_utils.py b/tests/destinations/test_utils.py new file mode 100644 index 0000000000..01d0a5cce0 --- /dev/null +++ b/tests/destinations/test_utils.py @@ -0,0 +1,41 @@ +import dlt +import pytest + +from dlt.destinations.utils import get_resource_for_adapter +from dlt.extract import DltResource + + +def test_get_resource_for_adapter() -> None: + # test on pure data + data = [1, 2, 3] + adapted_resource = get_resource_for_adapter(data) + assert isinstance(adapted_resource, DltResource) + assert list(adapted_resource) == [1, 2, 3] + + # test on resource + @dlt.resource(table_name="my_table") + def some_resource(): + yield [1, 2, 3] + + adapted_resource = get_resource_for_adapter(some_resource) + assert adapted_resource == some_resource + + # test on source with one resource + @dlt.source + def source(): + return [some_resource] + + adapted_resource = get_resource_for_adapter(source()) + assert adapted_resource.table_name == "my_table" + + # test on source with multiple resources + @dlt.resource(table_name="my_table") + def other_resource(): + yield [1, 2, 3] + + @dlt.source + def other_source(): + return [some_resource, other_resource] + + with pytest.raises(Exception): + get_resource_for_adapter(other_source()) From 4b00b4b4ee8514b7b5a58bd43ab39aa8cc80f16a Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 30 Jul 2024 11:21:54 +0200 Subject: [PATCH 6/8] add logger warning when setting default name for resource --- dlt/destinations/utils.py | 7 ++++++- tests/destinations/test_utils.py | 2 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/dlt/destinations/utils.py b/dlt/destinations/utils.py index 7ea5cf3a71..eab5341b01 100644 --- a/dlt/destinations/utils.py +++ b/dlt/destinations/utils.py @@ -39,7 +39,12 @@ def get_resource_for_adapter(data: Any) -> DltResource: " only use adapters on pure 
data, direclty on a DltResouce or a DltSource" " containing a single DltResource." ) - resource_name = None if hasattr(data, "__name__") else "content" + + resource_name = None + if not hasattr(data, "__name__"): + logger.warning("Setting default resource name to `content` for adapted resource.") + resource_name = "content" + return cast(DltResource, make_resource(data, name=resource_name)) diff --git a/tests/destinations/test_utils.py b/tests/destinations/test_utils.py index 01d0a5cce0..10079273d2 100644 --- a/tests/destinations/test_utils.py +++ b/tests/destinations/test_utils.py @@ -11,6 +11,7 @@ def test_get_resource_for_adapter() -> None: adapted_resource = get_resource_for_adapter(data) assert isinstance(adapted_resource, DltResource) assert list(adapted_resource) == [1, 2, 3] + assert adapted_resource.name == "content" # test on resource @dlt.resource(table_name="my_table") @@ -19,6 +20,7 @@ def some_resource(): adapted_resource = get_resource_for_adapter(some_resource) assert adapted_resource == some_resource + assert adapted_resource.name == "some_resource" # test on source with one resource @dlt.source From 8f379643bd36deba3017295a44ec827e56ae688a Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 30 Jul 2024 11:37:31 +0200 Subject: [PATCH 7/8] only use selected resources in get_resource_for_adapter --- dlt/destinations/utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dlt/destinations/utils.py b/dlt/destinations/utils.py index eab5341b01..c6420ffc9e 100644 --- a/dlt/destinations/utils.py +++ b/dlt/destinations/utils.py @@ -31,8 +31,8 @@ def get_resource_for_adapter(data: Any) -> DltResource: return data # prevent accidentally wrapping sources with adapters if isinstance(data, DltSource): - if len(data.resources.keys()) == 1: - return list(data.resources.values())[0] + if len(data.selected_resources.keys()) == 1: + return list(data.selected_resources.values())[0] else: raise Exception( "You are trying to use an adapter on a 
DltSource with multiple resources. You can" @@ -42,7 +42,7 @@ def get_resource_for_adapter(data: Any) -> DltResource: resource_name = None if not hasattr(data, "__name__"): - logger.warning("Setting default resource name to `content` for adapted resource.") + logger.info("Setting default resource name to `content` for adapted resource.") resource_name = "content" return cast(DltResource, make_resource(data, name=resource_name)) From bccbbba6f3540d3a5e9e0dc0029a958180bf1cac Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 30 Jul 2024 12:30:16 +0200 Subject: [PATCH 8/8] switch to value error --- dlt/destinations/utils.py | 2 +- tests/destinations/test_utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dlt/destinations/utils.py b/dlt/destinations/utils.py index c6420ffc9e..fcc2c4fd16 100644 --- a/dlt/destinations/utils.py +++ b/dlt/destinations/utils.py @@ -34,7 +34,7 @@ def get_resource_for_adapter(data: Any) -> DltResource: if len(data.selected_resources.keys()) == 1: return list(data.selected_resources.values())[0] else: - raise Exception( + raise ValueError( "You are trying to use an adapter on a DltSource with multiple resources. You can" " only use adapters on pure data, direclty on a DltResouce or a DltSource" " containing a single DltResource." diff --git a/tests/destinations/test_utils.py b/tests/destinations/test_utils.py index 10079273d2..32fc286830 100644 --- a/tests/destinations/test_utils.py +++ b/tests/destinations/test_utils.py @@ -39,5 +39,5 @@ def other_resource(): def other_source(): return [some_resource, other_resource] - with pytest.raises(Exception): + with pytest.raises(ValueError): get_resource_for_adapter(other_source())