Description
dlt version
1.17.1
Describe the problem
When using the newly implemented `arrow_stream` return type added in connectorx==0.4.4, dlt writes one large file during the normalize and load steps, even when `file_max_items` / `file_max_bytes` are configured for the data writer.
The issue was observed with the following sources:
- Oracle
- MySQL

and the following destination:
- Snowflake
Expected behavior
dlt should create multiple files during the normalize and load steps, honoring the configured file size and item limits.
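For reference, a minimal sketch of how one could count the files produced per table, assuming the `load_packages` / per-job `file_path` attributes on the `LoadInfo` object returned by `pipeline.run()` (not verified against this exact dlt version):

```python
# Sketch: count how many job files dlt produced per table.
# Assumes `info` is the LoadInfo returned by pipeline.run() and that it
# exposes load_packages -> jobs -> file_path (hedged assumption).
import os
from collections import Counter

files_per_table = Counter()
for package in info.load_packages:
    for jobs in package.jobs.values():
        for job in jobs:
            table_name = os.path.basename(job.file_path).split(".")[0]
            files_per_table[table_name] += 1

print(files_per_table)  # with file_max_* set, each table should map to more than one file
```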
Steps to reproduce
Use the code and config below.
```python
import os
import logging

import dlt
from dlt.sources.sql_database import sql_database, sql_table, Table
from dlt.pipeline.exceptions import PipelineStepFailed
from dlt.sources.credentials import ConnectionStringCredentials
import pyarrow as pa

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global config
ORACLE_CONN_STR = "mysql://[email protected]:4497/Rfam"
# SCHEMA = "speed_test"

# Table config with SCD2 PM keys
TABLE_CONFIG = {
    "family": {"key": ["rfam_id"]},
    "taxonomy": {"key": ["ncbi_id"]},
}


@dlt.source(name="oracle_scd2_tables_1")
def oracle_scd2_parallel_source():
    resources = []
    for table_name, meta in TABLE_CONFIG.items():
        key = meta["key"]
        if isinstance(key, str):
            key = [key]

        table_data = sql_table(
            credentials=ConnectionStringCredentials(ORACLE_CONN_STR),
            # schema=SCHEMA,
            table=table_name,
            chunk_size=100000,
            backend="connectorx",
            reflection_level="full_with_precision",
            backend_kwargs={
                "conn": ORACLE_CONN_STR,
                "return_type": "arrow_stream",
                "chunk_size": 100000,
            },
        )

        @dlt.resource(
            name=table_name,
            write_disposition={
                "disposition": "merge",
                "strategy": "scd2",
                "validity_column_names": ["valid_from", "valid_to"],
            },
            primary_key=key,
            merge_key=key,
        )
        def scd2_resource(table_data=table_data, table_name=table_name):
            for batch in table_data:
                yield batch

        resources.append(scd2_resource())

    return resources


def main():
    pipeline = dlt.pipeline(
        pipeline_name="o_s_test",
        destination="snowflake",
        dataset_name="private",
        progress="log",
    )
    try:
        logger.info("Running SCD2 pipeline...")
        info = pipeline.run(oracle_scd2_parallel_source())
        print(info)
        logger.info("Pipeline completed.")
    except PipelineStepFailed as step_failed:
        print(f"Failed at step: {step_failed.step} with step info {step_failed.step_info}")
        raise


if __name__ == "__main__":
    main()
```
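For completeness, I believe the same writer limits can also be provided as environment variables before the pipeline runs (a sketch assuming dlt's uppercase `SECTION__KEY` naming convention), which can help rule out the TOML file simply not being picked up:

```python
# Sketch: data writer limits as environment variables, set before pipeline.run().
# Assumes dlt maps NORMALIZE__DATA_WRITER__FILE_MAX_ITEMS etc. onto the same
# settings as the TOML config below (hedged assumption).
import os

os.environ["NORMALIZE__DATA_WRITER__FILE_MAX_ITEMS"] = "200"
os.environ["NORMALIZE__DATA_WRITER__FILE_MAX_BYTES"] = "200"
```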
Below is the config:
```toml
[normalize]
workers = 2
max_parallel_items = 2
start_method = "spawn"
loader_file_format = "parquet"

[normalize.data_writer]
file_max_items = 200
file_max_bytes = 200
buffer_max_items = 200
disable_compression = true

[normalize.parquet_normalizer]
add_dlt_load_id = true
add_dlt_id = true
max_items_per_file = 200
flavor = "spark"
compression = "snappy"
row_group_size = 200
```
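As a possible workaround sketch (assuming the items coming out of the connectorx `arrow_stream` backend are large `pyarrow.Table` objects, which I have not verified), the body of `scd2_resource` above could re-chunk each item before yielding, so normalize receives many smaller items instead of one large one:

```python
# Sketch of a workaround: re-chunk large Arrow tables before yielding them.
# Assumes the backend yields pyarrow.Table objects (hedged assumption).
import pyarrow as pa

def scd2_resource(table_data=table_data, table_name=table_name):
    for item in table_data:
        if isinstance(item, pa.Table):
            # to_batches(max_chunksize=...) splits the table into smaller record batches
            for record_batch in item.to_batches(max_chunksize=10_000):
                yield pa.Table.from_batches([record_batch])
        else:
            yield item
```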
Operating system
Linux
Runtime environment
Virtual Machine
Python version
3.11
dlt data source
mysql://[email protected]:4497/Rfam
dlt destination
Snowflake
Other deployment details
No response
Additional information
No response