Commit 1af9e7d

Add dev_mode support to pipeline state management
- Introduced `dev_mode` attribute in `TPipelineLocalState` to track development mode status.
- Implemented logic to recreate the initial pipeline state when toggling between dev and non-dev modes.
- Updated dataset name generation to ensure consistency based on the current mode.
- Added tests to verify dataset name behavior in different modes.
1 parent f0349d7 commit 1af9e7d

4 files changed: +32 -7 lines changed
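In practice, a pipeline that last ran with `dev_mode=True` is rebuilt from a fresh initial state the next time it is created without dev mode, and its dataset name is regenerated accordingly. A minimal sketch of the intended behavior (the pipeline name and the `filesystem` destination are illustrative, not part of the commit):

    import dlt

    # first run in dev mode: the dataset name carries a per-instance serial suffix
    p = dlt.pipeline("demo", destination="filesystem", dev_mode=True)
    dev_dataset = p.dataset_name

    # next run without dev mode: the stored dev_mode flag triggers a state reset,
    # so the dataset name is regenerated without the dev suffix
    p = dlt.pipeline("demo", destination="filesystem")
    assert p.dataset_name != dev_dataset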

dlt/common/pipeline.py

Lines changed: 2 additions & 0 deletions
@@ -479,6 +479,8 @@ class TPipelineLocalState(TypedDict, total=False):
     """Run dir when pipeline was instantiated for a first time, defaults to cwd on OSS run context"""
     last_run_context: Optional[TLastRunContext]
     """Context from the last successful pipeline run or sync"""
+    dev_mode: bool
+    """Indicates whether previous run used dev_mode; used to reset state on dev->non-dev toggle"""
 
 
 class TPipelineState(TVersionedState, total=False):
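The new field lives in the pipeline's local (non-synced) state. A quick sketch of reading the flag back via `get_local_state_val`, the same helper this commit calls in `_configure`; it assumes the flag is persisted at pipeline creation, as the `dlt/pipeline/pipeline.py` diff below indicates:

    import dlt

    p = dlt.pipeline("demo", destination="filesystem", dev_mode=True)
    # _configure persists the current mode into local state
    assert p.get_local_state_val("dev_mode") is True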

dlt/pipeline/pipeline.py

Lines changed: 23 additions & 7 deletions
@@ -1224,6 +1224,14 @@ def _init_working_dir(self, pipeline_name: str, pipelines_dir: str) -> None:
         if self.dev_mode:
             self._wipe_working_folder()
 
+    def _recreate_initial_state(self) -> None:
+        # replace the injected state dict with a fresh default
+        injected_state = self._container[StateInjectableContext].state
+        injected_state.clear()
+        injected_state.update(default_pipeline_state())
+        self._set_dataset_name(None)
+        self._save_state(injected_state)
+
     def _configure(
         self, import_schema_path: str, export_schema_path: str, must_attach_to_local_pipeline: bool
     ) -> None:
@@ -1247,14 +1255,17 @@ def _configure(
         )
 
         self.must_attach_to_local_pipeline = must_attach_to_local_pipeline
-        # attach to pipeline if folder exists and contains state
-        if has_state:
+        prev_dev_mode: bool = self.get_local_state_val("dev_mode") if has_state else False
+        should_recreate_pipeline: bool = prev_dev_mode and not self.dev_mode
+        self.set_local_state_val("dev_mode", self.dev_mode)  # record mode to detect a later dev -> non-dev toggle
+        if has_state and not should_recreate_pipeline:
             self._attach_pipeline()
         else:
-            # this will erase the existing working folder
             self._create_pipeline()
 
-        # create schema storage
+        if should_recreate_pipeline:
+            self._recreate_initial_state()
+
         self._schema_storage = LiveSchemaStorage(self._schema_storage_config, makedirs=True)
 
     def _create_pipeline(self) -> None:
@@ -1521,10 +1532,12 @@ def _make_dataset_name(
     ) -> str:
         """Generates dataset name for the pipeline based on `new_dataset_name`
         1. if name is not provided, default name is created
-        2. for destinations that do not need dataset names, def. name is not created
-        3. we add serial number in dev mode
-        4. we apply layout from pipeline config if present
+        2. if destination is not provided and no dataset name is provided, default name is created
+        3. for destinations that do not need dataset names, def. name is not created
+        4. we add serial number in dev mode
+        5. we apply layout from pipeline config if present
         """
+        # TODO: update this function to differentiate between the new_dataset_name parameter and the dataset_name property created by the function
         if not new_dataset_name:
             # dataset name is required but not provided - generate the default now
             destination_needs_dataset = False
@@ -1534,6 +1547,9 @@ def _make_dataset_name(
         if destination_needs_dataset:
             new_dataset_name = self.pipeline_name + self.DEFAULT_DATASET_SUFFIX
 
+        if not destination and not new_dataset_name:
+            new_dataset_name = self.pipeline_name + self.DEFAULT_DATASET_SUFFIX
+
         if not new_dataset_name:
             return new_dataset_name
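The renumbered docstring rules are visible through the public `dataset_name` property. A rough illustration of rules 2 and 4, assuming no dataset name layout is configured and that `DEFAULT_DATASET_SUFFIX` has its usual "_dataset" value:

    import dlt

    # rule 2: no destination and no dataset name still yields a default name
    p = dlt.pipeline("demo")
    assert p.dataset_name == "demo" + p.DEFAULT_DATASET_SUFFIX  # "demo_dataset"

    # rule 4: dev mode appends a serial suffix so repeated dev runs stay isolated
    p_dev = dlt.pipeline("demo_dev", destination="filesystem", dev_mode=True)
    assert p_dev.dataset_name.startswith("demo_dev" + p_dev.DEFAULT_DATASET_SUFFIX)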

dlt/pipeline/state_sync.py

Lines changed: 1 addition & 0 deletions
@@ -144,5 +144,6 @@ def default_pipeline_state() -> TPipelineState:
         "first_run": True,
         # keep the initial run dir when the pipeline was created
         "initial_cwd": os.path.abspath(dlt.current.run_context().local_dir),
+        "dev_mode": False,
     },
 }
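Because `default_pipeline_state` now seeds `dev_mode` as False, a recreated state always starts out in non-dev mode; `_configure` then overwrites the flag with the actual setting. A small sanity check of the default shape, assuming local state sits under the "_local" key as in the diff above:

    from dlt.pipeline.state_sync import default_pipeline_state

    state = default_pipeline_state()
    assert state["_local"]["dev_mode"] is False
    assert state["_local"]["first_run"] is True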

tests/pipeline/test_pipeline.py

Lines changed: 6 additions & 0 deletions
@@ -278,6 +278,12 @@ def test_pipeline_configuration_named_section(environment) -> None:
         == environment["PIPELINES__NAMED__IMPORT_SCHEMA_PATH"]
     )
 
+def test_dataset_name_consistency_with_dev_mode() -> None:
+    p = dlt.pipeline(dev_mode=True, destination="filesystem")
+    assert p.dataset_name.endswith(p._pipeline_instance_id)
+    # restore this pipeline
+    r_p = dlt.attach()
+    assert not r_p.dataset_name.endswith(p._pipeline_instance_id)
 
 def test_run_dev_mode_default_dataset() -> None:
     p = dlt.pipeline(dev_mode=True, destination="filesystem")
