@@ -1,8 +1,9 @@
-from typing import Optional, Set
+from typing import cast, Set, List, Dict, Any
 import os
 import tempfile
 from datetime import datetime
 from pathlib import Path
+import re
 
 import marimo as mo
 import pyarrow
@@ -45,6 +46,9 @@
     get_example_query_for_dataset,
     _get_steps_data_and_status,
     _get_migrations_count,
+    build_pipeline_execution_visualization,
+    _collect_load_packages_from_trace,
+    load_package_status_labels,
     TPipelineRunStatus,
     TVisualPipelineStep,
 )
@@ -53,6 +57,7 @@
     SUCCESS_PIPELINE_DUCKDB,
     SUCCESS_PIPELINE_FILESYSTEM,
     EXTRACT_EXCEPTION_PIPELINE,
+    NORMALIZE_EXCEPTION_PIPELINE,
     NEVER_RAN_PIPELINE,
     LOAD_EXCEPTION_PIPELINE,
     NO_DESTINATION_PIPELINE,
@@ -233,7 +238,7 @@ def test_pipeline_details(pipeline, temp_pipelines_dir):
     assert isinstance(result, list)
     if pipeline.pipeline_name in PIPELINES_WITH_LOAD:
         assert len(result) == 9
-    elif pipeline.pipeline_name == LOAD_EXCEPTION_PIPELINE:
+    elif pipeline.pipeline_name in [LOAD_EXCEPTION_PIPELINE, NORMALIZE_EXCEPTION_PIPELINE]:
         # custom destination does not support remote data info
         assert len(result) == 8
     else:
@@ -253,10 +258,10 @@ def test_pipeline_details(pipeline, temp_pipelines_dir):
     else:
         assert details_dict["destination"] == "duckdb (dlt.destinations.duckdb)"
     assert details_dict["dataset_name"] == pipeline.dataset_name
-    if (
-        pipeline.pipeline_name in PIPELINES_WITH_LOAD
-        or pipeline.pipeline_name == LOAD_EXCEPTION_PIPELINE
-    ):
+    if pipeline.pipeline_name in PIPELINES_WITH_LOAD or pipeline.pipeline_name in [
+        LOAD_EXCEPTION_PIPELINE,
+        NORMALIZE_EXCEPTION_PIPELINE,
+    ]:
         assert details_dict["schemas"].startswith("fruitshop")
     else:
         assert "schemas" not in details_dict
@@ -502,6 +507,10 @@ def test_trace(pipeline: dlt.Pipeline):
     if pipeline.pipeline_name == EXTRACT_EXCEPTION_PIPELINE:
         assert len(result) == 1
         assert result[0]["step"] == "extract"
+    elif pipeline.pipeline_name == NORMALIZE_EXCEPTION_PIPELINE:
+        assert len(result) == 2
+        assert result[0]["step"] == "extract"
+        assert result[1]["step"] == "normalize"
     else:
         assert len(result) == 3
         assert result[0]["step"] == "extract"
@@ -781,10 +790,7 @@ def test_get_steps_data_and_status(
 
 @pytest.mark.parametrize(
     "pipeline",
-    [
-        SUCCESS_PIPELINE_DUCKDB,
-        SUCCESS_PIPELINE_FILESYSTEM,
-    ],
+    PIPELINES_WITH_LOAD,
     indirect=True,
 )
 def test_get_migrations_count(pipeline: dlt.Pipeline) -> None:
@@ -794,15 +800,122 @@ def test_get_migrations_count(pipeline: dlt.Pipeline) -> None:
     assert migrations_count == 1
 
     # Trigger multiple migrations
-    pipeline.extract([{"id": 1, "name": "test"}], table_name="migration_table")
-    pipeline.extract(
-        [{"id": 2, "name": "test2", "new_column": "value"}], table_name="migration_table"
-    )
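+    # three extracts with a widening schema should register as three migrations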
+    pipeline.extract([{"id": 1, "name": "test"}], table_name="my_table")
+    pipeline.extract([{"id": 2, "name": "test2", "new_column": "value"}], table_name="my_table")
     pipeline.extract(
         [{"id": 3, "name": "test3", "new_column": "value", "another_column": 100}],
-        table_name="migration_table",
+        table_name="my_table",
     )
     pipeline.normalize()
     pipeline.load()
     migrations_count = _get_migrations_count(pipeline.last_trace.last_load_info)
     assert migrations_count == 3
+
+
+@pytest.mark.parametrize(
+    "pipeline, expected_steps, expected_status",
+    [
+        (SUCCESS_PIPELINE_DUCKDB, {"extract", "normalize", "load"}, "succeeded"),
+        (SUCCESS_PIPELINE_FILESYSTEM, {"extract", "normalize", "load"}, "succeeded"),
+        (EXTRACT_EXCEPTION_PIPELINE, {"extract"}, "failed"),
+        (LOAD_EXCEPTION_PIPELINE, {"extract", "normalize", "load"}, "failed"),
+    ],
+    indirect=["pipeline"],
+)
+def test_build_pipeline_execution_visualization(
+    pipeline: dlt.Pipeline,
+    expected_steps: Set[TVisualPipelineStep],
+    expected_status: TPipelineRunStatus,
+) -> None:
+    """Test overall pipeline execution visualization logic"""
+
+    trace = pipeline.last_trace
+
+    html = build_pipeline_execution_visualization(trace)
+    html_str = str(html.text)
+
+    assert f"Last execution ID: <strong>{trace.transaction_id[:8]}</strong>" in html_str
+    total_time_match = re.search(
+        r"<div>Total time: <strong>([\d.]+)(ms|s)?</strong></div>", html_str
+    )
+    assert total_time_match is not None
+
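+    # the status badge (green on success, red on failure) must appear verbatim in the HTML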
+    status_badge = f"""
+    <div style="
+        background-color: var(--{'green' if expected_status == "succeeded" else 'red'}-bg);
+        color: var(--{'green' if expected_status == "succeeded" else 'red'}-text);
+        padding: 6px 16px;
+        border-radius: 6px;
+    ">
+        <strong>{expected_status}</strong>
+    </div>
+    """
+    assert status_badge in html_str
+
+    migrations_count = _get_migrations_count(trace.last_load_info) if trace.last_load_info else 0
+    migration_badge = f"""
+    <div style="
+        background-color: var(--yellow-bg);
+        color: var(--yellow-text);
+        padding: 6px 16px;
+        border-radius: 6px;
+    ">
+        <strong>{migrations_count} dataset migration(s)</strong>
+    </div>"""
+    if migrations_count != 0:
+        assert migration_badge in html_str
+    else:
+        assert migration_badge not in html_str
+
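+    # every executed step should be rendered with a numeric duration (unit optional)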
+    steps_data, _ = _get_steps_data_and_status(trace.steps)
+    for step_data in steps_data:
+        duration_pattern = re.search(rf"{step_data.step.capitalize()}\s+([\d.]+)(ms|s)?", html_str)
+        assert duration_pattern is not None
+
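+    # each step that ran gets its accent color (lime=extract, aqua=normalize, pink=load)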
+    if "extract" in expected_steps:
+        assert "var(--dlt-color-lime)" in html_str
+    if "normalize" in expected_steps:
+        assert "var(--dlt-color-aqua)" in html_str
+    if "load" in expected_steps:
+        assert "var(--dlt-color-pink)" in html_str
+
+
+@pytest.mark.parametrize(
+    "pipeline",
+    [
+        SUCCESS_PIPELINE_DUCKDB,
+        SUCCESS_PIPELINE_FILESYSTEM,
+        EXTRACT_EXCEPTION_PIPELINE,
+        NORMALIZE_EXCEPTION_PIPELINE,
+        LOAD_EXCEPTION_PIPELINE,
+    ],
+    indirect=["pipeline"],
+)
+def test_collect_load_packages_from_trace(
+    pipeline: dlt.Pipeline,
+) -> None:
+    """Test getting load package status labels from trace"""
+
+    trace = pipeline.last_trace
+    table = load_package_status_labels(trace)
+
+    list_of_load_package_info = cast(List[Dict[str, Any]], table.data)
+
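+    # loaded on success; new, pending or aborted when extract, normalize or load fails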
+    if pipeline.pipeline_name in ["success_pipeline_duckdb", "success_pipeline_filesystem"]:
+        assert len(list_of_load_package_info) == 2
+        assert all(
+            "loaded" in str(load_package_info["status"].text)
+            for load_package_info in list_of_load_package_info
+        )
+
+    elif pipeline.pipeline_name == "extract_exception_pipeline":
+        assert len(list_of_load_package_info) == 1
+        assert "new" in str(list_of_load_package_info[0]["status"].text)
+
+    elif pipeline.pipeline_name == "load_exception_pipeline":
+        assert len(list_of_load_package_info) == 1
+        assert "aborted" in str(list_of_load_package_info[0]["status"].text)
+
+    elif pipeline.pipeline_name == "normalize_exception_pipeline":
+        assert len(list_of_load_package_info) == 1
+        assert "pending" in str(list_of_load_package_info[0]["status"].text)