After a refactor of the query qualification code that happened in sqlglot 28.1 we see errors like this. We need to do something about it soon.
- Looks like a sqlglot problem, but we should still investigate whether something weird is happening in `_DltBackend`. Do not limit the version in our deps! Everything else works and we should not limit the environments of our users.
- Maybe we need to post a bug report to the sqlglot GitHub repo and get some feedback.
- Remove the sqlglot upper bound from the `dev` group in `pyproject.toml`.
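
For context, here is a minimal sketch of what we believe triggers the failure, reconstructed from the locals in the traceback below. The way the alias is attached (a bare `Identifier` instead of a `TableAlias`) is an assumption based on those locals, not a confirmed repro:

```python
import sqlglot.expressions as exp
from sqlglot.optimizer.qualify import qualify

# Build a SELECT whose source table carries a bare Identifier in its
# "alias" slot, the shape visible in the traceback locals below.
table = exp.Table(
    this=exp.to_identifier("items", quoted=True),
    db=exp.to_identifier("read_test_202512161045373851", quoted=True),
    alias=exp.to_identifier("t0", quoted=True),  # Identifier, not TableAlias
)
select = exp.Select(
    expressions=[exp.column("_dlt_id", table="t0", quoted=True)]
).from_(table)

# qualify() calls qualify_tables(), which now does
#     table_aliases[source_fqn] = source.args["alias"].this.copy()
# For a TableAlias, .this is an Identifier; for a bare Identifier it is
# the str "t0", hence: AttributeError: 'str' object has no attribute 'copy'
qualify(select, dialect="duckdb")
```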
@pytest.mark.parametrize(
    "populated_pipeline",
    duckdb_conf,
    indirect=True,
    ids=lambda x: x.name,
)
def test_execute_expression(populated_pipeline: dlt.Pipeline):
    backend = _DltBackend.from_dataset(populated_pipeline.dataset())
    expected_schema = ibis.Schema(
        {
            "_dlt_id": ibis.dtype("string", nullable=False),
            "id": ibis.dtype("int64", nullable=True),
        }
    )
    table = backend.table("items")
    expr = table.select("_dlt_id", "id")
>   table2 = backend.execute(expr)
backend            = <dlt.common.libs.ibis._DltBackend object at 0x7fc955f66010>
expected_schema    = ibis.Schema {
                       _dlt_id  !string
                       id       int64
                     }
expr               = r0 := DatabaseTable: read_test_202512161045373851.items
                       id            int64
                       decimal       decimal
                       other_decimal decimal
                       _dlt_load_id  !string
                       _dlt_id       !string

                     Project[r0]
                       _dlt_id: r0._dlt_id
                       id:      r0.id
populated_pipeline = <dlt.pipeline(pipeline_name='read_pipeline', destination='duckdb', dataset_name='read_test_202512161045373851', defaul.../work/dlt/dlt/_storage/.dlt/pipelines', working_dir='/home/runner/work/dlt/dlt/_storage/.dlt/pipelines/read_pipeline')>
table              = DatabaseTable: read_test_202512161045373851.items
                       id            int64
                       decimal       decimal
                       other_decimal decimal
                       _dlt_load_id  !string
                       _dlt_id       !string
tests/libs/test_ibis.py:129:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.11/site-packages/ibis/backends/sql/__init__.py:299: in execute
sql = self.compile(table, params=params, limit=limit, **kwargs)
expr   = r0 := DatabaseTable: read_test_202512161045373851.items
           id            int64
           decimal       decimal
           other_decimal decimal
           _dlt_load_id  !string
           _dlt_id       !string

         Project[r0]
           _dlt_id: r0._dlt_id
           id:      r0.id
kwargs = {}
limit  = None
params = None
self   = <dlt.common.libs.ibis._DltBackend object at 0x7fc955f66010>
table  = r0 := DatabaseTable: read_test_202512161045373851.items
           id            int64
           decimal       decimal
           other_decimal decimal
           _dlt_load_id  !string
           _dlt_id       !string

         Project[r0]
           _dlt_id: r0._dlt_id
           id:      r0.id
dlt/common/libs/ibis.py:157: in compile
sql = r_.to_sql(pretty=pretty)
expr   = r0 := DatabaseTable: read_test_202512161045373851.items
           id            int64
           decimal       decimal
           other_decimal decimal
           _dlt_load_id  !string
           _dlt_id       !string

         Project[r0]
           _dlt_id: r0._dlt_id
           id:      r0.id
limit  = None
params = None
pretty = False
r_ = <[AttributeError("'str' object has no attribute 'copy'") raised in repr()] Relation object at 0x7fc95626dad0>
self = <dlt.common.libs.ibis._DltBackend object at 0x7fc955f66010>
dlt/dataset/relation.py:244: in to_sql
_, _qualified_query = _get_relation_output_columns_schema(self)
_raw_query = False
pretty = False
self = <[AttributeError("'str' object has no attribute 'copy'") raised in repr()] Relation object at 0x7fc95626dad0>
dlt/dataset/relation.py:557: in _get_relation_output_columns_schema
columns_schema, normalized_query = lineage.compute_columns_schema(
allow_anonymous_columns = True
allow_partial = False
infer_sqlglot_schema = False
relation = <[AttributeError("'str' object has no attribute 'copy'") raised in repr()] Relation object at 0x7fc95626dad0>
dlt/dataset/lineage.py:121: in compute_columns_schema
qualify(
allow_anonymous_columns = True
allow_partial = False
dialect = 'duckdb'
expression = Select(
               expressions=[
                 Column(
                   this=Identifier(this='_dlt_id', quoted=True),
                   table=Identifier(this='t... db=Identifier(this='read_test_202512161045373851', quoted=True),
                   alias=Identifier(this='t0', quoted=True))))
infer_sqlglot_schema = False
select_expression = Select(
                      expressions=[
                        Column(
                          this=Identifier(this='_dlt_id', quoted=True),
                          table=Identifier(this='t... db=Identifier(this='read_test_202512161045373851', quoted=True),
                          alias=Identifier(this='t0', quoted=True))))
sqlglot_schema = <sqlglot.schema.MappingSchema object at 0x7fc95646ea90>
.venv/lib/python3.11/site-packages/sqlglot/optimizer/qualify.py:84: in qualify
expression = qualify_tables(
allow_partial_qualification = False
canonicalize_table_aliases = False
catalog = None
db = None
dialect = <sqlglot.dialects.duckdb.DuckDB object at 0x7fc956622a10>
expand_alias_refs = True
expand_stars = True
expression = Select(
               expressions=[
                 Column(
                   this=Identifier(this='_dlt_id', quoted=True),
                   table=Identifier(this='t... db=Identifier(this='read_test_202512161045373851', quoted=True),
                   alias=Identifier(this='t0', quoted=True))))
identify = True
infer_schema = False
isolate_tables = False
on_qualify = None
qualify_columns = True
quote_identifiers = False
schema = <sqlglot.schema.MappingSchema object at 0x7fc95646ea90>
sql = None
validate_qualify_columns = True
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
expression = Select(
               expressions=[
                 Column(
                   this=Identifier(this='_dlt_id', quoted=True),
                   table=Identifier(this='t... db=Identifier(this='read_test_202512161045373851', quoted=True),
                   alias=Identifier(this='t0', quoted=True))))
db = None, catalog = None, on_qualify = None
dialect = <sqlglot.dialects.duckdb.DuckDB object at 0x7fc956622a10>
canonicalize_table_aliases = False
def qualify_tables(
    expression: E,
    db: t.Optional[str | exp.Identifier] = None,
    catalog: t.Optional[str | exp.Identifier] = None,
    on_qualify: t.Optional[t.Callable[[exp.Table], None]] = None,
    dialect: DialectType = None,
    canonicalize_table_aliases: bool = False,
) -> E:
    """
    Rewrite sqlglot AST to have fully qualified tables. Join constructs such as
    (t1 JOIN t2) AS t will be expanded into (SELECT * FROM t1 AS t1, t2 AS t2) AS t.

    Examples:
        >>> import sqlglot
        >>> expression = sqlglot.parse_one("SELECT 1 FROM tbl")
        >>> qualify_tables(expression, db="db").sql()
        'SELECT 1 FROM db.tbl AS tbl'
        >>>
        >>> expression = sqlglot.parse_one("SELECT 1 FROM (t1 JOIN t2) AS t")
        >>> qualify_tables(expression).sql()
        'SELECT 1 FROM (SELECT * FROM t1 AS t1, t2 AS t2) AS t'

    Args:
        expression: Expression to qualify
        db: Database name
        catalog: Catalog name
        on_qualify: Callback after a table has been qualified.
        dialect: The dialect to parse catalog and schema into.
        canonicalize_table_aliases: Whether to use canonical aliases (_0, _1, ...) for all sources
            instead of preserving table names. Defaults to False.

    Returns:
        The qualified expression.
    """
    dialect = Dialect.get_or_raise(dialect)
    next_alias_name = name_sequence("_")

    if db := db or None:
        db = exp.parse_identifier(db, dialect=dialect)
        db.meta["is_table"] = True
        db = normalize_identifiers(db, dialect=dialect)
    if catalog := catalog or None:
        catalog = exp.parse_identifier(catalog, dialect=dialect)
        catalog.meta["is_table"] = True
        catalog = normalize_identifiers(catalog, dialect=dialect)

    def _qualify(table: exp.Table) -> None:
        if isinstance(table.this, exp.Identifier):
            if db and not table.args.get("db"):
                table.set("db", db.copy())
            if catalog and not table.args.get("catalog") and table.args.get("db"):
                table.set("catalog", catalog.copy())

    if (db or catalog) and not isinstance(expression, exp.Query):
        with_ = expression.args.get("with_") or exp.With()
        cte_names = {cte.alias_or_name for cte in with_.expressions}

        for node in expression.walk(prune=lambda n: isinstance(n, exp.Query)):
            if isinstance(node, exp.Table) and node.name not in cte_names:
                _qualify(node)

    def _set_alias(
        expression: exp.Expression,
        canonical_aliases: t.Dict[str, str],
        target_alias: t.Optional[str] = None,
        scope: t.Optional[Scope] = None,
        normalize: bool = False,
        columns: t.Optional[t.List[t.Union[str, exp.Identifier]]] = None,
    ) -> None:
        alias = expression.args.get("alias") or exp.TableAlias()

        if canonicalize_table_aliases:
            new_alias_name = next_alias_name()
            canonical_aliases[alias.name or target_alias or ""] = new_alias_name
        elif not alias.name:
            new_alias_name = target_alias or next_alias_name()
            if normalize and target_alias:
                new_alias_name = normalize_identifiers(new_alias_name, dialect=dialect).name
        else:
            return

        alias.set("this", exp.to_identifier(new_alias_name))
        if columns:
            alias.set("columns", [exp.to_identifier(c) for c in columns])
        expression.set("alias", alias)

        if scope:
            scope.rename_source(None, new_alias_name)
    for scope in traverse_scope(expression):
        local_columns = scope.local_columns
        canonical_aliases: t.Dict[str, str] = {}

        for query in scope.subqueries:
            subquery = query.parent
            if isinstance(subquery, exp.Subquery):
                subquery.unwrap().replace(subquery)

        for derived_table in scope.derived_tables:
            unnested = derived_table.unnest()
            if isinstance(unnested, exp.Table):
                joins = unnested.args.get("joins")
                unnested.set("joins", None)
                derived_table.this.replace(exp.select("*").from_(unnested.copy(), copy=False))
                derived_table.this.set("joins", joins)

            _set_alias(derived_table, canonical_aliases, scope=scope)

            if pivot := seq_get(derived_table.args.get("pivots") or [], 0):
                _set_alias(pivot, canonical_aliases)

        table_aliases = {}

        for name, source in scope.sources.items():
            if isinstance(source, exp.Table):
                # When the name is empty, it means that we have a non-table source, e.g. a pivoted cte
                is_real_table_source = bool(name)

                if pivot := seq_get(source.args.get("pivots") or [], 0):
                    name = source.name

                table_this = source.this
                table_alias = source.args.get("alias")
                function_columns: t.List[t.Union[str, exp.Identifier]] = []

                if isinstance(table_this, exp.Func):
                    if not table_alias:
                        function_columns = ensure_list(
                            dialect.DEFAULT_FUNCTIONS_COLUMN_NAMES.get(type(table_this))
                        )
                    elif columns := table_alias.columns:
                        function_columns = columns
                elif type(table_this) in dialect.DEFAULT_FUNCTIONS_COLUMN_NAMES:
                    function_columns = ensure_list(source.alias_or_name)
                    source.set("alias", None)
                    name = None

                _set_alias(
                    source,
                    canonical_aliases,
                    target_alias=name or source.name or None,
                    normalize=True,
                    columns=function_columns,
                )

                source_fqn = ".".join(p.name for p in source.parts)
>               table_aliases[source_fqn] = source.args["alias"].this.copy()
E               AttributeError: 'str' object has no attribute 'copy'
_qualify = <function qualify_tables.<locals>._qualify at 0x7fc955ca80e0>
_set_alias = <function qualify_tables.<locals>._set_alias at 0x7fc955ca8d60>
canonical_aliases = {}
canonicalize_table_aliases = False
catalog = None
db = None
dialect = <sqlglot.dialects.duckdb.DuckDB object at 0x7fc956622a10>
expression = Select(
               expressions=[
                 Column(
                   this=Identifier(this='_dlt_id', quoted=True),
                   table=Identifier(this='t... db=Identifier(this='read_test_202512161045373851', quoted=True),
                   alias=Identifier(this='t0', quoted=True))))
function_columns = []
is_real_table_source = True
local_columns = [Column(
                   this=Identifier(this='_dlt_id', quoted=True),
                   table=Identifier(this='t0', quoted=True)), Column(
                   this=Identifier(this='id', quoted=True),
                   table=Identifier(this='t0', quoted=True))]
name = 't0'
next_alias_name = <function name_sequence.<locals>.<lambda> at 0x7fc955ca85e0>
on_qualify = None
pivot = None
scope = Scope<SELECT "t0"."_dlt_id", "t0"."id" FROM "read_test_202512161045373851"."items" AS "t0">
source = Table(
           this=Identifier(this='items', quoted=True),
           db=Identifier(this='read_test_202512161045373851', quoted=True),
           alias=Identifier(this='t0', quoted=True))
source_fqn = 'read_test_202512161045373851.items'
table_alias = Identifier(this='t0', quoted=True)
table_aliases = {}
table_this = Identifier(this='items', quoted=True)
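
The locals above pinpoint the mismatch: `source.args["alias"]` is a bare `Identifier(this='t0')`, not a `TableAlias`, so its `.this` is the plain string `'t0'` and `.copy()` fails. A standalone illustration of the difference (not code from either library):

```python
import sqlglot.expressions as exp

# With a proper TableAlias, .this is an Identifier and .copy() works.
table_alias = exp.TableAlias(this=exp.to_identifier("t0"))
print(type(table_alias.this))  # <class 'sqlglot.expressions.Identifier'>
table_alias.this.copy()        # ok

# With a bare Identifier, .this is the raw alias string, so the
# .this.copy() call in the refactored qualify_tables() raises.
identifier = exp.to_identifier("t0", quoted=True)
print(type(identifier.this))   # <class 'str'>
identifier.this.copy()         # AttributeError: 'str' object has no attribute 'copy'
```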
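
Until this is settled upstream, one possible mitigation on the dlt side would be to normalize bare `Identifier` aliases into `TableAlias` nodes before handing the expression to `qualify()`. A hypothetical sketch (the helper name and where to hook it in are assumptions, not existing dlt code):

```python
import sqlglot.expressions as exp

def _wrap_bare_table_aliases(expression: exp.Expression) -> exp.Expression:
    """Wrap any bare Identifier in a Table's alias slot in a TableAlias,
    the node shape sqlglot's qualify_tables() expects."""
    for table in expression.find_all(exp.Table):
        alias = table.args.get("alias")
        if isinstance(alias, exp.Identifier):
            table.set("alias", exp.TableAlias(this=alias))
    return expression
```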