
Commit 011d7ff

feat/1681 collects load job metrics and adds remote uri (#1708)

* collects basic load job metrics in LoadJob
* adds remote uri to filesystem copy jobs metrics
* adds job id to load package info
* adds table name to job metrics
* skips run step when serializing trace
* adds trace shape test with trace schema
* tests job file name too long
* docs running pipelines with the same name for different envs
* extracts step metrics in common, renames followup jobs
* fixes tests
* fixes tests
* tests delta filesystem for remote_uri
* adds exec_info to trace contract test
* tests remote_uri for filesystem copy
* fixes platform test
1 parent 7d7c14f commit 011d7ff

Note: large commits have some content hidden by default; only a subset of the 50 changed file diffs is reproduced below.

50 files changed: +1552 −247 lines

dlt/common/data_writers/__init__.py

Lines changed: 0 additions & 2 deletions
@@ -1,6 +1,5 @@
 from dlt.common.data_writers.writers import (
     DataWriter,
-    DataWriterMetrics,
     TDataItemFormat,
     FileWriterSpec,
     create_import_spec,
@@ -22,7 +21,6 @@
     "resolve_best_writer_spec",
     "get_best_writer_spec",
     "is_native_writer",
-    "DataWriterMetrics",
     "TDataItemFormat",
     "BufferedDataWriter",
     "new_file_id",

dlt/common/data_writers/buffered.py

Lines changed: 2 additions & 1 deletion
@@ -3,14 +3,15 @@
 import contextlib
 from typing import ClassVar, Iterator, List, IO, Any, Optional, Type, Generic

+from dlt.common.metrics import DataWriterMetrics
 from dlt.common.typing import TDataItem, TDataItems
 from dlt.common.data_writers.exceptions import (
     BufferedDataWriterClosed,
     DestinationCapabilitiesRequired,
     FileImportNotFound,
     InvalidFileNameTemplateException,
 )
-from dlt.common.data_writers.writers import TWriter, DataWriter, DataWriterMetrics, FileWriterSpec
+from dlt.common.data_writers.writers import TWriter, DataWriter, FileWriterSpec
 from dlt.common.schema.typing import TTableSchemaColumns
 from dlt.common.configuration import with_config, known_sections, configspec
 from dlt.common.configuration.specs import BaseConfiguration

dlt/common/data_writers/writers.py

Lines changed: 1 addition & 19 deletions
@@ -34,6 +34,7 @@
     TLoaderFileFormat,
     ALL_SUPPORTED_FILE_FORMATS,
 )
+from dlt.common.metrics import DataWriterMetrics
 from dlt.common.schema.typing import TTableSchemaColumns
 from dlt.common.typing import StrAny

@@ -59,25 +60,6 @@ class FileWriterSpec(NamedTuple):
     supports_compression: bool = False


-class DataWriterMetrics(NamedTuple):
-    file_path: str
-    items_count: int
-    file_size: int
-    created: float
-    last_modified: float
-
-    def __add__(self, other: Tuple[object, ...], /) -> Tuple[object, ...]:
-        if isinstance(other, DataWriterMetrics):
-            return DataWriterMetrics(
-                "",  # path is not known
-                self.items_count + other.items_count,
-                self.file_size + other.file_size,
-                min(self.created, other.created),
-                max(self.last_modified, other.last_modified),
-            )
-        return NotImplemented
-
-
 EMPTY_DATA_WRITER_METRICS = DataWriterMetrics("", 0, 0, 2**32, 0.0)

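The EMPTY_DATA_WRITER_METRICS sentinel that stays behind in writers.py gives metric addition a neutral starting element, so per-file metrics can be folded into per-table or per-resource totals. A small sketch of that aggregation, with invented file names and numbers:

from functools import reduce

from dlt.common.data_writers.writers import EMPTY_DATA_WRITER_METRICS
from dlt.common.metrics import DataWriterMetrics

# two hypothetical closed job files (all values made up)
m1 = DataWriterMetrics("items.1.jsonl", items_count=100, file_size=2048, created=1000.0, last_modified=1010.0)
m2 = DataWriterMetrics("items.2.jsonl", items_count=50, file_size=1024, created=1011.0, last_modified=1020.0)

# __add__ keeps file_path only when both sides agree, sums counts and sizes,
# and widens the created/last_modified span; the sentinel (created=2**32,
# last_modified=0.0) is neutral for the min/max, so it is a safe fold start
total = reduce(lambda a, b: a + b, [m1, m2], EMPTY_DATA_WRITER_METRICS)
assert total.items_count == 150 and total.file_path == ""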
dlt/common/destination/reference.py

Lines changed: 22 additions & 5 deletions
@@ -24,10 +24,11 @@
 from copy import deepcopy
 import inspect

-from dlt.common import logger
+from dlt.common import logger, pendulum
 from dlt.common.configuration.specs.base_configuration import extract_inner_hint
 from dlt.common.destination.utils import verify_schema_capabilities
 from dlt.common.exceptions import TerminalValueError
+from dlt.common.metrics import LoadJobMetrics
 from dlt.common.normalizers.naming import NamingConvention
 from dlt.common.schema import Schema, TTableSchema, TSchemaTables
 from dlt.common.schema.utils import (
@@ -284,6 +285,8 @@ def __init__(self, file_path: str) -> None:
         # NOTE: we only accept a full filepath in the constructor
         assert self._file_name != self._file_path
         self._parsed_file_name = ParsedLoadJobFileName.parse(self._file_name)
+        self._started_at: pendulum.DateTime = None
+        self._finished_at: pendulum.DateTime = None

     def job_id(self) -> str:
         """The job id that is derived from the file name and does not changes during job lifecycle"""
@@ -306,6 +309,18 @@ def exception(self) -> str:
         """The exception associated with failed or retry states"""
         pass

+    def metrics(self) -> Optional[LoadJobMetrics]:
+        """Returns job execution metrics"""
+        return LoadJobMetrics(
+            self._parsed_file_name.job_id(),
+            self._file_path,
+            self._parsed_file_name.table_name,
+            self._started_at,
+            self._finished_at,
+            self.state(),
+            None,
+        )
+

 class RunnableLoadJob(LoadJob, ABC):
     """Represents a runnable job that loads a single file
@@ -361,6 +376,7 @@ def run_managed(
         # filepath is now moved to running
         try:
             self._state = "running"
+            self._started_at = pendulum.now()
             self._job_client.prepare_load_job_execution(self)
             self.run()
             self._state = "completed"
@@ -371,6 +387,7 @@
             self._state = "retry"
             self._exception = e
         finally:
+            self._finished_at = pendulum.now()
            # sanity check
            assert self._state in ("completed", "retry", "failed")

@@ -391,7 +408,7 @@ def exception(self) -> str:
        return str(self._exception)


-class FollowupJob:
+class FollowupJobRequest:
    """Base class for follow up jobs that should be created"""

    @abstractmethod
@@ -403,8 +420,8 @@ def new_file_path(self) -> str:
 class HasFollowupJobs:
    """Adds a trait that allows to create single or table chain followup jobs"""

-    def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJob]:
-        """Return list of new jobs. `final_state` is state to which this job transits"""
+    def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJobRequest]:
+        """Return list of jobs requests for jobs that should be created. `final_state` is state to which this job transits"""
        return []

@@ -479,7 +496,7 @@ def create_table_chain_completed_followup_jobs(
        self,
        table_chain: Sequence[TTableSchema],
        completed_table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None,
-    ) -> List[FollowupJob]:
+    ) -> List[FollowupJobRequest]:
        """Creates a list of followup jobs that should be executed after a table chain is completed"""
        return []

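The base metrics() above leaves remote_uri as None; per the commit message, filesystem copy jobs are the ones that fill it in. A minimal sketch (not the actual dlt implementation; the class and uri are invented) of how a runnable job could attach the remote location, relying on LoadJobMetrics being a NamedTuple:

from typing import Optional

from dlt.common.destination.reference import RunnableLoadJob
from dlt.common.metrics import LoadJobMetrics


class RemoteCopyJobSketch(RunnableLoadJob):  # hypothetical destination job
    def run(self) -> None:
        # ... copy the local job file to the destination bucket ...
        self._remote_uri = "s3://bucket/dataset/items.parquet"  # made-up uri

    def metrics(self) -> Optional[LoadJobMetrics]:
        m = super().metrics()
        # _replace returns an updated copy of the immutable NamedTuple
        return m._replace(remote_uri=self._remote_uri) if m else None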
dlt/common/metrics.py

Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
+import datetime  # noqa: I251
+from typing import Any, Dict, List, NamedTuple, Optional, Tuple, TypedDict  # noqa: 251
+
+
+class DataWriterMetrics(NamedTuple):
+    file_path: str
+    items_count: int
+    file_size: int
+    created: float
+    last_modified: float
+
+    def __add__(self, other: Tuple[object, ...], /) -> Tuple[object, ...]:
+        if isinstance(other, DataWriterMetrics):
+            return DataWriterMetrics(
+                self.file_path if self.file_path == other.file_path else "",
+                # self.table_name if self.table_name == other.table_name else "",
+                self.items_count + other.items_count,
+                self.file_size + other.file_size,
+                min(self.created, other.created),
+                max(self.last_modified, other.last_modified),
+            )
+        return NotImplemented
+
+
+class StepMetrics(TypedDict):
+    """Metrics for particular package processed in particular pipeline step"""
+
+    started_at: datetime.datetime
+    """Start of package processing"""
+    finished_at: datetime.datetime
+    """End of package processing"""
+
+
+class ExtractDataInfo(TypedDict):
+    name: str
+    data_type: str
+
+
+class ExtractMetrics(StepMetrics):
+    schema_name: str
+    job_metrics: Dict[str, DataWriterMetrics]
+    """Metrics collected per job id during writing of job file"""
+    table_metrics: Dict[str, DataWriterMetrics]
+    """Job metrics aggregated by table"""
+    resource_metrics: Dict[str, DataWriterMetrics]
+    """Job metrics aggregated by resource"""
+    dag: List[Tuple[str, str]]
+    """A resource dag where elements of the list are graph edges"""
+    hints: Dict[str, Dict[str, Any]]
+    """Hints passed to the resources"""
+
+
+class NormalizeMetrics(StepMetrics):
+    job_metrics: Dict[str, DataWriterMetrics]
+    """Metrics collected per job id during writing of job file"""
+    table_metrics: Dict[str, DataWriterMetrics]
+    """Job metrics aggregated by table"""
+
+
+class LoadJobMetrics(NamedTuple):
+    job_id: str
+    file_path: str
+    table_name: str
+    started_at: datetime.datetime
+    finished_at: datetime.datetime
+    state: Optional[str]
+    remote_uri: Optional[str]
+
+
+class LoadMetrics(StepMetrics):
+    job_metrics: Dict[str, LoadJobMetrics]
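A minimal sketch (all values invented) of how the load step shapes above fit together: one LoadJobMetrics entry keyed by job id inside a LoadMetrics dict:

import datetime

from dlt.common.metrics import LoadJobMetrics, LoadMetrics

job = LoadJobMetrics(
    job_id="items.1234.parquet",  # hypothetical job id
    file_path="/local/load/package/items.1234.parquet",
    table_name="items",
    started_at=datetime.datetime(2024, 8, 1, 12, 0, 0),
    finished_at=datetime.datetime(2024, 8, 1, 12, 0, 3),
    state="completed",
    remote_uri="s3://bucket/items/items.1234.parquet",  # set by filesystem copy jobs
)

load_metrics: LoadMetrics = {
    "started_at": job.started_at,
    "finished_at": job.finished_at,
    "job_metrics": {job.job_id: job},
}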
