Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dlt/_workspace/_workspace_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def local_dir(self) -> str:
def settings_dir(self) -> str:
"""Returns a path to dlt settings directory. If not overridden it resides in current working directory

The name of the setting folder is '.dlt'. The path is current working directory '.' but may be overridden by DLT_PROJECT_DIR env variable.
The name of the settings folder is '.dlt'. The path is current working directory '.' but may be overridden by DLT_PROJECT_DIR env variable.
"""
return os.path.join(self.run_dir, DOT_DLT)

Expand Down
Empty file.
46 changes: 46 additions & 0 deletions dlt/_workspace/deployment/file_selector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from typing import Iterator, Optional, List
from pathlib import Path
from pathspec import PathSpec
from pathspec.util import iter_tree_files

from dlt._workspace._workspace_context import WorkspaceRunContext


class WorkspaceFileSelector:
    """Iterates files in workspace respecting ignore patterns and excluding workspace internals.

    Patterns are gitignore-style ("gitwildmatch") and come from three sources, in order:
    the always-excluded settings directory, a configurable ignore file (default .gitignore)
    if present in the workspace root, and caller-provided patterns relative to the root.
    """

    def __init__(
        self,
        context: WorkspaceRunContext,
        additional_excludes: Optional[List[str]] = None,
        ignore_file: str = ".gitignore",
    ) -> None:
        self.root_path: Path = Path(context.run_dir).resolve()
        self.settings_dir: Path = Path(context.settings_dir).resolve()
        self.ignore_file: str = ignore_file
        self.spec: PathSpec = self._build_pathspec(additional_excludes or [])

    def _build_pathspec(self, additional_excludes: List[str]) -> PathSpec:
        """Assemble PathSpec from settings dir + ignore file contents + caller excludes"""
        # settings dir is always excluded; trailing slash matches the whole directory
        settings_rel = self.settings_dir.relative_to(self.root_path)
        patterns: List[str] = [f"{settings_rel}/"]

        # ignore file is optional — missing file simply contributes no patterns
        ignore_path = self.root_path / self.ignore_file
        if ignore_path.exists():
            patterns += ignore_path.read_text(encoding="utf-8").splitlines()

        patterns += additional_excludes

        return PathSpec.from_lines("gitwildmatch", patterns)

    def __iter__(self) -> Iterator[Path]:
        """Yield paths of files eligible for deployment"""
        yield from (
            Path(rel)
            for rel in iter_tree_files(self.root_path)
            if not self.spec.match_file(rel)
        )
19 changes: 19 additions & 0 deletions dlt/_workspace/deployment/manifest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import List
from dlt.common.typing import TypedDict

# current version of deployment engine
DEPLOYMENT_ENGINE_VERSION = 1


class TDeploymentFileItem(TypedDict, total=False):
    """TypedDict representing a file in the deployment package"""

    # path relative to the workspace root, always in POSIX form
    relative_path: str
    # size of the file on disk at packaging time
    size_in_bytes: int


class TDeploymentManifest(TypedDict, total=False):
    """TypedDict defining the deployment manifest structure"""

    # DEPLOYMENT_ENGINE_VERSION at the time the package was built
    engine_version: int
    # one entry per file included in the package
    files: List[TDeploymentFileItem]
76 changes: 76 additions & 0 deletions dlt/_workspace/deployment/package_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from io import BytesIO
from typing import Tuple, BinaryIO, List
from pathlib import Path
import tarfile
import yaml

from dlt.common.time import precise_time
from dlt.common.utils import digest256_tar_stream

from dlt._workspace.deployment.file_selector import WorkspaceFileSelector
from dlt._workspace.deployment.manifest import (
TDeploymentFileItem,
TDeploymentManifest,
DEPLOYMENT_ENGINE_VERSION,
)

from dlt._workspace._workspace_context import WorkspaceRunContext


# folder inside the archive under which all workspace files are placed
DEFAULT_DEPLOYMENT_FILES_FOLDER = "files"
# name of the manifest entry appended at the end of the archive
DEFAULT_MANIFEST_FILE_NAME = "manifest.yaml"
# file name template for packages written to the workspace data dir
DEFAULT_DEPLOYMENT_PACKAGE_LAYOUT = "deployment-{timestamp}.tar.gz"


