Skip to content
38 changes: 38 additions & 0 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
TSchemaEvolutionMode,
TSchemaSettings,
TSimpleRegex,
TStandaloneTableReference,
TStoredSchema,
TSchemaTables,
TTableReference,
TTableSchema,
TTableSchemaColumns,
TColumnSchema,
Expand Down Expand Up @@ -605,6 +607,42 @@ def tables(self) -> TSchemaTables:
"""Dictionary of schema tables"""
return self._schema_tables

@property
def references(self) -> list[TStandaloneTableReference]:
"""References between tables"""
all_references: list[TStandaloneTableReference] = []
for table_name, table in self.tables.items():
try:
parent_ref = utils.create_parent_child_reference(self.tables, table_name)
all_references.append(cast(TStandaloneTableReference, parent_ref))
except ValueError:
pass

try:
root_ref = utils.create_root_child_reference(self.tables, table_name)
all_references.append(cast(TStandaloneTableReference, root_ref))
except ValueError:
pass

try:
load_table_ref = utils.create_root_child_reference(self.tables, table_name)
all_references.append(cast(TStandaloneTableReference, load_table_ref))
except ValueError:
pass

refs = table.get("references")
if not refs:
continue

for ref in refs:
top_level_ref: TTableReference = ref.copy()
if top_level_ref.get("table") is None:
top_level_ref["table"] = table_name

all_references.append(cast(TStandaloneTableReference, top_level_ref))

return all_references

@property
def settings(self) -> TSchemaSettings:
return self._settings
Expand Down
78 changes: 76 additions & 2 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
NewType,
Union,
)
from typing_extensions import Never
from typing_extensions import Never, NotRequired, Required

from dlt.common.data_types import TDataType
from dlt.common.normalizers.typing import TNormalizersConfig
Expand Down Expand Up @@ -276,14 +276,88 @@ class TScd2StrategyDict(TMergeDispositionDict, total=False):
]


class TTableReference(TypedDict):
TReferenceCardinality = Literal[
"zero_to_one",
"one_to_zero",
"zero_to_many",
"many_to_zero",
"one_to_many",
"many_to_one",
"one_to_one",
"many_to_many",
]
"""Represents cardinality between `column` (left) and `referenced_column` (right)

Note that cardinality is not symmetric. For example:
- `Author, 0 to many, Book` an author can have 0 to many book
- `Book, 1 to 1, Author` a book must have exactly 1 author

The statement (Author, 0 to many, Book) doesn't imply (Book, many to 0, Author).
"""


class TTableReference(TypedDict, total=False):
"""Describes a reference to another table's columns.
`columns` corresponds to the `referenced_columns` in the referenced table and their order should match.
"""

label: Optional[str]
"""Text providing semantic information about the reference.

For example, the label "liked" describe the relationship between `user` and `post` (user.id, "liked", post.id)
"""

cardinality: Optional[TReferenceCardinality]
"""Cardinality of the relationship between `table.column` (left) and `referenced_table.referenced_column` (right)."""

table: Optional[str]
"""Name of the table.
When `TTableReference` is defined on a `TTableSchema` (i.e., "inline reference"), the `table`
value is determined by `TTableSchema["name"]`
"""

columns: Sequence[str]
"""Name of the column(s) from `table`"""

referenced_table: str
"""Name of the referenced table"""

referenced_columns: Sequence[str]
"""Name of the columns(s) from `referenced_table`"""


TInlineTableReference = TTableReference


# Compared to `TTableReference` or `TInlineTableReference`, `table` is required
class TStandaloneTableReference(TypedDict, total=False):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could deduplicate code by creating _TTableReferenceBase base that contains all common props beside table and then just add it in TTableReference and TStandaloneTableReference. or is there a problem with type checker if you do this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intended to have TStandaloneReference have Required type annotations. When using Required / NotRequired, you can't inherit and override these annotations via inheritance. Eventually, I dropped the usage of Required / NotRequired because they are poorly supported for Python <3.11 even through typing_extensions.

I think your solution is the correct one:

  • _TTableReferenceBase has common fields (no total=False clause, no Required/NotRequired)
  • TInlineTableReference has more optional fields
  • TStandaloneTableReference specifies all the required fields
  • TTableReference aliases to TInlineTableReference for backwards compatibility

"""Describes a reference to another table's columns.
`columns` corresponds to the `referenced_columns` in the referenced table and their order should match.
"""

label: Optional[str]
"""Text providing semantic information about the reference.

For example, the label "liked" describe the relationship between `user` and `post` (user.id, "liked", post.id)
"""

cardinality: Optional[TReferenceCardinality]
"""Cardinality of the relationship between `table.column` (left) and `referenced_table.referenced_column` (right)."""

table: str
"""Name of the table.
When `TTableReference` is defined on a `TTableSchema` (i.e., "inline reference"), the `table`
value is determined by `TTableSchema["name"]`
"""

columns: Sequence[str]
"""Name of the column(s) from `table`"""

referenced_table: str
"""Name of the referenced table"""

referenced_columns: Sequence[str]
"""Name of the columns(s) from `referenced_table`"""


TTableReferenceParam = Sequence[TTableReference]
Expand Down
66 changes: 61 additions & 5 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -980,12 +980,35 @@ def create_root_child_reference(tables: TSchemaTables, table_name: str) -> TTabl
root_row_key: str = get_first_column_name_with_prop(root_table, "row_key")

