Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 12 additions & 25 deletions dlt/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,36 +206,23 @@ def __call__(
"""Convenience method to proxy `Dataset.query()`. See this method for details."""
return self.query(query, query_dialect, _execute_raw_query=_execute_raw_query)

# TODO remove `table_type` argument handling once the deprecation period ends.
# `dlt.Relation` now exposes `.to_ibis()` for obtaining an Ibis table.
def table(self, table_name: str, **kwargs: Any) -> dlt.Relation:
    """Get a `dlt.Relation` associated with a table from the dataset.

    Args:
        table_name: Name of the table as known in the dlt schema of this dataset.
        **kwargs: Deprecated keyword arguments. Passing `table_type="ibis"`
            raises a `DeprecationWarning` directing callers to
            `.table(...).to_ibis()` instead.

    Returns:
        A `dlt.Relation` bound to this dataset and `table_name`.

    Raises:
        ValueError: If `table_name` is not present in the dataset's schema.
        DeprecationWarning: If the removed `table_type="ibis"` argument is used.
    """
    # NOTE dataset only provides access to tables known in dlt schema
    # raw query execution could access tables unknown by dlt
    if table_name not in self.tables:
        # TODO: raise TableNotFound
        # include schema/dataset context and a readable table list in the message
        raise ValueError(
            f"Table `{table_name}` not found in schema `{self.schema.name}` of dataset"
            f" `{self.dataset_name}`. Available table(s):"
            f" {', '.join(self.tables)}"
        )

    # TODO remove in due time
    if kwargs.get("table_type") == "ibis":
        raise DeprecationWarning(
            "Calling `.table(..., table_type='ibis')` is deprecated. Instead, call"
            " `.table('foo').to_ibis()` to create a `dlt.Relation` and then retrieve the"
            " Ibis Table."
        )

    return dlt.Relation(dataset=self, table_name=table_name)

Expand Down
10 changes: 5 additions & 5 deletions docs/website/docs/general-usage/dataset-access/dataset.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ If you install the amazing [ibis](https://ibis-project.org/) library, you can us
pip install ibis-framework
```

dlt will then allow you to get an `ibis.UnboundTable` for each table which you can use to build a query with ibis expressions, which you can then execute on your dataset.
dlt will then allow you to get an `ibis.Table` for each table which you can use to build a query with ibis expressions, which you can then execute on your dataset.

:::warning
A previous version of dlt allowed you to use ibis expressions in a slightly different way, letting users directly execute and retrieve data on ibis unbound tables. This method no longer works. See the migration guide below for instructions on how to update your code.
Expand All @@ -146,7 +146,7 @@ You can learn more about the available expressions on the [ibis for sql users](h

### Migrating from the previous dlt / ibis implementation

As describe above, the new way to use ibis expressions is to first get one or many `UnboundTable` objects, construct your expression and then bind it to your data via the `Dataset` to get a `Relation` object which you may execute to retrieve your data.
As described above, the new way to use ibis expressions is to first get one or many `Table` objects and construct your expression. Then, you can pass it to the `Dataset` to get a `Relation` that executes the full query and retrieves the data.

An example from our previous docs for joining a customers and a purchase table was this:

Expand All @@ -169,9 +169,9 @@ df = joined_relation.df()
The migrated version looks like this:

```py
# we need to explicitely select table type ibis here
customers_expression = dataset.table("customers", table_type="ibis")
purchases_expression = dataset.table("purchases", table_type="ibis")
# we convert the dlt.Relation to an Ibis Table object
customers_expression = dataset.table("customers").to_ibis()
purchases_expression = dataset.table("purchases").to_ibis()

# join them using an ibis expression, same code as above
joined_epxression = customers_expression.join(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ def ibis_expressions_snippet(pipeline: dlt.Pipeline) -> None:
dataset = pipeline.dataset()

# get two table expressions
customers_expression = dataset.table("customers", table_type="ibis")
purchases_expression = dataset.table("purchases", table_type="ibis")
customers_expression = dataset.table("customers").to_ibis()
purchases_expression = dataset.table("purchases").to_ibis()

# join them using an ibis expression
join_expression = customers_expression.join(
Expand Down
2 changes: 1 addition & 1 deletion docs/website/docs/general-usage/state.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def comments(user_id: str):
# get user comments table from pipeline dataset
# get last user comment id with ibis expression, ibis-extras need to be installed
dataset = current_pipeline.dataset()
user_comments = dataset.table("user_comments", table_type="ibis")
user_comments = dataset.table("user_comments").to_ibis()
max_id_expression = user_comments.filter(user_comments.user_id == user_id).select(user_comments["_id"].max())
max_id_df = dataset(max_id_expression).df()
# if there are no comments for the user, max_id will be None, so we replace it with 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def orders_per_user_snippet(fruitshop_pipeline: dlt.Pipeline) -> None:

@dlt.hub.transformation(name="orders_per_user", write_disposition="merge")
def orders_per_user(dataset: dlt.Dataset) -> Any:
purchases = dataset.table("purchases", table_type="ibis")
purchases = dataset.table("purchases").to_ibis()
yield purchases.group_by(purchases.customer_id).aggregate(order_count=purchases.id.count())

# @@@DLT_SNIPPET_END orders_per_user
Expand Down Expand Up @@ -94,13 +94,13 @@ def multiple_transformations_snippet(fruitshop_pipeline: dlt.Pipeline) -> None:
def my_transformations(dataset: dlt.Dataset) -> Any:
@dlt.hub.transformation(write_disposition="append")
def enriched_purchases(dataset: dlt.Dataset) -> Any:
purchases = dataset.table("purchases", table_type="ibis")
customers = dataset.table("customers", table_type="ibis")
purchases = dataset.table("purchases").to_ibis()
customers = dataset.table("customers").to_ibis()
yield purchases.join(customers, purchases.customer_id == customers.id)

@dlt.hub.transformation(write_disposition="replace")
def total_items_sold(dataset: dlt.Dataset) -> Any:
purchases = dataset.table("purchases", table_type="ibis")
purchases = dataset.table("purchases").to_ibis()
yield purchases.aggregate(total_qty=purchases.quantity.sum())

return enriched_purchases(dataset), total_items_sold(dataset)
Expand Down Expand Up @@ -249,8 +249,8 @@ def computed_schema_snippet(fruitshop_pipeline: dlt.Pipeline) -> None:
# @@@DLT_SNIPPET_START computed_schema
# Show the computed schema before the transformation is executed
dataset = fruitshop_pipeline.dataset()
purchases = dataset.table("purchases", table_type="ibis")
customers = dataset.table("customers", table_type="ibis")
purchases = dataset.table("purchases").to_ibis()
customers = dataset.table("customers").to_ibis()
enriched_purchases = purchases.join(customers, purchases.customer_id == customers.id)
print(dataset(enriched_purchases).columns)
# @@@DLT_SNIPPET_END computed_schema
Expand Down Expand Up @@ -309,8 +309,8 @@ def in_transit_transformations_snippet() -> None:
# load aggregated data to a warehouse destination
@dlt.hub.transformation
def orders_per_store(dataset: dlt.Dataset) -> Any:
orders = dataset.table("orders", table_type="ibis")
stores = dataset.table("stores", table_type="ibis")
orders = dataset.table("orders").to_ibis()
stores = dataset.table("stores").to_ibis()
yield (
orders.join(stores, orders.store_id == stores.id)
.group_by(stores.name)
Expand Down Expand Up @@ -341,17 +341,15 @@ def cleaned_customers(dataset: dlt.Dataset) -> Any:
try:
output_dataset = dlt.current.pipeline().dataset()
if output_dataset.schema.tables.get("cleaned_customers"):
max_pimary_key_expr = output_dataset.table(
"cleaned_customers", table_type="ibis"
).id.max()
max_pimary_key_expr = output_dataset.table("cleaned_customers").to_ibis().id.max()
max_pimary_key = output_dataset(max_pimary_key_expr).fetchscalar()
except PipelineNeverRan:
# we get this exception if the destination dataset has not been run yet
# so we can assume that all customers are new
pass

# return filtered transformation
customers_table = dataset.table("customers", table_type="ibis")
customers_table = dataset.table("customers").to_ibis()

# filter only new customers and exclude the name column in the result
yield customers_table.filter(customers_table.id > max_pimary_key).drop(customers_table.name)
Expand Down
2 changes: 1 addition & 1 deletion tests/load/test_model_item_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ def test_write_dispositions(
# In Databricks, Ibis adds a helper column to emulate offset, causing a schema mismatch
# when the query attempts to insert it. We explicitly select only the expected columns.
# Note that we also explicitly select "_dlt_id" because its addition is disabled by default
example_table_2 = dataset.table("example_table_2", table_type="ibis")
example_table_2 = dataset.table("example_table_2").to_ibis()
expression = (
example_table_2.filter(example_table_2.a >= 3).order_by("a").limit(7)[["a", "_dlt_id"]]
)
Expand Down
4 changes: 2 additions & 2 deletions tests/load/test_read_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -1038,8 +1038,8 @@ def test_ibis_expression_relation(populated_pipeline: Pipeline) -> None:
dataset = populated_pipeline.dataset()
total_records = _total_records(populated_pipeline.destination.destination_type)

items_table = dataset.table("items", table_type="ibis")
double_items_table = dataset.table("double_items", table_type="ibis")
items_table = dataset.table("items").to_ibis()
double_items_table = dataset.table("double_items").to_ibis()

# check full table access
df = dataset(items_table).df()
Expand Down
Loading