|
1 | | -import contextlib |
2 | 1 | import datetime # noqa: I251 |
3 | 2 | import re |
4 | 3 | import sys |
5 | | -from typing import Any, Optional, Union, overload, TypeVar, Callable # noqa |
| 4 | +from typing import Optional, Union, overload, TypeVar, Callable # noqa |
6 | 5 |
|
7 | | -from pendulum.parsing import ( |
8 | | - parse_iso8601, |
9 | | - DEFAULT_OPTIONS as pendulum_options, |
10 | | - _parse_common as parse_datetime_common, |
11 | | -) |
12 | 6 | from pendulum.tz import UTC |
| 7 | +from pendulum.exceptions import ParserError |
13 | 8 |
|
14 | 9 | from dlt.common.pendulum import pendulum, timedelta |
15 | 10 | from dlt.common.typing import TimedeltaSeconds, TAnyDateTime |
@@ -49,27 +44,37 @@ def timestamp_before(timestamp: float, max_inclusive: Optional[float]) -> bool: |
49 | 44 | return timestamp <= (max_inclusive or FUTURE_TIMESTAMP) |
50 | 45 |
|
51 | 46 |
|
52 | | -def parse_iso_like_datetime(value: Any) -> Union[pendulum.DateTime, pendulum.Date, pendulum.Time]: |
| 47 | +def parse_iso_like_datetime( |
| 48 | + value: TAnyDateTime, |
| 49 | +) -> Union[pendulum.DateTime, pendulum.Date, pendulum.Time]: |
53 | 50 | """Parses ISO8601 string into pendulum datetime, date or time. Preserves timezone info. |
54 | 51 | Note: naive datetimes will be generated from string without timezone |
55 | 52 |
|
56 | 53 | we use internal pendulum parse function. the generic function, for example, parses string "now" as now() |
57 | 54 | it also tries to parse ISO intervals but the code is very low quality |
58 | 55 | """ |
59 | | - # only iso dates are allowed |
60 | | - dtv = None |
61 | | - with contextlib.suppress(ValueError): |
62 | | - dtv = parse_iso8601(value) |
63 | | - # now try to parse a set of ISO like dates |
64 | | - if not dtv: |
65 | | - dtv = parse_datetime_common(value, **pendulum_options) |
66 | | - if isinstance(dtv, datetime.time): |
67 | | - return pendulum.time(dtv.hour, dtv.minute, dtv.second, dtv.microsecond) |
68 | | - if isinstance(dtv, datetime.datetime): |
69 | | - return pendulum.instance(dtv, tz=dtv.tzinfo) |
70 | | - if isinstance(dtv, pendulum.Duration): |
71 | | - raise ValueError(f"Interval ISO 8601 not supported: `{value}`") |
72 | | - return pendulum.date(dtv.year, dtv.month, dtv.day) # type: ignore[union-attr] |
| 56 | + try: |
| 57 | + if isinstance(value, str): |
| 58 | + # try to parse ISO or RFC formats |
| 59 | + parsed = pendulum.parse(value, exact=True, strict=False) |
| 60 | + if isinstance(parsed, pendulum.Duration): |
| 61 | + raise ValueError(f"Interval ISO 8601 not supported: `{value}`") |
| 62 | + elif isinstance(value, (float, int)): |
| 63 | + parsed = pendulum.from_timestamp(value) |
| 64 | + elif isinstance(value, datetime.time): |
| 65 | + parsed = pendulum.time(value.hour, value.minute, value.second, value.microsecond) |
| 66 | + elif isinstance(value, datetime.datetime): |
| 67 | + parsed = pendulum.instance(value, tz=value.tzinfo) |
| 68 | + elif isinstance(value, datetime.date): |
| 69 | + parsed = pendulum.date(value.year, value.month, value.day) |
| 70 | + else: |
| 71 | + raise ValueError(f"Cannot coerce {type(value)}:{value} to pendulum compatible type") |
| 72 | + except ValueError as e: |
| 73 | + raise e |
| 74 | + except ParserError: |
| 75 | + raise ValueError(f"Failed to parse the string: `{value}`") |
| 76 | + |
| 77 | + return parsed |
73 | 78 |
|
74 | 79 |
|
75 | 80 | def ensure_pendulum_date(value: TAnyDateTime) -> pendulum.Date: |
|
0 commit comments