
Commit dfa297f

improves shapes of traces

1 parent 0c5df47 commit dfa297f

8 files changed: +197 -41 lines changed

dlt/common/pipeline.py

Lines changed: 97 additions & 24 deletions
@@ -41,12 +41,22 @@
 from dlt.common.schema.typing import TColumnNames, TColumnSchema, TWriteDisposition, TSchemaContract
 from dlt.common.source import get_current_pipe_name
 from dlt.common.storages.load_storage import LoadPackageInfo
-from dlt.common.typing import DictStrAny, REPattern, SupportsHumanize
+from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize
 from dlt.common.jsonpath import delete_matches, TAnyJsonPath
 from dlt.common.data_writers.writers import DataWriterMetrics, TLoaderFileFormat
 from dlt.common.utils import RowCounts, merge_row_counts


+class _StepInfo(NamedTuple):
+    pipeline: "SupportsPipeline"
+    loads_ids: List[str]
+    """ids of the loaded packages"""
+    load_packages: List[LoadPackageInfo]
+    """Information on loaded packages"""
+    started_at: datetime.datetime
+    first_run: bool
+
+
 class StepInfo(SupportsHumanize):
     pipeline: "SupportsPipeline"
     loads_ids: List[str]
@@ -56,9 +66,34 @@ class StepInfo(SupportsHumanize):
     started_at: datetime.datetime
     first_run: bool

+    def asdict(self) -> DictStrAny:
+        # to be mixed with NamedTuple
+        d: DictStrAny = self._asdict()  # type: ignore
+        d["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name}
+        d["load_packages"] = [package.asdict() for package in self.load_packages]
+        return d
+
     def __str__(self) -> str:
         return self.asstr(verbosity=0)

+    @staticmethod
+    def job_metrics_asdict(
+        job_metrics: Dict[str, DataWriterMetrics], key_name: str = "job_id", extend: StrAny = None
+    ) -> List[DictStrAny]:
+        jobs = []
+        for job_id, metrics in job_metrics.items():
+            d = metrics._asdict()
+            if extend:
+                d.update(extend)
+            d[key_name] = job_id
+            jobs.append(d)
+        return jobs
+
+    def _astuple(self) -> _StepInfo:
+        return _StepInfo(
+            self.pipeline, self.loads_ids, self.load_packages, self.started_at, self.first_run
+        )
+

 class ExtractDataInfo(TypedDict):
     name: str
@@ -82,6 +117,7 @@ class ExtractMetrics(TypedDict):
 class _ExtractInfo(NamedTuple):
     pipeline: "SupportsPipeline"
     metrics: Dict[str, List[ExtractMetrics]]
+    """Metrics per load id. If many sources with the same name were extracted, there will be more than 1 element in the list"""
     extract_data_info: List[ExtractDataInfo]
     loads_ids: List[str]
     """ids of the loaded packages"""
@@ -96,12 +132,46 @@ class ExtractInfo(StepInfo, _ExtractInfo):

     def asdict(self) -> DictStrAny:
         """A dictionary representation of ExtractInfo that can be loaded with `dlt`"""
-        d = self._asdict()
-        d["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name}
-        d["load_packages"] = [package.asdict() for package in self.load_packages]
-        # TODO: transform and leave metrics when we have them implemented
-        # d.pop("metrics")
+        d = super().asdict()
         d.pop("extract_data_info")
+        # transform metrics
+        d.pop("metrics")
+        load_metrics: Dict[str, List[Any]] = {
+            "job_metrics": [],
+            "table_metrics": [],
+            "resource_metrics": [],
+            "dag": [],
+            "hints": [],
+        }
+        for load_id, metrics_list in self.metrics.items():
+            for idx, metrics in enumerate(metrics_list):
+                extend = {"load_id": load_id, "extract_idx": idx}
+                load_metrics["job_metrics"].extend(
+                    self.job_metrics_asdict(metrics["job_metrics"], extend=extend)
+                )
+                load_metrics["table_metrics"].extend(
+                    self.job_metrics_asdict(
+                        metrics["table_metrics"], key_name="table_name", extend=extend
+                    )
+                )
+                load_metrics["resource_metrics"].extend(
+                    self.job_metrics_asdict(
+                        metrics["resource_metrics"], key_name="resource_name", extend=extend
+                    )
+                )
+                load_metrics["dag"].extend(
+                    [
+                        {**extend, "parent_name": edge[0], "resource_name": edge[1]}
+                        for edge in metrics["dag"]
+                    ]
+                )
+                load_metrics["hints"].extend(
+                    [
+                        {**extend, "resource_name": name, **hints}
+                        for name, hints in metrics["hints"].items()
+                    ]
+                )
+        d.update(load_metrics)
         return d

     def asstr(self, verbosity: int = 0) -> str:
@@ -143,19 +213,25 @@ def row_counts(self) -> RowCounts:

     def asdict(self) -> DictStrAny:
         """A dictionary representation of NormalizeInfo that can be loaded with `dlt`"""
-        d = self._asdict()
-        d["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name}
-        d["load_packages"] = [package.asdict() for package in self.load_packages]
-        # list representation creates a nice table
-        d["row_counts"] = []
-        for load_id, metrics in self.metrics.items():
-            assert len(metrics) == 1, "Cannot deal with more than 1 normalize metric per load_id"
-            d["row_counts"].extend(
-                [
-                    {"load_id": load_id, "table_name": k, "count": v.items_count}
-                    for k, v in metrics[0]["table_metrics"].items()
-                ]
-            )
+        d = super().asdict()
+        # transform metrics
+        d.pop("metrics")
+        load_metrics: Dict[str, List[Any]] = {
+            "job_metrics": [],
+            "table_metrics": [],
+        }
+        for load_id, metrics_list in self.metrics.items():
+            for idx, metrics in enumerate(metrics_list):
+                extend = {"load_id": load_id, "extract_idx": idx}
+                load_metrics["job_metrics"].extend(
+                    self.job_metrics_asdict(metrics["job_metrics"], extend=extend)
+                )
+                load_metrics["table_metrics"].extend(
+                    self.job_metrics_asdict(
+                        metrics["table_metrics"], key_name="table_name", extend=extend
+                    )
+                )
+        d.update(load_metrics)
         return d

     def asstr(self, verbosity: int = 0) -> str:
@@ -192,10 +268,7 @@ class LoadInfo(StepInfo, _LoadInfo):

     def asdict(self) -> DictStrAny:
         """A dictionary representation of LoadInfo that can be loaded with `dlt`"""
-        d = self._asdict()
-        d["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name}
-        d["load_packages"] = [package.asdict() for package in self.load_packages]
-        return d
+        return super().asdict()

     def asstr(self, verbosity: int = 0) -> str:
         msg = f"Pipeline {self.pipeline.pipeline_name} completed in "
@@ -273,7 +346,7 @@ def __init__(self) -> None:

     def _step_info_start_load_id(self, load_id: str) -> None:
         self._current_load_id = load_id
-        self._load_id_metrics[load_id] = []
+        self._load_id_metrics.setdefault(load_id, [])

     def _step_info_complete_load_id(self, load_id: str, metrics: TStepMetrics) -> None:
         assert self._current_load_id == load_id, (
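
As a rough illustration of the reshaping that the new StepInfo.asdict / job_metrics_asdict pair performs, here is a minimal, self-contained sketch. FakeMetrics and the sample job id are hypothetical stand-ins that only mimic the _asdict() interface of DataWriterMetrics:

from typing import Dict, List, NamedTuple


class FakeMetrics(NamedTuple):
    # hypothetical stand-in for DataWriterMetrics; only _asdict() is relevant here
    items_count: int
    file_size: int


def job_metrics_asdict(
    job_metrics: Dict[str, FakeMetrics], key_name: str = "job_id", extend: dict = None
) -> List[dict]:
    # same flattening idea as in the commit: one flat row per job,
    # enriched with the job id and any extra keys (load_id, extract_idx, ...)
    jobs = []
    for job_id, metrics in job_metrics.items():
        d = metrics._asdict()
        if extend:
            d.update(extend)
        d[key_name] = job_id
        jobs.append(d)
    return jobs


rows = job_metrics_asdict(
    {"file_1.jsonl": FakeMetrics(items_count=100, file_size=2048)},
    extend={"load_id": "1700000000.0", "extract_idx": 0},
)
# rows == [{"items_count": 100, "file_size": 2048, "load_id": "1700000000.0",
#           "extract_idx": 0, "job_id": "file_1.jsonl"}]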

dlt/common/storages/data_item_storage.py

Lines changed: 7 additions & 0 deletions
@@ -76,13 +76,20 @@ def close_writers(self, load_id: str) -> None:
                 writer.close()

     def closed_files(self, load_id: str) -> List[DataWriterMetrics]:
+        """Return metrics for all fully processed (closed) files"""
         files: List[DataWriterMetrics] = []
         for name, writer in self.buffered_writers.items():
             if name.startswith(load_id):
                 files.extend(writer.closed_files)

         return files

+    def remove_closed_files(self, load_id: str) -> None:
+        """Remove metrics for closed files in a given `load_id`"""
+        for name, writer in self.buffered_writers.items():
+            if name.startswith(load_id):
+                writer.closed_files.clear()
+
     def _write_temp_job_file(
         self,
         load_id: str,
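
A small sketch of the bookkeeping this adds, with a hypothetical FakeWriter standing in for the buffered writers: closed_files() collects per-writer metrics for a load id, and remove_closed_files() clears them so a later run for the same load id does not report the previous run's files again:

from typing import Dict, List


class FakeWriter:
    # hypothetical stand-in for a buffered writer; real writers track DataWriterMetrics here
    def __init__(self) -> None:
        self.closed_files: List[str] = []


buffered_writers: Dict[str, FakeWriter] = {"1700000000.0.items": FakeWriter()}


def closed_files(load_id: str) -> List[str]:
    files: List[str] = []
    for name, writer in buffered_writers.items():
        if name.startswith(load_id):
            files.extend(writer.closed_files)
    return files


def remove_closed_files(load_id: str) -> None:
    for name, writer in buffered_writers.items():
        if name.startswith(load_id):
            writer.closed_files.clear()


buffered_writers["1700000000.0.items"].closed_files.append("file_1.jsonl")
assert closed_files("1700000000.0") == ["file_1.jsonl"]
remove_closed_files("1700000000.0")
# the next extract run for the same load id starts with a clean slate
assert closed_files("1700000000.0") == []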

dlt/destinations/impl/filesystem/filesystem.py

Lines changed: 3 additions & 3 deletions
@@ -145,15 +145,15 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
             logger.info(f"Will truncate tables in {truncate_dir}")
             try:
                 all_files = self.fs_client.ls(truncate_dir, detail=False, refresh=True)
-                # logger.info(f"Found {len(all_files)} CANDIDATE files in {truncate_dir}")
-                # print(f"in truncate dir {truncate_dir}: {all_files}")
+                logger.info(f"Found {len(all_files)} CANDIDATE files in {truncate_dir}")
+                print(f"in truncate dir {truncate_dir}: {all_files}")
                 for item in all_files:
                     # check every file against all the prefixes
                     for search_prefix in truncate_prefixes:
                         if item.startswith(search_prefix):
                             # NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors
                             # logger.info(f"DEL {item}")
-                            # print(f"DEL {item}")
+                            print(f"DEL {item}")
                             self.fs_client.rm(item)
             except FileNotFoundError:
                 logger.info(

dlt/extract/extract.py

Lines changed: 7 additions & 3 deletions
@@ -217,9 +217,10 @@ def _compute_metrics(self, load_id: str, source: DltSource) -> ExtractMetrics:
                     hints[name] = get_callable_name(hint)
                     continue
                 if name == "columns":
-                    hints[name] = yaml.dump(
-                        hint, allow_unicode=True, default_flow_style=False, sort_keys=False
-                    )
+                    if hint:
+                        hints[name] = yaml.dump(
+                            hint, allow_unicode=True, default_flow_style=False, sort_keys=False
+                        )
                     continue
                 hints[name] = hint

@@ -307,6 +308,9 @@ def _extract_single_source(
         self.extract_storage.close_writers(load_id)
         # gather metrics
         self._step_info_complete_load_id(load_id, self._compute_metrics(load_id, source))
+        # remove the metrics of files processed in this extract run
+        # NOTE: there may be more than one extract run per load id: ie. the resource and then dlt state
+        self.extract_storage.remove_closed_files(load_id)

     def extract(
         self,
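
For context on the new `if hint:` guard in _compute_metrics, a quick sketch (assuming PyYAML, which the surrounding code already uses as `yaml`) of what an empty `columns` hint would otherwise contribute to the hints metrics; the column definition below is a made-up example:

import yaml

# a falsy hint (empty dict) serializes to the literal string "{}\n" - pure noise in the trace
assert yaml.dump({}, allow_unicode=True, default_flow_style=False, sort_keys=False) == "{}\n"

# a non-empty columns hint renders as a readable block, which is what the hints metrics keep
columns = {"id": {"data_type": "bigint", "nullable": False}}
print(yaml.dump(columns, allow_unicode=True, default_flow_style=False, sort_keys=False))
# id:
#   data_type: bigint
#   nullable: false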

dlt/extract/storage.py

Lines changed: 4 additions & 0 deletions
@@ -94,6 +94,10 @@ def closed_files(self, load_id: str) -> List[DataWriterMetrics]:
             files.extend(storage.closed_files(load_id))
         return files

+    def remove_closed_files(self, load_id: str) -> None:
+        for storage in self._item_storages.values():
+            storage.remove_closed_files(load_id)
+
     def commit_new_load_package(self, load_id: str, schema: Schema) -> None:
         self.new_packages.save_schema(load_id, schema)
         self.storage.rename_tree(

dlt/pipeline/state_sync.py

Lines changed: 0 additions & 1 deletion
@@ -106,7 +106,6 @@ def load_state_from_destination(pipeline_name: str, client: WithStateSync) -> TP
     if not state:
         return None
     s = decompress_state(state.state)
-    print(f"BEFORE M {s}")
     return migrate_state(pipeline_name, s, s["_state_engine_version"], STATE_ENGINE_VERSION)

dlt/pipeline/trace.py

Lines changed: 7 additions & 2 deletions
@@ -98,7 +98,11 @@ def asdict(self) -> DictStrAny:
         d = self._asdict()
         if self.step_info:
             # name property depending on step name - generates nicer data
-            d[f"{self.step}_info"] = d.pop("step_info")
+            d[f"{self.step}_info"] = step_info_dict = d.pop("step_info").asdict()
+            d["step_info"] = {}
+            # take only the base keys
+            for prop in self.step_info._astuple()._asdict():
+                d["step_info"][prop] = step_info_dict.pop(prop)
         # replace the attributes in exception traces with json dumps
         if self.exception_traces:
             # do not modify original traces
@@ -161,7 +165,8 @@ def last_pipeline_step_trace(self, step_name: TPipelineStep) -> PipelineStepTrac
     def asdict(self) -> DictStrAny:
         """A dictionary representation of PipelineTrace that can be loaded with `dlt`"""
         d = self._asdict()
-        d["steps"] = [step.asdict() for step in self.steps]
+        # run step is the same as load step
+        d["steps"] = [step.asdict() for step in self.steps]  # if step.step != "run"
         return d

     @property
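
A rough sketch of the step-trace shape this change produces, with hypothetical values: the full step-specific dictionary (including the metrics flattened in pipeline.py) moves under an f"{step}_info" key, while step_info keeps only the base keys defined by the new _StepInfo tuple:

# hypothetical extract step info, already converted with asdict()
full_info = {
    "pipeline": {"pipeline_name": "my_pipeline"},
    "loads_ids": ["1700000000.0"],
    "load_packages": [],
    "started_at": "2023-11-14T00:00:00+00:00",
    "first_run": True,
    "job_metrics": [{"job_id": "file_1.jsonl", "load_id": "1700000000.0", "extract_idx": 0}],
}
# the base keys are exactly the fields of the new _StepInfo NamedTuple
base_keys = ["pipeline", "loads_ids", "load_packages", "started_at", "first_run"]

d = {"step": "extract"}
d["extract_info"] = step_info_dict = dict(full_info)
d["step_info"] = {prop: step_info_dict.pop(prop) for prop in base_keys}
# d["step_info"] now holds only the generic keys shared by all steps;
# d["extract_info"] keeps the step-specific metrics tables (job_metrics, table_metrics, ...)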