return TTableReference(
label="_dlt_root",
cardinality="many_to_one",
table=table_name,
columns=[child_root_key],
referenced_table=root_table["name"],
referenced_columns=[root_row_key],
)


def get_all_root_child_references_from_root(
tables: TSchemaTables, table_name: str
) -> list[TTableReference]:
root_table = tables.get(table_name)
if is_nested_table(root_table) is True:
raise ValueError(f"Table `{table_name}` is not a root table.")

children_refs = []
# skip the first table in chain, which is the root; i.e., the current one
for child_table in get_nested_tables(tables, table_name)[1:]:
# try/except because a root table may or may not have child with `root_key` enabled
try:
child_ref = create_root_child_reference(tables, child_table["name"])
children_refs.append(child_ref)
except ValueError:
pass

return children_refs


def create_parent_child_reference(tables: TSchemaTables, table_name: str) -> TTableReference:
"""Create a Reference between `{table}.{parent_key}` and `{parent}.{row_key}`"""
child_table = tables.get(table_name)
Expand All @@ -996,25 +1019,53 @@ def create_parent_child_reference(tables: TSchemaTables, table_name: str) -> TTa

parent_table_name = child_table.get("parent")
if parent_table_name is None:
raise ValueError(f"No parent table found for `{table_name=:}`")
raise ValueError(f"Table `{table_name}` is a root table and has no parent.")
parent_table = tables.get(parent_table_name)

child_parent_key: str = get_first_column_name_with_prop(child_table, "parent_key")
parent_row_key: str = get_first_column_name_with_prop(parent_table, "row_key")

return TTableReference(
label="_dlt_parent",
cardinality="many_to_one",
table=table_name,
columns=[child_parent_key],
referenced_table=parent_table_name,
referenced_columns=[parent_row_key],
)


def get_all_parent_child_references_from_root(
tables: TSchemaTables, table_name: str
) -> list[TTableReference]:
root_table = tables.get(table_name)
if is_nested_table(root_table) is True:
raise ValueError(f"Table `{table_name}` is not a root table.")

children_refs = []
# skip the first table in chain, which is the root; i.e., the current one
for child_table in get_nested_tables(tables, table_name)[1:]:
# try/except because a root table may or may not have child with `root_key` enabled
try:
child_ref = create_parent_child_reference(tables, child_table["name"])
children_refs.append(child_ref)
except ValueError:
pass

return children_refs


def create_load_table_reference(table: TTableSchema) -> TTableReference:
"""Create a Reference between `{table}._dlt_oad_id` and `_dlt_loads.load_id`"""
"""Create a Reference between `{table}._dlt_load_id` and `_dlt_loads.load_id`"""
if table["columns"].get(C_DLT_LOAD_ID) is None:
raise ValueError(f"Column `{C_DLT_LOAD_ID}` not found for `table_name={table['name']}`")
raise ValueError(
f"Table `{table['name']}` is not a root table and has no `{C_DLT_LOAD_ID}` column."
)

return TTableReference(
label="_dlt_load",
cardinality="zero_to_many",
table=table["name"],
columns=[C_DLT_LOAD_ID],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please never use non-normalized names for tables and columns. you can just paste schema.naming here and normalize those constants

referenced_table=LOADS_TABLE_NAME,
referenced_columns=[C_DLT_LOADS_TABLE_LOAD_ID],
Expand All @@ -1031,6 +1082,9 @@ def create_version_and_loads_hash_reference(tables: TSchemaTables) -> TTableRefe
raise ValueError(f"Table `{LOADS_TABLE_NAME}` not found in tables: `{list(tables.keys())}`")

return TTableReference(
label="_dlt_schema_version",
cardinality="one_to_many",
table=VERSION_TABLE_NAME,
columns=["version_hash"],
referenced_table=LOADS_TABLE_NAME,
referenced_columns=["schema_version_hash"],
Expand All @@ -1046,12 +1100,14 @@ def create_version_and_loads_schema_name_reference(tables: TSchemaTables) -> TTa
if LOADS_TABLE_NAME not in tables:
raise ValueError(f"Table `{LOADS_TABLE_NAME}` not found in tables: `{list(tables.keys())}`")

loads_and_version_hash_schema_name_ref = TTableReference(
return TTableReference(
label="_dlt_schema_name",
cardinality="many_to_many",
table=VERSION_TABLE_NAME,
columns=["schema_name"],
referenced_table=LOADS_TABLE_NAME,
referenced_columns=["schema_name"],
)
return loads_and_version_hash_schema_name_ref


def migrate_complex_types(table: TTableSchema, warn: bool = False) -> None:
Expand Down
3 changes: 2 additions & 1 deletion dlt/helpers/dbml.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ def _to_dbml_reference(
def _from_dbml_reference(reference: Reference) -> TTableReference:
"""Convert a DBML reference to a dlt table reference"""
return TTableReference(
referenced_table=reference.col2[0].table.name,
table=reference.table1.name,
referenced_table=reference.table2.name,
columns=[col.name for col in reference.col1],
referenced_columns=[col.name for col in reference.col2],
)
Expand Down
Loading
Loading