class DeploymentPackageBuilder:
    """Builds gzipped deployment package from file selectors"""

    def __init__(self, context: WorkspaceRunContext):
        # context provides run_dir (workspace root) and the data dir for output files
        self.run_context: WorkspaceRunContext = context

    def write_package_to_stream(
        self, file_selector: WorkspaceFileSelector, output_stream: BinaryIO
    ) -> str:
        """Write deployment package to output stream, return content hash

        Args:
            file_selector: yields workspace-relative file paths to include
            output_stream: binary stream the tar.gz archive is written to; must be
                seekable because the content hash is computed by re-reading it

        Returns:
            base64 encoded sha3_256 hash of the archive contents (metadata-independent)
        """
        manifest_files: List[TDeploymentFileItem] = []

        # Add files to the archive
        with tarfile.open(fileobj=output_stream, mode="w|gz") as tar:
            for file_path in file_selector:
                # run_dir is a str; Path.__rtruediv__ performs the join
                full_path = self.run_context.run_dir / file_path
                # Use POSIX paths for tar archives (cross-platform compatibility)
                posix_path = file_path.as_posix()
                tar.add(
                    full_path,
                    arcname=f"{DEFAULT_DEPLOYMENT_FILES_FOLDER}/{posix_path}",
                    recursive=False,
                )
                manifest_files.append(
                    {
                        "relative_path": posix_path,
                        "size_in_bytes": full_path.stat().st_size,
                    }
                )
            # Create and add manifest with file metadata at the end
            manifest: TDeploymentManifest = {
                "engine_version": DEPLOYMENT_ENGINE_VERSION,
                "files": manifest_files,
            }
            manifest_yaml = yaml.dump(
                manifest, allow_unicode=True, default_flow_style=False, sort_keys=False
            ).encode("utf-8")
            manifest_info = tarfile.TarInfo(name=DEFAULT_MANIFEST_FILE_NAME)
            manifest_info.size = len(manifest_yaml)
            tar.addfile(manifest_info, BytesIO(manifest_yaml))

        # hash only after the `with` block so the archive is fully flushed and closed
        return digest256_tar_stream(output_stream)

    def build_package(self, file_selector: WorkspaceFileSelector) -> Tuple[Path, str]:
        """Create deployment package file, return (path, content_hash)

        The file name embeds a high-resolution timestamp, so every call creates a
        new file; the content hash stays stable for identical inputs.
        """
        package_name = DEFAULT_DEPLOYMENT_PACKAGE_LAYOUT.format(timestamp=str(precise_time()))
        package_path = Path(self.run_context.get_data_entity(package_name))

        # w+b: stream must also be readable for the post-write content hash
        with open(package_path, "w+b") as f:
            content_hash = self.write_package_to_stream(file_selector, f)

        return package_path, content_hash
43 changes: 42 additions & 1 deletion dlt/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import TYPE_CHECKING, Any, Literal
import re
from typing import TYPE_CHECKING, Any, BinaryIO, Literal
import os
from pathlib import Path
import sys
Expand All @@ -11,6 +12,7 @@
from types import ModuleType
import traceback
import zlib
import tarfile
from importlib.metadata import version as pkg_version
from packaging.version import Version

Expand Down Expand Up @@ -119,6 +121,45 @@ def digest256(v: str) -> str:
return base64.b64encode(digest).decode("ascii")


def digest256_file_stream(stream: BinaryIO, chunk_size: int = 4096) -> str:
    """Returns a base64 encoded sha3_256 hash of a binary stream

    Rewinds the stream and hashes it in `chunk_size` pieces so large streams
    never have to be loaded into memory at once.
    """
    stream.seek(0)
    hasher = hashlib.sha3_256()
    # read until the stream yields the empty sentinel
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        hasher.update(chunk)
    return base64.b64encode(hasher.digest()).decode("ascii")


def digest256_tar_stream(stream: BinaryIO, chunk_size: int = 8192) -> str:
    """Returns a base64 encoded sha3_256 hash of tar archive contents (ignoring metadata)

    Hashes only filenames and file contents, ignoring timestamps and other metadata.
    This ensures identical file contents produce identical hashes regardless of when
    the tar was created. Members are processed in sorted-name order so member order
    inside the archive does not matter either.

    Each member is framed into the hash as ``name NUL size NUL content``. Tar names
    cannot contain NUL and the size pins the content length, so name/content
    boundaries are unambiguous — plain concatenation would let two different
    archives (e.g. a name sharing a prefix with another member's content) collide.

    Note: This function operates entirely in-memory using tar.extractfile() which reads
    from the archive stream. No files are written to disk, preventing leakage of sensitive
    data that may be contained in the archive.

    Args:
        stream: seekable binary stream positioned anywhere; it is rewound first
        chunk_size: read granularity for member contents

    Returns:
        base64 encoded sha3_256 digest (44 characters)
    """
    stream.seek(0)
    hash_obj = hashlib.sha3_256()

    with tarfile.open(fileobj=stream, mode="r:*") as tar:
        for member in sorted(tar.getmembers(), key=lambda m: m.name):
            # unambiguous framing: NUL terminates the name, size bounds the content
            hash_obj.update(f"{member.name}\x00{member.size}\x00".encode())
            if member.isfile():
                f = tar.extractfile(member)
                if f:
                    # close the member reader deterministically
                    with f:
                        while chunk := f.read(chunk_size):
                            hash_obj.update(chunk)

    digest = hash_obj.digest()
    return base64.b64encode(digest).decode("ascii")


