Skip to content

Commit 4a240ca

Browse files
committed
adds finished_at to step info
1 parent 44250c7 commit 4a240ca

File tree

8 files changed

+96
-46
lines changed

8 files changed

+96
-46
lines changed

dlt/common/pipeline.py

Lines changed: 70 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from dlt.common.schema.typing import TColumnNames, TColumnSchema, TWriteDisposition, TSchemaContract
4242
from dlt.common.source import get_current_pipe_name
4343
from dlt.common.storages.load_storage import LoadPackageInfo
44+
from dlt.common.time import ensure_pendulum_datetime, precise_time
4445
from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize
4546
from dlt.common.jsonpath import delete_matches, TAnyJsonPath
4647
from dlt.common.data_writers.writers import DataWriterMetrics, TLoaderFileFormat
@@ -53,24 +54,61 @@ class _StepInfo(NamedTuple):
5354
"""ids of the loaded packages"""
5455
load_packages: List[LoadPackageInfo]
5556
"""Information on loaded packages"""
56-
started_at: datetime.datetime
5757
first_run: bool
58+
started_at: datetime.datetime
59+
finished_at: datetime.datetime
60+
61+
62+
class StepMetrics(TypedDict):
63+
"""Metrics for particular package processed in particular pipeline step"""
64+
65+
started_at: datetime.datetime
66+
"""Start of package processing"""
67+
finished_at: datetime.datetime
68+
"""End of package processing"""
69+
5870

71+
TStepMetricsCo = TypeVar("TStepMetricsCo", bound=StepMetrics, covariant=True)
5972

60-
class StepInfo(SupportsHumanize):
73+
74+
class StepInfo(SupportsHumanize, Generic[TStepMetricsCo]):
6175
pipeline: "SupportsPipeline"
76+
metrics: Dict[str, List[TStepMetricsCo]]
77+
"""Metrics per load id. If many sources with the same name were extracted, there will be more than 1 element in the list"""
6278
loads_ids: List[str]
6379
"""ids of the loaded packages"""
6480
load_packages: List[LoadPackageInfo]
6581
"""Information on loaded packages"""
66-
started_at: datetime.datetime
6782
first_run: bool
6883

84+
@property
85+
def started_at(self) -> datetime.datetime:
86+
"""Returns the earliest start date of all collected metrics"""
87+
if not self.metrics:
88+
return None
89+
try:
90+
return min(m["started_at"] for l_m in self.metrics.values() for m in l_m)
91+
except ValueError:
92+
return None
93+
94+
@property
95+
def finished_at(self) -> datetime.datetime:
96+
"""Returns the latest end date of all collected metrics"""
97+
if not self.metrics:
98+
return None
99+
try:
100+
return max(m["finished_at"] for l_m in self.metrics.values() for m in l_m)
101+
except ValueError:
102+
return None
103+
69104
def asdict(self) -> DictStrAny:
70105
# to be mixed with NamedTuple
71106
d: DictStrAny = self._asdict() # type: ignore
72107
d["pipeline"] = {"pipeline_name": self.pipeline.pipeline_name}
73108
d["load_packages"] = [package.asdict() for package in self.load_packages]
109+
if self.metrics:
110+
d["started_at"] = self.started_at
111+
d["finished_at"] = self.finished_at
74112
return d
75113

