Commit 1ca6664

gz extension now visible in staged files
1 parent 3a23b66 commit 1ca6664

5 files changed: 38 additions & 13 deletions


dlt/common/data_writers/buffered.py

Lines changed: 7 additions & 3 deletions
@@ -228,9 +228,13 @@ def _buffer_items_with_row_count(self, item: TDataItems) -> int:
 
     def _rotate_file(self, allow_empty_file: bool = False) -> DataWriterMetrics:
         metrics = self._flush_and_close_file(allow_empty_file)
-        self._file_name = (
-            self.file_name_template % new_file_id() + "." + self.writer_spec.file_extension
-        )
+        base_filename = self.file_name_template % new_file_id()
+        file_extension = self.writer_spec.file_extension
+        # Add .gz if compression is enabled
+        if self.writer_spec.supports_compression and self.open == gzip.open:
+            self._file_name = f"{base_filename}.{file_extension}.gz"
+        else:
+            self._file_name = f"{base_filename}.{file_extension}"
         self._created = time.time()
         return metrics
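With this change, a writer that compresses its output (one whose open callable is gzip.open and whose spec supports compression) rotates to names ending in .gz. A minimal sketch of the naming rule, with an illustrative template and file id standing in for the writer's real state:

import gzip

# Illustrative stand-ins for BufferedDataWriter state (not dlt's real values)
file_name_template = "events.%s"
file_id = "1685726086.123"
file_extension = "jsonl"
supports_compression = True
open_fn = gzip.open  # the writer opens files via gzip.open when compressing

base_filename = file_name_template % file_id
if supports_compression and open_fn == gzip.open:
    file_name = f"{base_filename}.{file_extension}.gz"
else:
    file_name = f"{base_filename}.{file_extension}"

print(file_name)  # events.1685726086.123.jsonl.gz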

dlt/common/storages/load_package.py

Lines changed: 16 additions & 6 deletions
@@ -156,14 +156,18 @@ class ParsedLoadJobFileName(NamedTuple):
     file_id: str
     retry_count: int
     file_format: TJobFileFormat
+    is_compressed: bool = False
 
     def job_id(self) -> str:
         """Unique identifier of the job"""
         return f"{self.table_name}.{self.file_id}.{self.file_format}"
 
     def file_name(self) -> str:
         """A name of the file with the data to be loaded"""
-        return f"{self.table_name}.{self.file_id}.{int(self.retry_count)}.{self.file_format}"
+        base_name = f"{self.table_name}.{self.file_id}.{int(self.retry_count)}.{self.file_format}"
+        if self.is_compressed:
+            return f"{base_name}.gz"
+        return base_name
 
     def with_retry(self) -> "ParsedLoadJobFileName":
         """Returns a job with increased retry count"""
@@ -173,12 +177,18 @@ def with_retry(self) -> "ParsedLoadJobFileName":
     def parse(file_name: str) -> "ParsedLoadJobFileName":
         p = PurePath(file_name)
         parts = p.name.split(".")
-        if len(parts) != 4:
-            raise TerminalValueError(parts)
 
-        return ParsedLoadJobFileName(
-            parts[0], parts[1], int(parts[2]), cast(TJobFileFormat, parts[3])
-        )
+        if len(parts) == 4:
+            # Uncompressed
+            return ParsedLoadJobFileName(
+                parts[0], parts[1], int(parts[2]), cast(TJobFileFormat, parts[3]), False
+            )
+        elif len(parts) == 5 and parts[4] == "gz":
+            return ParsedLoadJobFileName(
+                parts[0], parts[1], int(parts[2]), cast(TJobFileFormat, parts[3]), True
+            )
+        else:
+            raise TerminalValueError(parts)
 
     @staticmethod
     def new_file_id() -> str:
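The parser now accepts both four-part and five-part names. A quick illustration of the new behavior, using a simplified stand-in for the NamedTuple (TJobFileFormat and TerminalValueError are dlt internals, replaced here with a plain str and ValueError):

from pathlib import PurePath
from typing import NamedTuple

class JobFileName(NamedTuple):  # simplified stand-in for ParsedLoadJobFileName
    table_name: str
    file_id: str
    retry_count: int
    file_format: str
    is_compressed: bool = False

def parse(file_name: str) -> JobFileName:
    parts = PurePath(file_name).name.split(".")
    if len(parts) == 4:
        return JobFileName(parts[0], parts[1], int(parts[2]), parts[3], False)
    elif len(parts) == 5 and parts[4] == "gz":
        return JobFileName(parts[0], parts[1], int(parts[2]), parts[3], True)
    raise ValueError(parts)

print(parse("events.abc123.0.jsonl"))     # is_compressed=False
print(parse("events.abc123.0.jsonl.gz"))  # is_compressed=True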

dlt/destinations/job_impl.py

Lines changed: 4 additions & 1 deletion
@@ -79,7 +79,10 @@ def job_id(self) -> str:
 
 class ReferenceFollowupJobRequest(FollowupJobRequestImpl):
     def __init__(self, original_file_name: str, remote_paths: List[str]) -> None:
-        file_name = os.path.splitext(original_file_name)[0] + "." + "reference"
+        # Parse the original filename to handle compressed files correctly
+        job_info = ParsedLoadJobFileName.parse(original_file_name)
+        # Reference files are not compressed, so we build the filename without .gz
+        file_name = f"{job_info.table_name}.{job_info.file_id}.{job_info.retry_count}.reference"
         self._remote_paths = remote_paths
         super().__init__(file_name)
         self._save_text_file("\n".join(remote_paths))
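The old os.path.splitext call strips only the last suffix, so a compressed input would have left the data format embedded in the reference name. Rebuilding from the parsed parts avoids that; the file names below are illustrative:

import os

original = "events.abc123.0.jsonl.gz"  # illustrative compressed job file name

# Old approach: splitext removes only ".gz", so ".jsonl" leaks into the name
old_name = os.path.splitext(original)[0] + "." + "reference"
print(old_name)  # events.abc123.0.jsonl.reference

# New approach: rebuild from the structured parts, dropping both format and .gz
table_name, file_id, retry_count = "events", "abc123", 0
new_name = f"{table_name}.{file_id}.{retry_count}.reference"
print(new_name)  # events.abc123.0.reference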

dlt/destinations/path_utils.py

Lines changed: 9 additions & 2 deletions
@@ -139,7 +139,11 @@ def prepare_params(
     if job_info:
         table_name = job_info.table_name
         file_id = job_info.file_id
-        ext = job_info.file_format
+        # For compressed files, ext should include .gz
+        if job_info.is_compressed:
+            ext = f"{job_info.file_format}.gz"
+        else:
+            ext = job_info.file_format
     params.update(
         {
             "table_name": table_name,
@@ -241,7 +245,10 @@ def create_path(
 
     # if extension is not defined, we append it at the end
     if "ext" not in placeholders:
-        path += f".{job_info.file_format}"
+        if job_info.is_compressed:
+            path += f".{job_info.file_format}.gz"
+        else:
+            path += f".{job_info.file_format}"
 
     return path
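Filesystem layouts substitute placeholders such as {table_name}, {file_id}, and {ext} into destination paths; after this change, ext resolves to "jsonl.gz" rather than "jsonl" for compressed jobs. A sketch of the substitution under these assumptions (the layout string and render helper are hypothetical, not dlt's API):

layout = "{table_name}/{file_id}.{ext}"  # hypothetical layout string

def render(layout: str, table_name: str, file_id: str,
           file_format: str, is_compressed: bool) -> str:
    # Mirrors the prepare_params change: ext carries .gz for compressed jobs
    ext = f"{file_format}.gz" if is_compressed else file_format
    return layout.format(table_name=table_name, file_id=file_id, ext=ext)

print(render(layout, "events", "abc123", "jsonl", True))   # events/abc123.jsonl.gz
print(render(layout, "events", "abc123", "jsonl", False))  # events/abc123.jsonl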

dlt/load/load.py

Lines changed: 2 additions & 1 deletion
@@ -121,7 +121,8 @@ def get_staging_destination_client(self, schema: Schema) -> JobClientBase:
         return self.staging_destination.client(schema, self.initial_staging_client_config)
 
     def is_staging_destination_job(self, file_path: str) -> bool:
-        file_type = os.path.splitext(file_path)[1][1:]
+        job_info = ParsedLoadJobFileName.parse(file_path)
+        file_type = job_info.file_format
         # for now we know that reference and model jobs always go do the main destination
         if file_type in ["reference", "model"]:
             return False
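For compressed names, os.path.splitext reports "gz" as the extension rather than the job's actual format, so the format check above would see the wrong value. Parsing the structured name recovers the true format; a small demonstration with an illustrative file name:

import os

file_path = "events.abc123.0.jsonl.gz"  # illustrative staged file name

# Old approach: the last suffix is "gz", not the job's format
print(os.path.splitext(file_path)[1][1:])  # gz

# New approach: the format is the fourth field of the structured name
parts = os.path.basename(file_path).split(".")
file_format = parts[3]  # table_name.file_id.retry_count.format[.gz]
print(file_format)  # jsonl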
