Skip to content

Commit 00c2725

Browse files
authored
allows to run parallel pipelines in separate threads (#813)
* keeps metrics of closed data writer files
* fixes handling of multiple load storages in normalize
* separates immutable job id from actual job file name
* allows to import files into data item storage
* import parquet files into data items storage in normalize
* adds buffered writer tests
* bumps to alpha 0.4.1a1
* makes all injection contexts thread affine, except config providers
* tests running parallel pipelines in thread pool
* allows to set start method for process executor
* adds thread id to dlt log
* improves parallel run test
* fixes None in toml config writer
* adds parallel asyncio test
* updates performance docs
1 parent dfc9c05 commit 00c2725

37 files changed

+860
-215
lines changed
Lines changed: 96 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from contextlib import contextmanager
2-
from typing import Dict, Iterator, Type, TypeVar
2+
import re
3+
import threading
4+
from typing import ClassVar, Dict, Iterator, Tuple, Type, TypeVar
35

46
from dlt.common.configuration.specs.base_configuration import ContainerInjectableContext
57
from dlt.common.configuration.exceptions import (
@@ -16,20 +18,33 @@ class Container:
1618
Injection context is identified by its type and available via dict indexer. The common pattern is to instantiate default context value
1719
if it is not yet present in container.
1820
21+
By default, the context is thread-affine so it is visible only from the thread that originally set it. This behavior may be changed
22+
for a particular context type (spec).
23+
1924
The indexer is settable and allows the value to be set explicitly. This is required for contexts that need to be explicitly instantiated.
2025
2126
The `injectable_context` allows to set a context with a `with` keyword and then restore the previous one after it gets out of scope.
2227
2328
"""
2429

25-
_INSTANCE: "Container" = None
30+
_INSTANCE: ClassVar["Container"] = None
31+
_LOCK: ClassVar[threading.Lock] = threading.Lock()
32+
_MAIN_THREAD_ID: ClassVar[int] = threading.get_ident()
33+
"""A main thread id to which get item will fallback for contexts without default"""
2634

27-
contexts: Dict[Type[ContainerInjectableContext], ContainerInjectableContext]
35+
thread_contexts: Dict[int, Dict[Type[ContainerInjectableContext], ContainerInjectableContext]]
36+
"""A thread aware mapping of injection context """
37+
main_context: Dict[Type[ContainerInjectableContext], ContainerInjectableContext]
38+
"""Injection context for the main thread"""
2839

2940
def __new__(cls: Type["Container"]) -> "Container":
3041
if not cls._INSTANCE:
3142
cls._INSTANCE = super().__new__(cls)
32-
cls._INSTANCE.contexts = {}
43+
cls._INSTANCE.thread_contexts = {}
44+
cls._INSTANCE.main_context = cls._INSTANCE.thread_contexts[
45+
Container._MAIN_THREAD_ID
46+
] = {}
47+
3348
return cls._INSTANCE
3449

3550
def __init__(self) -> None:
@@ -40,48 +55,112 @@ def __getitem__(self, spec: Type[TConfiguration]) -> TConfiguration:
4055
if not issubclass(spec, ContainerInjectableContext):
4156
raise KeyError(f"{spec.__name__} is not a context")
4257

43-
item = self.contexts.get(spec)
58+
context, item = self._thread_getitem(spec)
4459
if item is None:
4560
if spec.can_create_default:
4661
item = spec()
47-
self.contexts[spec] = item
62+
self._thread_setitem(context, spec, item)
4863
item.add_extras()
4964
else:
5065
raise ContextDefaultCannotBeCreated(spec)
5166

52-
return item # type: ignore
67+
return item # type: ignore[return-value]
5368

5469
def __setitem__(self, spec: Type[TConfiguration], value: TConfiguration) -> None:
5570
# value passed to container must be final
5671
value.resolve()
5772
# put it into context
58-
self.contexts[spec] = value
73+
self._thread_setitem(self._thread_context(spec), spec, value)
5974

6075
def __delitem__(self, spec: Type[TConfiguration]) -> None:
61-
del self.contexts[spec]
76+
context = self._thread_context(spec)
77+
self._thread_delitem(context, spec)
6278

6379
def __contains__(self, spec: Type[TConfiguration]) -> bool:
64-
return spec in self.contexts
80+
context = self._thread_context(spec)
81+
return spec in context
82+
83+
def _thread_context(
84+
self, spec: Type[TConfiguration]
85+
) -> Dict[Type[ContainerInjectableContext], ContainerInjectableContext]:
86+
if spec.global_affinity:
87+
context = self.main_context
88+
else:
89+
# thread pool names used in dlt contain originating thread id. use this id over pool id
90+
if m := re.match(r"dlt-pool-(\d+)-", threading.currentThread().getName()):
91+
thread_id = int(m.group(1))
92+
else:
93+
thread_id = threading.get_ident()
94+
# return main context for main thread
95+
if thread_id == Container._MAIN_THREAD_ID:
96+
return self.main_context
97+
# we may add a new empty thread context so lock here
98+
with Container._LOCK:
99+
context = self.thread_contexts.get(thread_id)
100+
if context is None:
101+
context = self.thread_contexts[thread_id] = {}
102+
return context
103+
104+
def _thread_getitem(
105+
self, spec: Type[TConfiguration]
106+
) -> Tuple[
107+
Dict[Type[ContainerInjectableContext], ContainerInjectableContext],
108+
ContainerInjectableContext,
109+
]:
110+
# with Container._LOCK:
111+
context = self._thread_context(spec)
112+
item = context.get(spec)
113+
# if item is None and not spec.thread_affinity and context is not self.main_context:
114+
# item = self.main_context.get(spec)
115+
return context, item
116+
117+
def _thread_setitem(
118+
self,
119+
context: Dict[Type[ContainerInjectableContext], ContainerInjectableContext],
120+
spec: Type[ContainerInjectableContext],
121+
value: TConfiguration,
122+
) -> None:
123+
# with Container._LOCK:
124+
context[spec] = value
125+
# set the global context if spec is not thread affine
126+
# if not spec.thread_affinity and context is not self.main_context:
127+
# self.main_context[spec] = value
128+
129+
def _thread_delitem(
130+
self,
131+
context: Dict[Type[ContainerInjectableContext], ContainerInjectableContext],
132+
spec: Type[ContainerInjectableContext],
133+
) -> None:
134+
del context[spec]
65135

66136
@contextmanager
67137
def injectable_context(self, config: TConfiguration) -> Iterator[TConfiguration]:
68138
"""A context manager that will insert `config` into the container and restore the previous value when it gets out of scope."""
139+
config.resolve()
69140
spec = type(config)
70141
previous_config: ContainerInjectableContext = None
71-
if spec in self.contexts:
72-
previous_config = self.contexts[spec]
142+
context, previous_config = self._thread_getitem(spec)
143+
73144
# set new config and yield context
145+
self._thread_setitem(context, spec, config)
74146
try:
75-
self[spec] = config
76147
yield config
77148
finally:
78149
# before setting the previous config for given spec, check if there was no overlapping modification
79-
if self.contexts[spec] is config:
150+
context, current_config = self._thread_getitem(spec)
151+
if current_config is config:
80152
# config is injected for spec so restore previous
81153
if previous_config is None:
82-
del self.contexts[spec]
154+
self._thread_delitem(context, spec)
83155
else:
84-
self.contexts[spec] = previous_config
156+
self._thread_setitem(context, spec, previous_config)
85157
else:
86158
# value was modified in the meantime and not restored
87-
raise ContainerInjectableContextMangled(spec, self.contexts[spec], config)
159+
raise ContainerInjectableContextMangled(spec, context[spec], config)
160+
161+
@staticmethod
162+
def thread_pool_prefix() -> str:
163+
"""Creates a container friendly pool prefix that contains starting thread id. Container implementation will automatically use it
164+
for any thread-affine contexts instead of using id of the pool thread
165+
"""
166+
return f"dlt-pool-{threading.get_ident()}-"

dlt/common/configuration/inject.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import inspect
2-
import threading
32
from functools import wraps
43
from typing import Callable, Dict, Type, Any, Optional, Tuple, TypeVar, overload
54
from inspect import Signature, Parameter
@@ -15,7 +14,6 @@
1514
_ORIGINAL_ARGS = "_dlt_orig_args"
1615
# keep a registry of all the decorated functions
1716
_FUNC_SPECS: Dict[int, Type[BaseConfiguration]] = {}
18-
_RESOLVE_LOCK = threading.Lock()
1917

2018
TConfiguration = TypeVar("TConfiguration", bound=BaseConfiguration)
2119

@@ -146,15 +144,14 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
146144
sections=curr_sections,
147145
merge_style=sections_merge_style,
148146
)
149-
# this may be called from many threads so make sure context is not mangled
150-
with _RESOLVE_LOCK:
151-
with inject_section(section_context):
152-
# print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}")
153-
config = resolve_configuration(
154-
config or SPEC(),
155-
explicit_value=bound_args.arguments,
156-
accept_partial=accept_partial,
157-
)
147+
# this may be called from many threads so section_context is thread affine
148+
with inject_section(section_context):
149+
# print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}")
150+
config = resolve_configuration(
151+
config or SPEC(),
152+
explicit_value=bound_args.arguments,
153+
accept_partial=accept_partial,
154+
)
158155
resolved_params = dict(config)
159156
# overwrite or add resolved params
160157
for p in sig.parameters.values():

dlt/common/configuration/providers/toml.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,14 @@ def set_value(self, key: str, value: Any, pipeline_name: str, *sections: str) ->
7272
if k not in master:
7373
master[k] = tomlkit.table()
7474
master = master[k] # type: ignore
75-
if isinstance(value, dict) and isinstance(master.get(key), dict):
76-
update_dict_nested(master[key], value) # type: ignore
77-
else:
78-
master[key] = value
75+
if isinstance(value, dict):
76+
# remove none values, TODO: we need recursive None removal
77+
value = {k: v for k, v in value.items() if v is not None}
78+
# if target is also dict then merge recursively
79+
if isinstance(master.get(key), dict):
80+
update_dict_nested(master[key], value) # type: ignore
81+
return
82+
master[key] = value
7983

8084
@property
8185
def supports_sections(self) -> bool:

dlt/common/configuration/specs/base_configuration.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,8 @@ class ContainerInjectableContext(BaseConfiguration):
395395

396396
can_create_default: ClassVar[bool] = True
397397
"""If True, `Container` is allowed to create default context instance, if none exists"""
398+
global_affinity: ClassVar[bool] = False
399+
"""If True, `Container` will create context that will be visible in any thread. If False, per thread context is created"""
398400

399401
def add_extras(self) -> None:
400402
"""Called right after context was added to the container. Benefits mostly the config provider injection context which adds extra providers using the initial ones."""

dlt/common/configuration/specs/config_providers_context.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import contextlib
22
import io
3-
from typing import List
3+
from typing import ClassVar, List
4+
45
from dlt.common.configuration.exceptions import DuplicateConfigProviderException
56
from dlt.common.configuration.providers import (
67
ConfigProvider,
@@ -34,6 +35,8 @@ class ConfigProvidersConfiguration(BaseConfiguration):
3435
class ConfigProvidersContext(ContainerInjectableContext):
3536
"""Injectable list of providers used by the configuration `resolve` module"""
3637

38+
global_affinity: ClassVar[bool] = True
39+
3740
providers: List[ConfigProvider]
3841
context_provider: ConfigProvider
3942

dlt/common/configuration/specs/run_configuration.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@ class RunConfiguration(BaseConfiguration):
1616
slack_incoming_hook: Optional[TSecretStrValue] = None
1717
dlthub_telemetry: bool = True # enable or disable dlthub telemetry
1818
dlthub_telemetry_segment_write_key: str = "a1F2gc6cNYw2plyAt02sZouZcsRjG7TD"
19-
log_format: str = (
20-
"{asctime}|[{levelname:<21}]|{process}|{name}|{filename}|{funcName}:{lineno}|{message}"
21-
)
19+
log_format: str = "{asctime}|[{levelname:<21}]|{process}|{thread}|{name}|{filename}|{funcName}:{lineno}|{message}"
2220
log_level: str = "WARNING"
2321
request_timeout: float = 60
2422
"""Timeout for http requests"""

dlt/common/data_writers/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
from dlt.common.data_writers.writers import DataWriter, TLoaderFileFormat
2-
from dlt.common.data_writers.buffered import BufferedDataWriter
1+
from dlt.common.data_writers.writers import DataWriter, DataWriterMetrics, TLoaderFileFormat
2+
from dlt.common.data_writers.buffered import BufferedDataWriter, new_file_id
33
from dlt.common.data_writers.escape import (
44
escape_redshift_literal,
55
escape_redshift_identifier,
@@ -8,8 +8,10 @@
88

99
__all__ = [
1010
"DataWriter",
11+
"DataWriterMetrics",
1112
"TLoaderFileFormat",
1213
"BufferedDataWriter",
14+
"new_file_id",
1315
"escape_redshift_literal",
1416
"escape_redshift_identifier",
1517
"escape_bigquery_identifier",

dlt/common/data_writers/buffered.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,28 @@
11
import gzip
22
from typing import List, IO, Any, Optional, Type, TypeVar, Generic
33

4-
from dlt.common.utils import uniq_id
54
from dlt.common.typing import TDataItem, TDataItems
65
from dlt.common.data_writers import TLoaderFileFormat
76
from dlt.common.data_writers.exceptions import (
87
BufferedDataWriterClosed,
98
DestinationCapabilitiesRequired,
109
InvalidFileNameTemplateException,
1110
)
12-
from dlt.common.data_writers.writers import DataWriter
11+
from dlt.common.data_writers.writers import DataWriter, DataWriterMetrics
1312
from dlt.common.schema.typing import TTableSchemaColumns
1413
from dlt.common.configuration import with_config, known_sections, configspec
1514
from dlt.common.configuration.specs import BaseConfiguration
1615
from dlt.common.destination import DestinationCapabilitiesContext
17-
16+
from dlt.common.utils import uniq_id
1817

1918
TWriter = TypeVar("TWriter", bound=DataWriter)
2019

2120

21+
def new_file_id() -> str:
22+
"""Creates new file id which is globally unique within table_name scope"""
23+
return uniq_id(5)
24+
25+
2226
class BufferedDataWriter(Generic[TWriter]):
2327
@configspec
2428
class BufferedDataWriterConfiguration(BaseConfiguration):
@@ -49,7 +53,7 @@ def __init__(
4953
self._caps = _caps
5054
# validate if template has correct placeholders
5155
self.file_name_template = file_name_template
52-
self.closed_files: List[str] = [] # all fully processed files
56+
self.closed_files: List[DataWriterMetrics] = [] # all fully processed files
5357
# buffered items must be less than max items in file
5458
self.buffer_max_items = min(buffer_max_items, file_max_items or buffer_max_items)
5559
self.file_max_bytes = file_max_bytes
@@ -121,10 +125,20 @@ def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> int
121125
return new_rows_count
122126

123127
def write_empty_file(self, columns: TTableSchemaColumns) -> None:
128+
"""Writes empty file: only header and footer without actual items"""
124129
if columns is not None:
125130
self._current_columns = dict(columns)
126131
self._flush_items(allow_empty_file=True)
127132

133+
def import_file(self, file_path: str, metrics: DataWriterMetrics) -> None:
134+
# TODO: we should separate file storage from other storages. this creates circular deps
135+
from dlt.common.storages import FileStorage
136+
137+
self._rotate_file()
138+
FileStorage.link_hard_with_fallback(file_path, self._file_name)
139+
self.closed_files.append(metrics._replace(file_path=self._file_name))
140+
self._file_name = None
141+
128142
def close(self) -> None:
129143
self._ensure_open()
130144
self._flush_and_close_file()
@@ -143,7 +157,7 @@ def __exit__(self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb
143157
def _rotate_file(self) -> None:
144158
self._flush_and_close_file()
145159
self._file_name = (
146-
self.file_name_template % uniq_id(5) + "." + self._file_format_spec.file_extension
160+
self.file_name_template % new_file_id() + "." + self._file_format_spec.file_extension
147161
)
148162

149163
def _flush_items(self, allow_empty_file: bool = False) -> None:
@@ -171,9 +185,12 @@ def _flush_and_close_file(self) -> None:
171185
if self._writer:
172186
# write the footer of a file
173187
self._writer.write_footer()
174-
self._file.close()
188+
self._file.flush()
175189
# add file written to the list so we can commit all the files later
176-
self.closed_files.append(self._file_name)
190+
self.closed_files.append(
191+
DataWriterMetrics(self._file_name, self._writer.items_count, self._file.tell())
192+
)
193+
self._file.close()
177194
self._writer = None
178195
self._file = None
179196
self._file_name = None

dlt/common/data_writers/writers.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import abc
22
from dataclasses import dataclass
3-
from typing import IO, TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Type, Union
3+
from typing import IO, TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Type, NamedTuple
44

55
from dlt.common import json
66
from dlt.common.configuration import configspec, known_sections, with_config
@@ -23,6 +23,12 @@ class TFileFormatSpec:
2323
supports_compression: bool = False
2424

2525

26+
class DataWriterMetrics(NamedTuple):
27+
file_path: str
28+
items_count: int
29+
file_size: int
30+
31+
2632
class DataWriter(abc.ABC):
2733
def __init__(self, f: IO[Any], caps: DestinationCapabilitiesContext = None) -> None:
2834
self._f = f

0 commit comments

Comments
 (0)