76114
def __str__(self) -> str:
@@ -91,7 +129,12 @@ def job_metrics_asdict(
91129

92130
def _astuple(self) -> _StepInfo:
93131
return _StepInfo(
94-
self.pipeline, self.loads_ids, self.load_packages, self.started_at, self.first_run
132+
self.pipeline,
133+
self.loads_ids,
134+
self.load_packages,
135+
self.first_run,
136+
self.started_at,
137+
self.finished_at,
95138
)
96139

97140

@@ -100,7 +143,7 @@ class ExtractDataInfo(TypedDict):
100143
data_type: str
101144

102145

103-
class ExtractMetrics(TypedDict):
146+
class ExtractMetrics(StepMetrics):
104147
schema_name: str
105148
job_metrics: Dict[str, DataWriterMetrics]
106149
"""Metrics collected per job id during writing of job file"""
@@ -115,19 +158,19 @@ class ExtractMetrics(TypedDict):
115158

116159

117160
class _ExtractInfo(NamedTuple):
161+
"""NamedTuple cannot be part of the derivation chain so we must re-declare all fields to use it as a mixin later"""
162+
118163
pipeline: "SupportsPipeline"
119164
metrics: Dict[str, List[ExtractMetrics]]
120-
"""Metrics per load id. If many sources with the same name were extracted, there will be more than 1 element in the list"""
121165
extract_data_info: List[ExtractDataInfo]
122166
loads_ids: List[str]
123167
"""ids of the loaded packages"""
124168
load_packages: List[LoadPackageInfo]
125169
"""Information on loaded packages"""
126-
started_at: datetime.datetime
127170
first_run: bool
128171

129172

130-
class ExtractInfo(StepInfo, _ExtractInfo):
173+
class ExtractInfo(StepInfo[ExtractMetrics], _ExtractInfo): # type: ignore[misc]
131174
"""A tuple holding information on extracted data items. Returned by pipeline `extract` method."""
132175

133176
def asdict(self) -> DictStrAny:
@@ -178,7 +221,10 @@ def asstr(self, verbosity: int = 0) -> str:
178221
return ""
179222

180223

181-
class NormalizeMetrics(TypedDict):
224+
# reveal_type(ExtractInfo)
225+
226+
227+
class NormalizeMetrics(StepMetrics):
182228
job_metrics: Dict[str, DataWriterMetrics]
183229
"""Metrics collected per job id during writing of job file"""
184230
table_metrics: Dict[str, DataWriterMetrics]
@@ -192,11 +238,10 @@ class _NormalizeInfo(NamedTuple):
192238
"""ids of the loaded packages"""
193239
load_packages: List[LoadPackageInfo]
194240
"""Information on loaded packages"""
195-
started_at: datetime.datetime
196241
first_run: bool
197242

198243

199-
class NormalizeInfo(StepInfo, _NormalizeInfo):
244+
class NormalizeInfo(StepInfo[NormalizeMetrics], _NormalizeInfo): # type: ignore[misc]
200245
"""A tuple holding information on normalized data items. Returned by pipeline `normalize` method."""
201246

202247
@property
@@ -244,8 +289,13 @@ def asstr(self, verbosity: int = 0) -> str:
244289
return msg
245290

246291

292+
class LoadMetrics(StepMetrics):
293+
pass
294+
295+
247296
class _LoadInfo(NamedTuple):
248297
pipeline: "SupportsPipeline"
298+
metrics: Dict[str, List[LoadMetrics]]
249299
destination_type: str
250300
destination_displayable_credentials: str
251301
destination_name: str
@@ -259,11 +309,10 @@ class _LoadInfo(NamedTuple):
259309
"""ids of the loaded packages"""
260310
load_packages: List[LoadPackageInfo]
261311
"""Information on loaded packages"""
262-
started_at: datetime.datetime
263312
first_run: bool
264313

265314

266-
class LoadInfo(StepInfo, _LoadInfo):
315+
class LoadInfo(StepInfo[LoadMetrics], _LoadInfo): # type: ignore[misc]
267316
"""A tuple holding the information on recently loaded packages. Returned by pipeline `run` and `load` methods"""
268317

269318
def asdict(self) -> DictStrAny:
@@ -329,32 +378,38 @@ def __str__(self) -> str:
329378
return self.asstr(verbosity=1)
330379

331380

332-
TStepMetrics = TypeVar("TStepMetrics")
333-
TStepInfo = TypeVar("TStepInfo", bound=StepInfo)
381+
TStepMetrics = TypeVar("TStepMetrics", bound=StepMetrics, covariant=False)
382+
TStepInfo = TypeVar("TStepInfo", bound=StepInfo[StepMetrics])
334383

335384

336385
class WithStepInfo(ABC, Generic[TStepMetrics, TStepInfo]):
337386
"""Implemented by classes that generate StepInfo with metrics and package infos"""
338387

339388
_current_load_id: str
340389
_load_id_metrics: Dict[str, List[TStepMetrics]]
390+
_current_load_started: float
341391
"""Completed load ids metrics"""
342392

343393
def __init__(self) -> None:
344394
self._load_id_metrics = {}
345395
self._current_load_id = None
396+
self._current_load_started = None
346397

347398
def _step_info_start_load_id(self, load_id: str) -> None:
348399
self._current_load_id = load_id
400+
self._current_load_started = precise_time()
349401
self._load_id_metrics.setdefault(load_id, [])
350402

351403
def _step_info_complete_load_id(self, load_id: str, metrics: TStepMetrics) -> None:
352404
assert self._current_load_id == load_id, (
353405
f"Current load id mismatch {self._current_load_id} != {load_id} when completing step"
354406
" info"
355407
)
408+
metrics["started_at"] = ensure_pendulum_datetime(self._current_load_started)
409+
metrics["finished_at"] = ensure_pendulum_datetime(precise_time())
356410
self._load_id_metrics[load_id].append(metrics)
357411
self._current_load_id = None
412+
self._current_load_started = None
358413

359414
def _step_info_metrics(self, load_id: str) -> List[TStepMetrics]:
360415
return self._load_id_metrics[load_id]
@@ -368,8 +423,6 @@ def current_load_id(self) -> str:
368423
def get_step_info(
369424
self,
370425
pipeline: "SupportsPipeline",
371-
started_at: datetime.datetime = None,
372-
completed_at: datetime.datetime = None,
373426
) -> TStepInfo:
374427
"""Returns an instance of StepInfo with metrics and package infos"""
375428
pass

dlt/extract/extract.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ def _compute_metrics(self, load_id: str, source: DltSource) -> ExtractMetrics:
225225
hints[name] = hint
226226

227227
return {
228+
"started_at": None,
229+
"finished_at": None,
228230
"schema_name": source.schema.name,
229231
"job_metrics": {job.job_id(): metrics for job, metrics in job_metrics.items()},
230232
"table_metrics": table_metrics,
@@ -352,9 +354,7 @@ def commit_packages(self) -> None:
352354
# all load ids got processed, cleanup empty folder
353355
self.extract_storage.delete_empty_extract_folder()
354356

355-
def get_step_info(
356-
self, pipeline: SupportsPipeline, started_at: datetime = None, completed_at: datetime = None
357-
) -> ExtractInfo:
357+
def get_step_info(self, pipeline: SupportsPipeline) -> ExtractInfo:
358358
load_ids = list(self._load_id_metrics.keys())
359359
load_packages: List[LoadPackageInfo] = []
360360
metrics: Dict[str, List[ExtractMetrics]] = {}
@@ -368,6 +368,5 @@ def get_step_info(
368368
describe_extract_data(self.original_data),
369369
load_ids,
370370
load_packages,
371-
started_at,
372371
pipeline.first_run,
373372
)

dlt/load/load.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from dlt.common import sleep, logger
99
from dlt.common.configuration import with_config, known_sections
1010
from dlt.common.configuration.accessors import config
11-
from dlt.common.pipeline import LoadInfo, SupportsPipeline, WithStepInfo
11+
from dlt.common.pipeline import LoadInfo, LoadMetrics, SupportsPipeline, WithStepInfo
1212
from dlt.common.schema.utils import get_child_tables, get_top_level_table
1313
from dlt.common.storages.load_storage import LoadPackageInfo, ParsedLoadJobFileName, TJobState
1414
from dlt.common.runners import TRunMetrics, Runnable, workermethod, NullExecutor
@@ -47,7 +47,7 @@
4747
)
4848

4949

50-
class Load(Runnable[Executor], WithStepInfo[str, LoadInfo]):
50+
class Load(Runnable[Executor], WithStepInfo[LoadMetrics, LoadInfo]):
5151
pool: Executor
5252

5353
@with_config(spec=LoaderConfiguration, sections=(known_sections.LOAD,))
@@ -347,7 +347,7 @@ def complete_package(self, load_id: str, schema: Schema, aborted: bool = False)
347347
self.load_storage.complete_load_package(load_id, aborted)
348348
# TODO: Load must provide a clear interface to get last loads and metrics
349349
# TODO: get more info ie. was package aborted, schema name etc.
350-
self._step_info_complete_load_id(load_id, metrics=None)
350+
self._step_info_complete_load_id(load_id, metrics={"started_at": None, "finished_at": None})
351351
logger.info(
352352
f"All jobs completed, archiving package {load_id} with aborted set to {aborted}"
353353
)
@@ -563,12 +563,11 @@ def run(self, pool: Optional[Executor]) -> TRunMetrics:
563563
def get_step_info(
564564
self,
565565
pipeline: SupportsPipeline,
566-
started_at: datetime.datetime = None,
567-
completed_at: datetime.datetime = None,
568566
) -> LoadInfo:
569567
# TODO: LoadInfo should hold many datasets
570568
load_ids = list(self._load_id_metrics.keys())
571569
load_packages: List[LoadPackageInfo] = []
570+
metrics: Dict[str, List[LoadMetrics]] = {}
572571
# get load packages and dataset_name from the last package
573572
_dataset_name: str = None
574573
for load_id in self._load_id_metrics.keys():
@@ -579,9 +578,11 @@ def get_step_info(
579578
load_package.schema
580579
)
581580
load_packages.append(load_package)
581+
metrics[load_id] = self._step_info_metrics(load_id)
582582

583583
return LoadInfo(
584584
pipeline,
585+
metrics,
585586
Destination.normalize_type(self.initial_client_config.destination_type),
586587
str(self.initial_client_config),
587588
self.initial_client_config.destination_name,
@@ -601,6 +602,5 @@ def get_step_info(
601602
_dataset_name,
602603
list(load_ids),
603604
load_packages,
604-
started_at,
605605
pipeline.first_run,
606606
)

dlt/normalize/normalize.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,8 @@ def spool_files(
316316
self._step_info_complete_load_id(
317317
load_id,
318318
{
319+
"started_at": None,
320+
"finished_at": None,
319321
"job_metrics": {job.job_id(): metrics for job, metrics in job_metrics.items()},
320322
"table_metrics": {
321323
table_name: sum(map(lambda pair: pair[1], metrics), EMPTY_DATA_WRITER_METRICS)
@@ -388,8 +390,6 @@ def get_load_package_info(self, load_id: str) -> LoadPackageInfo:
388390
def get_step_info(
389391
self,
390392
pipeline: SupportsPipeline,
391-
started_at: datetime.datetime = None,
392-
completed_at: datetime.datetime = None,
393393
) -> NormalizeInfo:
394394
load_ids = list(self._load_id_metrics.keys())
395395
load_packages: List[LoadPackageInfo] = []
@@ -398,6 +398,4 @@ def get_step_info(
398398
load_package = self.get_load_package_info(load_id)
399399
load_packages.append(load_package)
400400
metrics[load_id] = self._step_info_metrics(load_id)
401-
return NormalizeInfo(
402-
pipeline, metrics, load_ids, load_packages, started_at, pipeline.first_run
403-
)
401+
return NormalizeInfo(pipeline, metrics, load_ids, load_packages, pipeline.first_run)

dlt/pipeline/exceptions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import Any, Dict
22
from dlt.common.exceptions import PipelineException
3-
from dlt.common.pipeline import StepInfo, SupportsPipeline
3+
from dlt.common.pipeline import StepInfo, StepMetrics, SupportsPipeline
44
from dlt.pipeline.typing import TPipelineStep
55

66

@@ -56,7 +56,7 @@ def __init__(
5656
step: TPipelineStep,
5757
load_id: str,
5858
exception: BaseException,
59-
step_info: StepInfo = None,
59+
step_info: StepInfo[StepMetrics] = None,
6060
) -> None:
6161
self.pipeline = pipeline
6262
self.step = step

0 commit comments

Comments
 (0)