16 changes: 9 additions & 7 deletions poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -12,7 +12,7 @@ packages = [{include = "sources"}]

[tool.poetry.dependencies]
python = ">=3.8.1,<3.13"
dlt = {version = "0.4.3a0", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]}
dlt = {version = "0.4.4", allow-prereleases = true, extras = ["redshift", "bigquery", "postgres", "duckdb", "s3", "gs"]}

[tool.poetry.group.dev.dependencies]
mypy = "1.6.1"
66 changes: 54 additions & 12 deletions sources/sql_database/__init__.py
@@ -1,6 +1,6 @@
"""Source that loads tables form any SQLAlchemy supported database, supports batching requests and incremental loads."""

-from typing import List, Optional, Union, Iterable, Any
+from typing import Callable, List, Optional, Union, Iterable, Any
from sqlalchemy import MetaData, Table
from sqlalchemy.engine import Engine

@@ -9,6 +9,7 @@


from dlt.sources.credentials import ConnectionStringCredentials
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext

from .helpers import (
table_rows,
@@ -26,7 +27,10 @@ def sql_database(
schema: Optional[str] = dlt.config.value,
metadata: Optional[MetaData] = None,
table_names: Optional[List[str]] = dlt.config.value,
chunk_size: int = 1000,
detect_precision_hints: Optional[bool] = dlt.config.value,
defer_table_reflect: Optional[bool] = dlt.config.value,
table_adapter_callback: Callable[[Table], None] = None,
) -> Iterable[DltResource]:
"""
A DLT source which loads data from an SQL database using SQLAlchemy.
@@ -37,44 +41,67 @@
schema (Optional[str]): Name of the database schema to load (if different from default).
metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. `schema` argument is ignored when this is used.
table_names (Optional[List[str]]): A list of table names to load. By default, all tables in the schema are loaded.
chunk_size (int): Number of rows yielded in one batch. SQLAlchemy will additionally maintain an internal row buffer of twice the chunk size.
detect_precision_hints (bool): Set column precision and scale hints for supported data types in the target schema based on the columns in the source tables.
This is disabled by default.
defer_table_reflect (bool): Connect and reflect the table schema only when yielding data. Requires `table_names` to be explicitly passed.
Enable this option when running on Airflow. Available on dlt 0.4.4 and later.
table_adapter_callback (Callable): Receives each reflected table. May be used to modify the list of columns that will be selected.
Returns:
Iterable[DltResource]: A list of DLT resources for each table to be loaded.
"""

# set up alchemy engine
engine = engine_from_credentials(credentials)
-engine.execution_options(stream_results=True)
+engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size)
metadata = metadata or MetaData(schema=schema)

# use provided tables or all tables
if table_names:
-tables = [Table(name, metadata, autoload_with=engine) for name in table_names]
+tables = [
+    Table(name, metadata, autoload_with=None if defer_table_reflect else engine)
+    for name in table_names
+]
else:
if defer_table_reflect:
raise ValueError("You must pass table names to defer table reflection")
metadata.reflect(bind=engine)
tables = list(metadata.tables.values())

for table in tables:
if table_adapter_callback and not defer_table_reflect:
table_adapter_callback(table)
yield dlt.resource(
table_rows,
name=table.name,
primary_key=get_primary_key(table),
spec=SqlDatabaseTableConfiguration,
columns=table_to_columns(table) if detect_precision_hints else None,
-)(engine, table)
+)(
+    engine,
+    table,
+    chunk_size,
+    detect_precision_hints=detect_precision_hints,
+    defer_table_reflect=defer_table_reflect,
+    table_adapter_callback=table_adapter_callback,
+)


-@dlt.common.configuration.with_config(
-    sections=("sources", "sql_database"), spec=SqlTableResourceConfiguration
-)
+@dlt.sources.config.with_config(
+    sections=("sources", "sql_database"),
+    spec=SqlTableResourceConfiguration,
+    sections_merge_style=ConfigSectionContext.resource_merge_style,
+)
def sql_table(
credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
table: str = dlt.config.value,
schema: Optional[str] = dlt.config.value,
metadata: Optional[MetaData] = None,
incremental: Optional[dlt.sources.incremental[Any]] = None,
chunk_size: int = 1000,
detect_precision_hints: Optional[bool] = dlt.config.value,
defer_table_reflect: Optional[bool] = dlt.config.value,
table_adapter_callback: Callable[[Table], None] = None,
) -> DltResource:
"""
A dlt resource which loads data from an SQL database table using SQLAlchemy.
@@ -86,22 +113,37 @@ def sql_table(
metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. If provided, the `schema` argument is ignored.
incremental (Optional[dlt.sources.incremental[Any]]): Option to enable incremental loading for the table.
E.g., `incremental=dlt.sources.incremental('updated_at', pendulum.parse('2022-01-01T00:00:00Z'))`
chunk_size (int): Number of rows yielded in one batch. SQLAlchemy will additionally maintain an internal row buffer of twice the chunk size.
detect_precision_hints (bool): Set column precision and scale hints for supported data types in the target schema based on the columns in the source tables.
This is disabled by default.
defer_table_reflect (bool): Connect and reflect the table schema only when yielding data. Enable this option when running on Airflow. Available
on dlt 0.4.4 and later.
table_adapter_callback (Callable): Receives each reflected table. May be used to modify the list of columns that will be selected.

Returns:
DltResource: The dlt resource for loading data from the SQL database table.
"""
engine = engine_from_credentials(credentials)
-engine.execution_options(stream_results=True)
+engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size)
metadata = metadata or MetaData(schema=schema)

-table_obj = Table(table, metadata, autoload_with=engine)
+table_obj = Table(
+    table, metadata, autoload_with=None if defer_table_reflect else engine
+)
if table_adapter_callback and not defer_table_reflect:
table_adapter_callback(table_obj)

return dlt.resource(
table_rows,
name=table_obj.name,
primary_key=get_primary_key(table_obj),
columns=table_to_columns(table_obj) if detect_precision_hints else None,
-)(engine, table_obj, incremental=incremental)
+)(
+    engine,
+    table_obj,
+    chunk_size,
+    incremental=incremental,
+    detect_precision_hints=detect_precision_hints,
+    defer_table_reflect=defer_table_reflect,
+    table_adapter_callback=table_adapter_callback,
+)
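The `defer_table_reflect` path above leans on a plain SQLAlchemy pattern: `autoload_with=None` builds an empty placeholder `Table`, and a later construction with `autoload_with=engine` and `extend_existing=True` reflects the real schema into it. A minimal sketch of that mechanism, using an in-memory SQLite database and an illustrative `family` table (both assumptions, not part of this PR):

```python
from sqlalchemy import Column, Integer, MetaData, Table, create_engine

engine = create_engine("sqlite:///:memory:")
setup_metadata = MetaData()
# create a toy table to reflect against
Table("family", setup_metadata, Column("id", Integer, primary_key=True)).create(engine)

# deferred: autoload_with=None yields a placeholder with no columns,
# so no database connection is needed when the resource is created
placeholder = Table("family", MetaData(), autoload_with=None)
assert list(placeholder.columns) == []

# at extraction time the table is reflected for real; extend_existing=True
# fills in the placeholder registered under the same MetaData
reflected = Table(
    "family", placeholder.metadata, autoload_with=engine, extend_existing=True
)
assert [c.name for c in reflected.columns] == ["id"]
```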
45 changes: 28 additions & 17 deletions sources/sql_database/helpers.py
@@ -1,6 +1,7 @@
"""SQL database source helpers"""

from typing import (
Callable,
cast,
Any,
List,
@@ -15,12 +16,12 @@
from dlt.sources.credentials import ConnectionStringCredentials
from dlt.common.configuration.specs import BaseConfiguration, configspec
from dlt.common.typing import TDataItem
-from .settings import DEFAULT_CHUNK_SIZE
+from .schema_types import table_to_columns

from sqlalchemy import Table, create_engine
-from sqlalchemy.engine import Engine, Row
+from sqlalchemy.engine import Engine
from sqlalchemy.sql import Select
-from sqlalchemy import MetaData, Table
+from sqlalchemy import Table


class TableLoader:
@@ -79,22 +80,32 @@ def load_rows(self) -> Iterator[List[TDataItem]]:
def table_rows(
engine: Engine,
table: Table,
-chunk_size: int = DEFAULT_CHUNK_SIZE,
+chunk_size: int,
incremental: Optional[dlt.sources.incremental[Any]] = None,
detect_precision_hints: bool = False,
defer_table_reflect: bool = False,
table_adapter_callback: Callable[[Table], None] = None,
) -> Iterator[TDataItem]:
"""
A DLT source which loads data from an SQL database using SQLAlchemy.
Resources are automatically created for each table in the schema or from the given list of tables.

Args:
credentials (Union[ConnectionStringCredentials, Engine, str]): Database credentials or an `sqlalchemy.Engine` instance.
schema (Optional[str]): Name of the database schema to load (if different from default).
metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. `schema` argument is ignored when this is used.
table_names (Optional[List[str]]): A list of table names to load. By default, all tables in the schema are loaded.

Returns:
Iterable[DltResource]: A list of DLT resources for each table to be loaded.
"""
if defer_table_reflect:
table = Table(
table.name, table.metadata, autoload_with=engine, extend_existing=True
)
if table_adapter_callback:
table_adapter_callback(table)
# set the primary_key in the incremental
if incremental and not incremental.primary_key:
primary_key = get_primary_key(table)
if primary_key:
incremental.primary_key = primary_key
# yield empty record to set hints
yield dlt.mark.with_hints(
[],
dlt.mark.make_hints(
primary_key=get_primary_key(table),
columns=table_to_columns(table) if detect_precision_hints else None,
),
)

loader = TableLoader(engine, table, incremental=incremental, chunk_size=chunk_size)
yield from loader.load_rows()

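The empty-record yield in `table_rows` above is a general dlt pattern: `dlt.mark.with_hints` attaches hints to a data item, and yielding an empty list applies those hints before any real rows arrive, so the schema is set even when a table is empty. A minimal sketch with an illustrative resource (the resource name and rows are made up):

```python
import dlt

@dlt.resource
def events():
    # the empty list carries no data but still registers the hints,
    # mirroring the "yield empty record to set hints" step in table_rows
    yield dlt.mark.with_hints(
        [],
        dlt.mark.make_hints(primary_key="id"),
    )
    yield [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]

pipeline = dlt.pipeline(
    pipeline_name="hints_demo", destination="duckdb", full_refresh=True
)
pipeline.run(events())
```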
3 changes: 0 additions & 3 deletions sources/sql_database/settings.py

This file was deleted.

66 changes: 59 additions & 7 deletions sources/sql_database_pipeline.py
@@ -2,9 +2,8 @@

import dlt
from dlt.sources.credentials import ConnectionStringCredentials
-from dlt.common import pendulum

-from sql_database import sql_database, sql_table
+from sql_database import sql_database, sql_table, Table


def load_select_tables_from_database() -> None:
@@ -63,27 +62,77 @@ def load_entire_database() -> None:


def load_standalone_table_resource() -> None:
"""Load a few known tables with the standalone sql_table resource"""
"""Load a few known tables with the standalone sql_table resource, request full schema and deferred
table reflection"""
pipeline = dlt.pipeline(
pipeline_name="rfam_database", destination="postgres", dataset_name="rfam_data"
pipeline_name="rfam_database",
destination="duckdb",
dataset_name="rfam_data",
full_refresh=True,
)

# Load a table incrementally starting at a given date
# Adding incremental via argument like this makes extraction more efficient
# as only rows newer than the start date are fetched from the table
# we also use `detect_precision_hints` to get detailed column schema
# and defer_table_reflect to reflect schema only during execution
family = sql_table(
credentials="mysql+pymysql://[email protected]:4497/Rfam",
table="family",
incremental=dlt.sources.incremental(
"updated", initial_value=pendulum.DateTime(2022, 1, 1, 0, 0, 0)
"updated",
),
detect_precision_hints=True,
defer_table_reflect=True,
)
# columns will be empty here because defer_table_reflect is set to True
print(family.compute_table_schema())

# Load all data from another table
genome = sql_table(table="genome")
genome = sql_table(
credentials="mysql+pymysql://[email protected]:4497/Rfam",
table="genome",
detect_precision_hints=True,
defer_table_reflect=True,
)

# Run the resources together
info = pipeline.extract([family, genome], write_disposition="merge")
print(info)
# Show inferred columns
print(pipeline.default_schema.to_pretty_yaml())


def select_columns() -> None:
"""Uses table adapter callback to modify list of columns to be selected"""
pipeline = dlt.pipeline(
pipeline_name="rfam_database",
destination="duckdb",
dataset_name="rfam_data_cols",
full_refresh=True,
)

def table_adapter(table: Table) -> None:
print(table.name)
if table.name == "family":
# this is a SQLAlchemy table and _columns is writable,
# so we can drop the "updated" column
table._columns.remove(table.columns["updated"])

family = sql_table(
credentials="mysql+pymysql://[email protected]:4497/Rfam",
table="family",
chunk_size=10,
detect_precision_hints=True,
table_adapter_callback=table_adapter,
)

# we also do not want the whole table, so we add a limit to get just one chunk (10 records)
pipeline.run(family.add_limit(1))
# only 10 rows
print(pipeline.last_trace.last_normalize_info)
# no "updated" column in "family" table
print(pipeline.default_schema.to_pretty_yaml())


def reflect_and_connector_x() -> None:
@@ -159,7 +208,10 @@ def read_sql_x(

if __name__ == "__main__":
# Load selected tables with different settings
-load_select_tables_from_database()
+# load_select_tables_from_database()

# load a table and select columns
select_columns()

# Load tables with the standalone table resource
# load_standalone_table_resource()
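The `chunk_size` parameter added throughout this PR maps onto SQLAlchemy's server-side cursor streaming described in the docstrings: `stream_results=True` keeps the full result set out of memory and `max_row_buffer` caps the prefetch buffer at twice the chunk size. A minimal sketch of that mechanism in plain SQLAlchemy; the connection string and table name are illustrative, not taken from this PR:

```python
from sqlalchemy import MetaData, Table, create_engine, select

# assumed DSN; any database with server-side cursor support behaves similarly
engine = create_engine("postgresql://loader@localhost:5432/rfam")
# execution_options returns a new Engine carrying the options
engine = engine.execution_options(stream_results=True, max_row_buffer=2 * 1000)

metadata = MetaData()
family = Table("family", metadata, autoload_with=engine)

with engine.connect() as conn:
    result = conn.execute(select(family))
    # partitions(n) yields lists of up to n rows, analogous to the
    # batches that table_rows yields per chunk_size
    for batch in result.partitions(1000):
        print(len(batch))
```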