Description
dlt version
1.17.1
Describe the problem
When using the new arrow_stream return type introduced in connectorx==0.4.4, timestamp columns in the source become date columns in the destination.
The issue was observed with the following sources:
- oracle
- mysql
and the following destination:
- snowflake
When switching back to return_type="arrow", the problem disappears.
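The type change can be observed in the Arrow schema that connectorx returns, before dlt is involved. Below is a minimal sketch for checking this; the query and column name are hypothetical, and it assumes arrow_stream yields a pyarrow RecordBatchReader:

import connectorx as cx

CONN = "mysql://[email protected]:4497/Rfam"  # same connection string as the repro below
QUERY = "SELECT updated FROM family LIMIT 10"  # hypothetical timestamp column

# return_type="arrow" materializes a pyarrow.Table
tbl = cx.read_sql(CONN, QUERY, return_type="arrow")
print(tbl.schema)  # column reported as timestamp[...]

# return_type="arrow_stream" (new in 0.4.4), assumed to yield a RecordBatchReader
reader = cx.read_sql(CONN, QUERY, return_type="arrow_stream")
print(reader.schema)  # same column reported as a date type (the reported bug)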
Expected behavior
Timestamp columns should be loaded as timestamps, not narrowed to dates.
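In Arrow terms, each batch yielded by the resource should pass a check like the following (a sketch; batch stands for one extracted pyarrow table and col for the affected column):

import pyarrow as pa
import pyarrow.types as pat

def assert_timestamp_preserved(batch: pa.Table, col: str) -> None:
    # the source DATETIME/TIMESTAMP column should map to an Arrow timestamp,
    # not be narrowed to date32/date64 on the way to the destination
    actual = batch.schema.field(col).type
    assert pat.is_timestamp(actual), f"{col} arrived as {actual}, expected timestamp"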
Steps to reproduce
Run the following code once with return_type="arrow_stream" and once with return_type="arrow".
import os
import logging

import dlt
from dlt.sources.sql_database import sql_database, sql_table, Table
from dlt.pipeline.exceptions import PipelineStepFailed
from dlt.sources.credentials import ConnectionStringCredentials
import pyarrow as pa

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global config
ORACLE_CONN_STR = "mysql://[email protected]:4497/Rfam"

# Table config with SCD2 PM keys
TABLE_CONFIG = {
    "family": {"key": ["rfam_id"]},
    "taxonomy": {"key": ["ncbi_id"]},
}

# function to filter data
def query_adapter_callback(query, table):
    columns = [
        c for c in table.c
        if c.name.upper() != "TAX_STRING" and c.name.upper() != "RELEASE_NUMBER"
    ]
    query = query.with_only_columns(*columns)  # Pass columns as positional arguments
    if table.name == "family":
        return query.where(table.c.type == 'Gene; rRNA;')
    return query

@dlt.source(name="oracle_scd2_tables_1")
def oracle_scd2_parallel_source():
    resources = []
    for table_name, meta in TABLE_CONFIG.items():
        key = meta["key"]
        if isinstance(key, str):
            key = [key]
        table_data = sql_table(
            credentials=ConnectionStringCredentials(ORACLE_CONN_STR),
            # schema=SCHEMA,
            table=table_name,
            backend="connectorx",
            reflection_level="full_with_precision",
            query_adapter_callback=query_adapter_callback,
            # table_adapter_callback=arrow_table_adapter,
            backend_kwargs={
                # "conn": ORACLE_CONN_STR,
                # "return_type": "arrow_stream",
                "return_type": "arrow",
                # "batch_size": 1_000_000,
            },
        )

        @dlt.resource(
            name=table_name,
            write_disposition={
                "disposition": "replace",
                # "strategy": "scd2",
                # "validity_column_names": ["valid_from", "valid_to"],
            },
            # primary_key=key,
            # merge_key=key,
            # columns={
            #     "valid_from": {"data_type": "timestamp", "timezone": True, "precision": 9},
            #     "valid_to": {"data_type": "timestamp", "timezone": True, "precision": 9},
            # },
        )
        def scd2_resource(table_data=table_data, table_name=table_name):
            for batch in table_data:
                yield batch

        resources.append(scd2_resource())
    return resources

def main():
    pipeline = dlt.pipeline(
        pipeline_name="o_s_test_sza",
        destination="snowflake",
        dataset_name="raw",
        progress="log",
    )
    try:
        logger.info("Running SCD2 pipeline...")
        info = pipeline.run(oracle_scd2_parallel_source())
        print(info)
        logger.info("Pipeline completed.")
    except PipelineStepFailed as step_failed:
        print(f"Failed at step: {step_failed.step} with step info {step_failed.step_info}")
        raise

if __name__ == "__main__":
    main()
Operating system
Linux
Runtime environment
Virtual Machine
Python version
3.11
dlt data source
mysql://[email protected]:4497/Rfam
The same problem occurs with our internal Oracle DBs.
dlt destination
Snowflake
Other deployment details
No response
Additional information
No response