def str2bool(v: str) -> bool:
if isinstance(v, bool):
return v
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ workspace = [
"pyarrow>=16.0.0",
"marimo>=0.14.5",
"mcp>=1.2.1 ; python_version >= '3.10'",
"pathspec>=0.11.2",
]
dbml = [
"pydbml"
Expand Down
1 change: 1 addition & 0 deletions tests/workspace/cases/workspaces/default/.ignorefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/empty_file.py
3 changes: 3 additions & 0 deletions tests/workspace/cases/workspaces/default/ducklake_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import dlt

# minimal pipeline fixture file; workspace deployment tests only assert its
# presence/path in the package, not its behavior
pipeline = dlt.pipeline(pipeline_name="ducklake_pipeline")
Empty file.
31 changes: 31 additions & 0 deletions tests/workspace/deployment/test_file_selector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import os
import pytest

from dlt._workspace.deployment.file_selector import WorkspaceFileSelector

from tests.workspace.utils import isolated_workspace


@pytest.mark.parametrize(
    "with_additional_exclude",
    [True, False],
    ids=["with_additional_exclude", "without_additional_exclude"],
)
def test_file_selector_respects_gitignore(with_additional_exclude: bool) -> None:
    """Test that .gitignore patterns are respected with and without additional excludes."""

    extra_patterns = ["additional_exclude/"] if with_additional_exclude else None
    # the ignore file excludes /empty_file.py; the additional pattern removes the folder copy
    expected = {"ducklake_pipeline.py", ".ignorefile"}
    if not with_additional_exclude:
        expected.add("additional_exclude/empty_file.py")

    with isolated_workspace("default") as ctx:
        selector = WorkspaceFileSelector(
            ctx, additional_excludes=extra_patterns, ignore_file=".ignorefile"
        )
        assert {path.as_posix() for path in selector} == expected
83 changes: 83 additions & 0 deletions tests/workspace/deployment/test_package_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import os
import tarfile
import yaml
from io import BytesIO
from pathlib import Path
import time

from dlt._workspace.deployment.package_builder import (
DeploymentPackageBuilder,
DEFAULT_DEPLOYMENT_FILES_FOLDER,
DEFAULT_MANIFEST_FILE_NAME,
)
from dlt._workspace.deployment.file_selector import WorkspaceFileSelector
from dlt._workspace.deployment.manifest import DEPLOYMENT_ENGINE_VERSION

from tests.workspace.utils import isolated_workspace


def test_write_package_to_stream() -> None:
    """Test building deployment package to a stream and verify structure."""

    with isolated_workspace("default") as ctx:
        package_builder = DeploymentPackageBuilder(ctx)
        file_selector = WorkspaceFileSelector(ctx, ignore_file=".ignorefile")

        buffer = BytesIO()
        package_hash = package_builder.write_package_to_stream(file_selector, buffer)

        # sha3_256 digest in base64 is always 44 characters
        assert package_hash
        assert len(package_hash) == 44

        workspace_files = {
            "additional_exclude/empty_file.py",
            "ducklake_pipeline.py",
            ".ignorefile",
        }

        # Verify tar.gz structure
        buffer.seek(0)
        with tarfile.open(fileobj=buffer, mode="r:gz") as archive:
            entry_names = archive.getnames()

            # archive holds workspace files under "files/" plus the manifest
            assert DEFAULT_MANIFEST_FILE_NAME in entry_names
            prefixed = {
                name for name in entry_names if name.startswith(DEFAULT_DEPLOYMENT_FILES_FOLDER)
            }
            assert prefixed == {
                f"{DEFAULT_DEPLOYMENT_FILES_FOLDER}/{name}" for name in workspace_files
            }

            # Verify manifest structure
            manifest = yaml.safe_load(archive.extractfile(DEFAULT_MANIFEST_FILE_NAME))
            assert manifest["engine_version"] == DEPLOYMENT_ENGINE_VERSION
            for file_item in manifest["files"]:
                assert "relative_path" in file_item
                assert "size_in_bytes" in file_item

            # Manifest has workspace-relative paths (no "files/" prefix)
            assert {item["relative_path"] for item in manifest["files"]} == workspace_files


def test_build_package() -> None:
    """Test that deployment packages are content-addressable with reproducible hashes."""

    with isolated_workspace("default") as ctx:
        package_builder = DeploymentPackageBuilder(ctx)
        file_selector = WorkspaceFileSelector(ctx)

        first_path, first_hash = package_builder.build_package(file_selector)
        assert str(first_path).startswith(f"{ctx.data_dir}{os.sep}deployment-")
        assert len(first_hash) == 44  # sha3_256 base64 string

        # NOTE: Sleep ensures tarballs have different timestamps in their metadata, proving
        # digest256_tar_stream produces identical hashes despite different creation times
        time.sleep(0.2)

        second_path, second_hash = package_builder.build_package(file_selector)

        # new file each time, identical content hash
        assert first_path != second_path
        assert first_hash == second_hash
2 changes: 2 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading