136 changes: 85 additions & 51 deletions dlt/common/libs/ibis.py
@@ -1,15 +1,15 @@
from __future__ import annotations

import contextlib
from typing import TYPE_CHECKING, Any, Optional, Union
from typing_extensions import override
from typing import TYPE_CHECKING, Any, Generator, Mapping, Optional, Union

import dlt
from dlt.common.destination.dataset import SupportsDataAccess
from dlt.helpers.ibis import DATA_TYPE_MAP, create_ibis_backend, _get_ibis_to_sqlglot_compiler
from dlt.common.schema import Schema as DltSchema
from dlt.common.destination import TDestinationReferenceArg
from dlt.common.schema.typing import TDataType, TTableSchema
from dlt.common.exceptions import MissingDependencyException, TypeErrorWithKnownTypes
from dlt.common.exceptions import MissingDependencyException

if TYPE_CHECKING:
import pandas as pd
@@ -19,12 +19,14 @@
try:
# ibis imports follow the convention used in the ibis source code
import ibis
from ibis import util as ibis_util
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
import ibis.expr.schema as sch
import ibis.expr.types as ir
import sqlglot as sg
from ibis.backends import _get_backend_names, NoUrl, NoExampleLoader
import ibis.backends.sql.compilers as sc
from ibis.backends import NoUrl, NoExampleLoader
from ibis.backends.sql import SQLBackend
from ibis.formats import TypeMapper
from sqlglot import expressions as sge
@@ -46,17 +48,17 @@ def to_ibis(cls, typ: TDataType, nullable: Optional[bool] = None) -> dt.DataType
return ibis.dtype(DATA_TYPE_MAP[typ], nullable=nullable)


def _transpile(query: sge.ExpOrStr, *, target_dialect: type[sg.Dialect]) -> str:
if isinstance(query, sg.Expression):
query = query.sql(dialect=target_dialect)
elif isinstance(query, str):
query = sg.transpile(query, write=target_dialect)[0]
else:
raise TypeErrorWithKnownTypes(
key="query", value_received=query, valid_types=["str", "sqlglot.Expression"]
)
# def _transpile(query: sge.ExpOrStr, *, target_dialect: type[sg.Dialect]) -> str:
# if isinstance(query, sg.Expression):
# query = query.sql(dialect=target_dialect)
# elif isinstance(query, str):
# query = sg.transpile(query, write=target_dialect)[0]
# else:
# raise TypeErrorWithKnownTypes(
# key="query", value_received=query, valid_types=["str", "sqlglot.Expression"]
# )

return query
# return query


# TODO support `database` kwarg (equiv. `dataset_name`) to enable DltBackend to access multiple databases
@@ -94,7 +96,7 @@ def from_dataset(cls, dataset: dlt.Dataset) -> _DltBackend:
# sync with destination should be made through the dataset.
new_backend = cls()
new_backend._dataset = dataset
new_backend.compiler = _get_ibis_to_sqlglot_compiler(new_backend._dataset._destination) # type: ignore[arg-type]
new_backend.compiler = _get_ibis_to_sqlglot_compiler(dataset.destination_dialect)
return new_backend

def to_native_ibis(self, *, read_only: bool = False) -> BaseBackend:
@@ -121,76 +123,73 @@ def do_connect(
dataset_name=dataset_name,
schema=schema,
)
self.compiler = _get_ibis_to_sqlglot_compiler(self._dataset._destination) # type: ignore[arg-type]

# derived from Ibis Snowflake implementation
@contextlib.contextmanager
# @override
def _safe_raw_sql(self, query: sge.ExpOrStr, **kwargs: Any) -> Any:
yield self.raw_sql(query, **kwargs)
self.compiler = _get_ibis_to_sqlglot_compiler(self._dataset.destination_dialect)

# derived from Ibis Snowflake implementation
# @override
def raw_sql(self, query: Union[str, sg.Expression], **kwargs: Any) -> Any:
"""Execute SQL string or SQLGlot expression using the dlt destination SQL client"""
with contextlib.suppress(AttributeError):
query = _transpile(query, target_dialect=self.compiler.dialect)

assert isinstance(query, str)
# TODO return a cursor instead of rows; this will allow to load pyarrow data
# more efficiently
with self._dataset.sql_client as client:
result = client.execute_sql(query)
def disconnect(self) -> None:
# no need to disconnect, no connections are persisted
pass

return result
@contextlib.contextmanager
def _safe_raw_sql(
self, query: Union[sge.ExpOrStr, str, ir.Expr], **kwargs: Any
) -> Generator[SupportsDataAccess, None, None]:
# just re-yield our cursor which is dbapi compatible
with self._dataset.query(query)._cursor() as cur:
yield cur

def compile( # noqa
self,
expr: ir.Expr,
/,
*,
limit: Union[int, str, None] = None,
params: Optional[Mapping[ir.Scalar, Any]] = None,
pretty: bool = False,
) -> str:
# this reuses dlt.Relation to generate destination-specific SQL from destination agnostic
# expr
r_ = self._dataset.query(expr)
if limit:
r_ = r_.limit(int(limit))
sql = r_.to_sql(pretty=pretty)
self._log(sql)
# TODO: allow for `params`
return sql

# required for marimo DataSources UI to work
@property
def current_database(self) -> str:
return self._dataset.dataset_name

# required for marimo DataSources UI to work
# @override
def list_tables(
self, *, like: Optional[str] = None, database: Union[tuple[str, str], str, None] = None
) -> list[str]:
"""Return the list of table names"""
return list(self._dataset.schema.tables.keys())

# required for marimo DataSources UI to work
# @override
def get_schema(self, table_name: str, *args: Any, **kwargs: Any) -> sch.Schema:
"""Get the Ibis table schema"""
return _to_ibis_schema(self._dataset.table(table_name).schema)

# required for marimo DataSources UI to work
# @override
def _get_schema_using_query(self, query: str) -> sch.Schema:
"""Required to subclass SQLBackend"""
return _to_ibis_schema(self._dataset(query).schema)

# required for marimo DataSources UI to work
# @override
def table(
self, name: str, /, *, database: Union[tuple[str, str], str, None] = None
) -> ir.Table:
"""Construct a table expression"""
# TODO maybe there's a more straightforward way to retrieve catalog and db
sql_client = self._dataset.sql_client
catalog = sql_client.catalog_name()
database = sql_client.capabilities.casefold_identifier(sql_client.dataset_name)

table_schema = self.get_schema(name)
return ops.DatabaseTable(
name,
schema=table_schema,
source=self,
namespace=ops.Namespace(catalog=catalog, database=database),
namespace=ops.Namespace(database=self.current_database),
).to_expr()

# TODO use the new dlt `model` format with INSERT statement
# for non-SQL (e.g., filesystem) use JobClient
# @override
def create_table(
self,
name: str,
@@ -204,15 +203,50 @@
) -> ir.Table:
raise NotImplementedError

# @override
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
"""Required to subclass SQLBackend"""
raise NotImplementedError

# @override
def _register_udfs(self, expr: ir.Expr) -> None:
"""Override SQLBackend method to avoid round-trip to Ibis SQL compiler"""

def _fetch_from_cursor(self, cursor: SupportsDataAccess, schema: sch.Schema) -> pd.DataFrame:
# SqlBackend implementation is clearly wrong - it passes cursor to `from_records`
# which expects data
# as a bonus - this will provide native pandas reading for destinations that support it

from ibis.formats.pandas import PandasData

df = PandasData.convert_table(cursor.df(), schema)
return df

@ibis_util.experimental
def to_pyarrow(
self,
expr: ir.Expr,
/,
*,
params: Optional[Mapping[ir.Scalar, Any]] = None,
limit: Union[int, str, None] = None,
**kwargs: Any,
) -> pa.Table:
self._run_pre_execute_hooks(expr)

table_expr = expr.as_table()
schema = table_expr.schema()
arrow_schema = schema.to_pyarrow()

with self._safe_raw_sql(self.compile(expr, limit=limit, params=params)) as cur:
table = cur.arrow()

return expr.__pyarrow_result__(
table.rename_columns(list(table_expr.columns)).cast(arrow_schema)
)

# TODO: implement native arrow batches
# @util.experimental
# def to_pyarrow_batches(


def _to_ibis_schema(table_schema: TTableSchema) -> sch.Schema:
return sch.Schema(
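Taken together, the reworked `_DltBackend` routes every Ibis operation through `dlt.Relation` instead of issuing its own SQL. A minimal sketch of the intended flow, assuming a pipeline that has already loaded a `customers` table (the pipeline name and the `id` column are hypothetical):

```py
import dlt
from dlt.common.libs.ibis import _DltBackend

# the backend is built from a dataset, so no separate connection is opened
dataset = dlt.pipeline("my_pipeline").dataset()
backend = _DltBackend.from_dataset(dataset)

# table() resolves the schema through the dlt schema and namespaces it
# under current_database (the dataset name)
customers = backend.table("customers")
expr = customers.filter(customers.id > 100)

# compile() delegates to dataset.query(expr).to_sql(), so the SQL comes
# out in the destination dialect
print(backend.compile(expr, limit=10))

# to_pyarrow() executes through the dbapi cursor yielded by _safe_raw_sql
arrow_table = backend.to_pyarrow(expr)
```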
37 changes: 12 additions & 25 deletions dlt/dataset/dataset.py
@@ -206,36 +206,23 @@ def __call__(
"""Convenience method to proxy `Dataset.query()`. See this method for details."""
return self.query(query, query_dialect, _execute_raw_query=_execute_raw_query)

@overload
def table(self, table_name: str) -> dlt.Relation: ...

@overload
def table(self, table_name: str, table_type: Literal["relation"]) -> dlt.Relation: ...

@overload
def table(self, table_name: str, table_type: Literal["ibis"]) -> ir.Table: ...

# TODO remove `table_type` argument. Instead, `dlt.Relation()` should have `.to_ibis()` method
def table(
self, table_name: str, table_type: Literal["relation", "ibis"] = "relation"
) -> Union[dlt.Relation, ir.Table]:
def table(self, table_name: str, **kwargs: Any) -> dlt.Relation:
"""Get a `dlt.Relation` associated with a table from the dataset."""

# dataset only provides access to tables known in dlt schema, direct query may circumvent this
available_tables = self.tables
if table_name not in available_tables:
# NOTE dataset only provides access to tables known in dlt schema
# raw query execution could access tables unknown by dlt
if table_name not in self.tables:
# TODO: raise TableNotFound
raise ValueError(
f"Table `{table_name}` not found in schema `{self.schema.name}` of dataset"
f" `{self.dataset_name}`. Available table(s):"
f" {', '.join(available_tables)}"
raise ValueError(f"Table `{table_name}` not found. Available table(s): {self.tables}")

# TODO remove in due time;
if kwargs.get("table_type") == "ibis":
raise DeprecationWarning(
"Calling `.table(..., table_type='ibis') is deprecated. Instead, call"
" `.table('foo').to_ibis()` to create a `dlt.Relation` and then retrieve the"
" Ibis Table."
)

if table_type == "ibis":
from dlt.helpers.ibis import create_unbound_ibis_table

return create_unbound_ibis_table(self.schema, self.dataset_name, table_name)

# fallback to the standard dbapi relation
return dlt.Relation(dataset=self, table_name=table_name)

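The net effect on the public API, as a sketch (assuming a dataset whose schema contains a `customers` table):

```py
dataset = pipeline.dataset()

# table() now always returns a dlt.Relation
rel = dataset.table("customers")

# the old keyword path no longer returns an unbound table; it raises
# dataset.table("customers", table_type="ibis")  # -> DeprecationWarning

# the replacement: convert the relation explicitly
ibis_table = rel.to_ibis()
```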
3 changes: 2 additions & 1 deletion dlt/dataset/relation.py
@@ -288,7 +288,8 @@ def to_ibis(self) -> ir.Table:
if self._table_name:
ibis_table = backend.table(self._table_name)
else:
ibis_table = backend.sql(self.to_sql())
# pass raw query before any identifiers are expanded, quoted or normalized
ibis_table = backend.sql(self.sqlglot_expression.sql(dialect=self.destination_dialect))

return ibis_table

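The two branches of `to_ibis()` correspond to the two ways a relation is built; a hedged illustration (the table and query are placeholders):

```py
# table-backed relation: resolved through backend.table(), which attaches
# the dataset name as the namespace
customers = dataset.table("customers").to_ibis()

# query-backed relation: the sqlglot expression is rendered in the
# destination dialect and handed to backend.sql() before dlt expands,
# quotes or normalizes any identifiers
top_ids = dataset("SELECT id FROM customers LIMIT 5").to_ibis()
```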
10 changes: 5 additions & 5 deletions docs/website/docs/general-usage/dataset-access/dataset.md
@@ -133,7 +133,7 @@ If you install the amazing [ibis](https://ibis-project.org/) library, you can us
pip install ibis-framework
```

dlt will then allow you to get an `ibis.UnboundTable` for each table which you can use to build a query with ibis expressions, which you can then execute on your dataset.
dlt will then allow you to get an `ibis.Table` for each table, which you can use to build a query with ibis expressions and then execute on your dataset.

:::warning
A previous version of dlt allowed ibis expressions to be used in a slightly different way, letting users execute and retrieve data directly on ibis unbound tables. This method no longer works. See the migration guide below for instructions on how to update your code.
@@ -146,7 +146,7 @@ You can learn more about the available expressions on the [ibis for sql users](h

### Migrating from the previous dlt / ibis implementation

As describe above, the new way to use ibis expressions is to first get one or many `UnboundTable` objects, construct your expression and then bind it to your data via the `Dataset` to get a `Relation` object which you may execute to retrieve your data.
As described above, the new way to use ibis expressions is to first get one or more `Table` objects and construct your expression. Then, pass it to the `Dataset` to get a `Relation`, which executes the full query and retrieves your data.

An example from our previous docs for joining a customers and a purchase table was this:

@@ -169,9 +169,9 @@ df = joined_relation.df()
The migrated version looks like this:

```py
# we need to explicitely select table type ibis here
customers_expression = dataset.table("customers", table_type="ibis")
purchases_expression = dataset.table("purchases", table_type="ibis")
# we convert the dlt.Relation to an Ibis Table object
customers_expression = dataset.table("customers").to_ibis()
purchases_expression = dataset.table("purchases").to_ibis()

# join them using an ibis expression, same code as above
joined_expression = customers_expression.join(
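The executable tail of the snippet is collapsed above; under the same assumptions as the original example, it would still bind the expression to the dataset and fetch a data frame:

```py
# bind the ibis expression back to the dataset to get a dlt.Relation,
# then execute it and read the result as a pandas data frame
joined_relation = dataset(joined_expression)
df = joined_relation.df()
```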
@@ -213,8 +213,8 @@ def ibis_expressions_snippet(pipeline: dlt.Pipeline) -> None:
dataset = pipeline.dataset()

# get two table expressions
customers_expression = dataset.table("customers", table_type="ibis")
purchases_expression = dataset.table("purchases", table_type="ibis")
customers_expression = dataset.table("customers").to_ibis()
purchases_expression = dataset.table("purchases").to_ibis()

# join them using an ibis expression
join_expression = customers_expression.join(
2 changes: 1 addition & 1 deletion docs/website/docs/general-usage/state.md
@@ -123,7 +123,7 @@ def comments(user_id: str):
# get user comments table from pipeline dataset
# get last user comment id with ibis expression, ibis-extras need to be installed
dataset = current_pipeline.dataset()
user_comments = dataset.table("user_comments", table_type="ibis")
user_comments = dataset.table("user_comments").to_ibis()
max_id_expression = user_comments.filter(user_comments.user_id == user_id).select(user_comments["_id"].max())
max_id_df = dataset(max_id_expression).df()
# if there are no comments for the user, max_id will be None, so we replace it with 0