Commit dfa86b6

Test arrow load id in extract

1 parent 712b276
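
This commit tests that the parquet normalizer's add_dlt_load_id option stamps extracted and normalized parquet files with a _dlt_load_id column matching the load package id. The tests toggle the option via an environment variable; a minimal sketch of the same setting in user code (the config.toml mapping follows dlt's standard env-var convention and is an assumption here, not part of this commit):

    import os

    # Equivalent to setting add_dlt_load_id = true in the
    # [normalize.parquet_normalizer] section of config.toml.
    os.environ["NORMALIZE__PARQUET_NORMALIZER__ADD_DLT_LOAD_ID"] = "True"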

File tree

1 file changed: +53 −0 lines changed

tests/pipeline/test_arrow_sources.py

Lines changed: 53 additions & 0 deletions
@@ -505,3 +505,56 @@ def test_empty_arrow(item_type: TPythonTableFormat) -> None:
     assert len(pipeline.list_extracted_resources()) == 1
     norm_info = pipeline.normalize()
     assert norm_info.row_counts["items"] == 0
+
+
+@pytest.mark.parametrize("item_type", ["pandas", "arrow-table", "arrow-batch"])
+def test_extract_adds_dlt_load_id(item_type: TPythonTableFormat) -> None:
+    os.environ["NORMALIZE__PARQUET_NORMALIZER__ADD_DLT_LOAD_ID"] = "True"
+    os.environ["DESTINATION__LOADER_FILE_FORMAT"] = "parquet"
+
+    item, _, _ = arrow_table_all_data_types(item_type, num_rows=5432)
+
+    @dlt.resource
+    def some_data():
+        yield item
+
+    pipeline: dlt.Pipeline = dlt.pipeline("arrow_" + uniq_id(), destination="duckdb")
+    info = pipeline.extract(some_data())
+
+    load_id = info.loads_ids[0]
+    jobs = info.load_packages[0].jobs['new_jobs']
+    extracted_file = [job for job in jobs if "some_data" in job.file_path][0].file_path
+
+    with pa.parquet.ParquetFile(extracted_file) as pq:
+        tbl = pq.read()
+        assert len(tbl) == 5432
+
+        # Extracted file has _dlt_load_id
+        assert pa.compute.all(pa.compute.equal(tbl["_dlt_load_id"], load_id)).as_py()
+
+
+def test_extract_json_normalize_parquet_adds_dlt_load_id():
+    """Extract jsonl data that gets written to parquet in normalizer. Check that _dlt_load_id is added."""
+    os.environ["NORMALIZE__PARQUET_NORMALIZER__ADD_DLT_LOAD_ID"] = "True"
+
+    rows, _, _ = arrow_table_all_data_types("object", num_rows=1001)
+
+    @dlt.resource
+    def some_data():
+        yield rows
+
+    pipeline: dlt.Pipeline = dlt.pipeline("arrow_" + uniq_id(), destination="duckdb")
+
+    pipeline.extract(some_data())
+    n_info = pipeline.normalize(loader_file_format="parquet")
+
+    load_id = n_info.loads_ids[0]
+    jobs = n_info.load_packages[0].jobs['new_jobs']
+    normalized_file = [job for job in jobs if "some_data" in job.file_path][0].file_path
+
+    with pa.parquet.ParquetFile(normalized_file) as pq:
+        tbl = pq.read()
+        assert len(tbl) == 1001
+
+        # Normalized file has _dlt_load_id
+        assert pa.compute.all(pa.compute.equal(tbl["_dlt_load_id"], load_id)).as_py()
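
For context beyond the extract/normalize stages exercised above, a minimal end-to-end sketch (not part of this commit; pipeline and table names are made up, and it assumes a local duckdb destination is available) showing that _dlt_load_id ends up as an ordinary, queryable column in the destination:

    import os

    import dlt

    # Enable the option under test before the pipeline runs.
    os.environ["NORMALIZE__PARQUET_NORMALIZER__ADD_DLT_LOAD_ID"] = "True"

    pipeline = dlt.pipeline("load_id_demo", destination="duckdb")

    # Object rows normalized to parquet: the second test's scenario, run end to end.
    info = pipeline.run(
        [{"n": i} for i in range(10)],
        table_name="numbers",
        loader_file_format="parquet",
    )
    load_id = info.loads_ids[0]

    # Every row can be traced back to the load package that produced it.
    with pipeline.sql_client() as client:
        with client.execute_query("SELECT DISTINCT _dlt_load_id FROM numbers") as cur:
            assert [r[0] for r in cur.fetchall()] == [load_id]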
