From 0894a0644bc47feca043fadb2ab4aec83fe5889b Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Thu, 23 Apr 2026 17:09:53 -0400 Subject: [PATCH 01/16] Use multi-part upload on files larger than 5GB --- dandi/consts.py | 3 + dandi/files/zarr.py | 146 +++++++++++++++++++++++++++++++++++++- dandi/tests/test_files.py | 43 +++++++++-- 3 files changed, 184 insertions(+), 8 deletions(-) diff --git a/dandi/consts.py b/dandi/consts.py index 933c59aba..da4fbd7d3 100644 --- a/dandi/consts.py +++ b/dandi/consts.py @@ -199,6 +199,9 @@ def urls(self) -> Iterator[str]: #: Maximum number of Zarr directory entries to delete at once ZARR_DELETE_BATCH_SIZE = 100 +#: Zarr chunks above this size (bytes) are uploaded via multipart upload +ZARR_LARGE_CHUNK_THRESHOLD = 5 * 1024 * 1024 * 1024 # 5 GB + BIDS_DATASET_DESCRIPTION = "dataset_description.json" BIDS_IGNORE_FILE = ".bidsignore" diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index 67a9ddc86..e6ec75f4d 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -10,10 +10,13 @@ import json import os import os.path +import re from pathlib import Path +from threading import Lock from time import sleep from typing import Any, Optional import urllib.parse +from xml.etree.ElementTree import fromstring from dandischema.models import BareAsset, DigestType from pydantic import BaseModel, ConfigDict, ValidationError @@ -25,6 +28,7 @@ from dandi.consts import ( MAX_ZARR_DEPTH, ZARR_DELETE_BATCH_SIZE, + ZARR_LARGE_CHUNK_THRESHOLD, ZARR_MIME_TYPE, ZARR_UPLOAD_BATCH_SIZE, ) @@ -46,7 +50,7 @@ pre_upload_size_check, ) -from .bases import LocalDirectoryAsset +from .bases import LocalDirectoryAsset, _upload_blob_part from ..validate._types import ( ORIGIN_VALIDATION_DANDI_ZARR, Origin, @@ -741,13 +745,40 @@ def mkzarr() -> str: ): # Items to upload in this batch (may be retried e.g. 
due to # 403 errors because of timed-out upload URLs) - items_to_upload = list(items) + all_items = list(items) + large_items = [ + it for it in all_items if it.size > ZARR_LARGE_CHUNK_THRESHOLD + ] + items_to_upload = [ + it for it in all_items if it.size <= ZARR_LARGE_CHUNK_THRESHOLD + ] max_retries = 5 retry_count = 0 # Add all items to checksum tree (only done once) - for it in items_to_upload: + for it in all_items: zcc.add_leaf(Path(it.entry_path), it.size, it.digest) + # Upload chunks above 5GB individually via multipart upload + for it in large_items: + for status in upload_zarr_file_multipart( + item=it, + zarr_id=zarr_id, + dandiset=dandiset, + jobs=jobs, + ): + if status.get("status") == "done": + changed = True + bytes_uploaded += it.size + yield { + "status": "uploading", + "progress": 100 + * bytes_uploaded + / to_upload.total_size, + "current": bytes_uploaded, + } + else: + yield status + while items_to_upload and retry_count <= max_retries: # Prepare upload requests for current items uploading = [it.upload_request() for it in items_to_upload] @@ -902,6 +933,115 @@ def _handle_failed_items_and_raise( raise failed_items[0][1] +def upload_zarr_file_multipart( + item: UploadItem, + zarr_id: str, + dandiset: RemoteDandiset, + jobs: int | None = None, +): + # Avoid heavy import by importing within function: + from dandi.support.digests import get_dandietag + + client = dandiset.client + + yield {"status": "calculating etag"} + etagger = get_dandietag(item.filepath) + filetag = etagger.as_str() + + yield {"status": "initiating upload"} + lgr.debug("%s: Beginning upload", item.filepath) + total_size = pre_upload_size_check(item.filepath) + + resp = client.post( + "/uploads/zarr/initialize/", + json={ + "contentSize": total_size, + "digest": { + "algorithm": "dandi:dandi-etag", + "value": filetag, + }, + "zarr": { + "chunk_key": item.entry_path, + "zarr_id": zarr_id, + }, + }, + ) + + try: + upload_id = resp["upload_id"] + parts = resp["parts"] + if 
len(parts) != etagger.part_qty: + raise RuntimeError( + f"Server and client disagree on number of parts for upload;" + f" server says {len(parts)}, client says {etagger.part_qty}" + ) + parts_out = [] + bytes_uploaded = 0 + lgr.debug("Uploading %s in %d parts", item.filepath, len(parts)) + with RESTFullAPIClient("http://nil.nil") as storage: + with item.filepath.open("rb") as fp: + with ThreadPoolExecutor(max_workers=jobs or 5) as executor: + lock = Lock() + futures = [ + executor.submit( + _upload_blob_part, + storage_session=storage, + fp=fp, + lock=lock, + etagger=etagger, + asset_path=item.entry_path, + part=part, + ) + for part in parts + ] + for fut in as_completed(futures): + out_part = fut.result() + bytes_uploaded += out_part["size"] + yield { + "status": "uploading", + "progress": 100 * bytes_uploaded / total_size, + "current": bytes_uploaded, + } + parts_out.append(out_part) + + lgr.debug("%s: Completing upload", item.entry_path) + resp = client.post( + f"/uploads/zarr/{upload_id}/complete/", + json={"parts": parts_out}, + ) + lgr.debug( + "%s: Announcing completion to %s", + item.entry_path, + resp["complete_url"], + ) + r = storage.post(resp["complete_url"], data=resp["body"], json_resp=False) + lgr.debug( + "%s: Upload completed. Response content: %s", + item.entry_path, + r.content, + ) + rxml = fromstring(r.text) + m = re.match(r"\{.+?\}", rxml.tag) + ns = m.group(0) if m else "" + final_etag = rxml.findtext(f"{ns}ETag") + if final_etag is not None: + final_etag = final_etag.strip('"') + if final_etag != filetag: + raise RuntimeError( + "Server and client disagree on final ETag of" + f" uploaded file; server says {final_etag}," + f" client says {filetag}" + ) + # else: Error? Warning? 
+ resp = client.post(f"/uploads/zarr/{upload_id}/validate/") + yield {"status": "done"} + except Exception: + post_upload_size_check(item.filepath, total_size, True) + raise + else: + post_upload_size_check(item.filepath, total_size, False) + + def _upload_zarr_file( storage_session: RESTFullAPIClient, dandiset: RemoteDandiset, diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index 6f3dff19e..5977f2c67 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -1,17 +1,16 @@ from __future__ import annotations -from operator import attrgetter import os -from pathlib import Path import subprocess -from unittest.mock import ANY +from operator import attrgetter +from pathlib import Path +from unittest.mock import ANY, patch -from dandischema.models import get_schema_version import numpy as np import pytest import zarr +from dandischema.models import get_schema_version -from .fixtures import SampleDandiset from .. import get_logger from ..consts import ZARR_MIME_TYPE, dandiset_metadata_file from ..dandiapi import AssetType, RemoteZarrAsset @@ -29,6 +28,7 @@ dandi_file, find_dandi_files, ) +from .fixtures import SampleDandiset lgr = get_logger() @@ -536,6 +536,39 @@ def test_upload_zarr_entry_content_type(new_dandiset, tmp_path): assert r.headers["Content-Type"] == "application/json" +@pytest.mark.ai_generated +def test_upload_zarr_large_chunks(new_dandiset, tmp_path): + """Chunks above ZARR_LARGE_CHUNK_THRESHOLD are uploaded via upload_zarr_file_multipart.""" + filepath = tmp_path / "example.zarr" + zarr.save(filepath, np.arange(1000), np.arange(1000, 0, -1)) + zf = dandi_file(filepath) + assert isinstance(zf, ZarrAsset) + + from ..files.zarr import upload_zarr_file_multipart + + real_upload_zarr_file_multipart = upload_zarr_file_multipart + called_paths: list[str] = [] + + def spy_upload_zarr_file_multipart(item, *args, **kwargs): + called_paths.append(item.entry_path) + yield from real_upload_zarr_file_multipart(item, *args, **kwargs) 
+ + # Set threshold to 0 so every chunk is treated as "large" + with ( + patch("dandi.files.zarr.ZARR_LARGE_CHUNK_THRESHOLD", 0), + patch( + "dandi.files.zarr.upload_zarr_file_multipart", + spy_upload_zarr_file_multipart, + ), + ): + asset = zf.upload(new_dandiset.dandiset, {}) + + assert isinstance(asset, RemoteZarrAsset) + # Every chunk file in the zarr should have been routed through upload_zarr_file_multipart + remote_entries = {str(e) for e in asset.iterfiles()} + assert remote_entries == set(called_paths) + + def test_validate_deep_zarr(tmp_path: Path) -> None: zarr_path = tmp_path / "foo.zarr" zarr.save(zarr_path, np.arange(1000), np.arange(1000, 0, -1)) From b3bf063a87d8a2f511aca01f7e7dc5317ce8bd63 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Fri, 24 Apr 2026 12:17:20 -0400 Subject: [PATCH 02/16] Unify multipart upload logic between zarrs and blobs --- dandi/files/bases.py | 228 +++++++++++++++++++++----------------- dandi/files/zarr.py | 151 ++++--------------------- dandi/tests/test_files.py | 25 ++--- 3 files changed, 158 insertions(+), 246 deletions(-) diff --git a/dandi/files/bases.py b/dandi/files/bases.py index de52027f7..42af8d51e 100644 --- a/dandi/files/bases.py +++ b/dandi/files/bases.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from collections import deque -from collections.abc import Iterator +from collections.abc import Generator, Iterator from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from datetime import datetime @@ -341,118 +341,26 @@ def iter_upload( ``"done"`` and an ``"asset"`` key containing the resulting `RemoteAsset`. 
""" - # Avoid heavy import by importing within function: - from dandi.support.digests import get_dandietag - asset_path = metadata.setdefault("path", self.path) client = dandiset.client - yield {"status": "calculating etag"} - etagger = get_dandietag(self.filepath) - filetag = etagger.as_str() - lgr.debug("Calculated dandi-etag of %s for %s", filetag, self.filepath) - digest = metadata.get("digest", {}) - if "dandi:dandi-etag" in digest: - if digest["dandi:dandi-etag"] != filetag: - raise RuntimeError( - f"{self.filepath}: File etag changed; was originally" - f" {digest['dandi:dandi-etag']} but is now {filetag}" - ) - yield {"status": "initiating upload"} - lgr.debug("%s: Beginning upload", asset_path) - total_size = pre_upload_size_check(self.filepath) + expected_etag = metadata.get("digest", {}).get("dandi:dandi-etag") try: - resp = client.post( - "/uploads/initialize/", - json={ - "contentSize": total_size, - "digest": { - "algorithm": "dandi:dandi-etag", - "value": filetag, - }, - "dandiset": dandiset.identifier, - }, + validate_resp = yield from _multipart_upload( + client=client, + filepath=self.filepath, + asset_path=asset_path, + upload_prefix="/uploads", + extra_init_fields={"dandiset": dandiset.identifier}, + expected_etag=expected_etag, + jobs=jobs, ) + blob_id = validate_resp["blob_id"] except requests.HTTPError as e: if e.response is not None and e.response.status_code == 409: lgr.debug("%s: Blob already exists on server", asset_path) blob_id = e.response.headers["Location"] else: raise - else: - try: - upload_id = resp["upload_id"] - parts = resp["parts"] - if len(parts) != etagger.part_qty: - raise RuntimeError( - f"Server and client disagree on number of parts for upload;" - f" server says {len(parts)}, client says {etagger.part_qty}" - ) - parts_out = [] - bytes_uploaded = 0 - lgr.debug("Uploading %s in %d parts", self.filepath, len(parts)) - with RESTFullAPIClient("http://nil.nil") as storage: - with self.filepath.open("rb") as fp: - with 
ThreadPoolExecutor(max_workers=jobs or 5) as executor: - lock = Lock() - futures = [ - executor.submit( - _upload_blob_part, - storage_session=storage, - fp=fp, - lock=lock, - etagger=etagger, - asset_path=asset_path, - part=part, - ) - for part in parts - ] - for fut in as_completed(futures): - out_part = fut.result() - bytes_uploaded += out_part["size"] - yield { - "status": "uploading", - "progress": 100 * bytes_uploaded / total_size, - "current": bytes_uploaded, - } - parts_out.append(out_part) - lgr.debug("%s: Completing upload", asset_path) - resp = client.post( - f"/uploads/{upload_id}/complete/", - json={"parts": parts_out}, - ) - lgr.debug( - "%s: Announcing completion to %s", - asset_path, - resp["complete_url"], - ) - r = storage.post( - resp["complete_url"], data=resp["body"], json_resp=False - ) - lgr.debug( - "%s: Upload completed. Response content: %s", - asset_path, - r.content, - ) - rxml = fromstring(r.text) - m = re.match(r"\{.+?\}", rxml.tag) - ns = m.group(0) if m else "" - final_etag = rxml.findtext(f"{ns}ETag") - if final_etag is not None: - final_etag = final_etag.strip('"') - if final_etag != filetag: - raise RuntimeError( - "Server and client disagree on final ETag of" - f" uploaded file; server says {final_etag}," - f" client says {filetag}" - ) - # else: Error? Warning? 
- resp = client.post(f"/uploads/{upload_id}/validate/") - blob_id = resp["blob_id"] - except Exception: - post_upload_size_check(self.filepath, total_size, True) - raise - else: - post_upload_size_check(self.filepath, total_size, False) lgr.debug("%s: Assigning asset blob to dandiset & version", asset_path) yield {"status": "producing asset"} if replacing is not None: @@ -696,6 +604,120 @@ def _upload_blob_part( } +def _multipart_upload( + client: RESTFullAPIClient, + filepath: Path, + asset_path: str, + upload_prefix: str, + extra_init_fields: dict, + expected_etag: str | None = None, + jobs: int | None = None, +) -> Generator[dict, None, dict]: + """Perform a full multipart upload: etag calculation, initialization, part upload, and validation. + + Yields progress dicts and returns the validate response dict. If + ``expected_etag`` is provided and does not match the computed etag, raises + ``RuntimeError``. A 409 HTTPError from the initialize call propagates + unchanged so the caller can handle the "blob already exists" case. 
+ """ + # Avoid heavy import by importing within function: + from dandi.support.digests import get_dandietag + + yield {"status": "calculating etag"} + etagger = get_dandietag(filepath) + filetag = etagger.as_str() + lgr.debug("Calculated dandi-etag of %s for %s", filetag, filepath) + if expected_etag is not None and filetag != expected_etag: + raise RuntimeError( + f"{filepath}: File etag changed; was originally" + f" {expected_etag} but is now {filetag}" + ) + yield {"status": "initiating upload"} + lgr.debug("%s: Beginning upload", asset_path) + total_size = pre_upload_size_check(filepath) + resp = client.post( + f"{upload_prefix}/initialize/", + json={ + "contentSize": total_size, + "digest": {"algorithm": "dandi:dandi-etag", "value": filetag}, + **extra_init_fields, + }, + ) + try: + upload_id = resp["upload_id"] + parts = resp["parts"] + if len(parts) != etagger.part_qty: + raise RuntimeError( + f"Server and client disagree on number of parts for upload;" + f" server says {len(parts)}, client says {etagger.part_qty}" + ) + parts_out = [] + bytes_uploaded = 0 + lgr.debug("Uploading %s in %d parts", filepath, len(parts)) + with RESTFullAPIClient("http://nil.nil") as storage: + with filepath.open("rb") as fp: + with ThreadPoolExecutor(max_workers=jobs or 5) as executor: + lock = Lock() + futures = [ + executor.submit( + _upload_blob_part, + storage_session=storage, + fp=fp, + lock=lock, + etagger=etagger, + asset_path=asset_path, + part=part, + ) + for part in parts + ] + for fut in as_completed(futures): + out_part = fut.result() + bytes_uploaded += out_part["size"] + yield { + "status": "uploading", + "progress": 100 * bytes_uploaded / total_size, + "current": bytes_uploaded, + } + parts_out.append(out_part) + lgr.debug("%s: Completing upload", asset_path) + resp = client.post( + f"{upload_prefix}/{upload_id}/complete/", + json={"parts": parts_out}, + ) + lgr.debug( + "%s: Announcing completion to %s", + asset_path, + resp["complete_url"], + ) + r = 
storage.post(resp["complete_url"], data=resp["body"], json_resp=False) + lgr.debug( + "%s: Upload completed. Response content: %s", + asset_path, + r.content, + ) + rxml = fromstring(r.text) + m = re.match(r"\{.+?\}", rxml.tag) + ns = m.group(0) if m else "" + final_etag = rxml.findtext(f"{ns}ETag") + if final_etag is not None: + final_etag = final_etag.strip('"') + if final_etag != filetag: + raise RuntimeError( + "Server and client disagree on final ETag of" + f" uploaded file; server says {final_etag}," + f" client says {filetag}" + ) + # else: Error? Warning? + validate_resp = client.post(f"{upload_prefix}/{upload_id}/validate/") + except Exception: + post_upload_size_check(filepath, total_size, True) + raise + else: + post_upload_size_check(filepath, total_size, False) + + return validate_resp + + def _check_required_fields( d: dict, required: list[str], file_path: str ) -> list[ValidationResult]: diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index e6ec75f4d..de0c29c93 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -10,13 +10,10 @@ import json import os import os.path -import re from pathlib import Path -from threading import Lock from time import sleep from typing import Any, Optional import urllib.parse -from xml.etree.ElementTree import fromstring from dandischema.models import BareAsset, DigestType from pydantic import BaseModel, ConfigDict, ValidationError @@ -50,7 +47,7 @@ pre_upload_size_check, ) -from .bases import LocalDirectoryAsset, _upload_blob_part +from .bases import LocalDirectoryAsset, _multipart_upload from ..validate._types import ( ORIGIN_VALIDATION_DANDI_ZARR, Origin, @@ -760,25 +757,28 @@ def mkzarr() -> str: # Upload chunks above 5GB individually via multipart upload for it in large_items: - for status in upload_zarr_file_multipart( - item=it, - zarr_id=zarr_id, - dandiset=dandiset, + # Yield uploading status + yield from _multipart_upload( + client=client, + filepath=it.filepath, + asset_path=it.entry_path, + 
upload_prefix="/uploads/zarr", + extra_init_fields={ + "zarr": {"chunk_key": it.entry_path, "zarr_id": zarr_id} + }, jobs=jobs, - ): - if status.get("status") == "done": - changed = True - bytes_uploaded += it.size - yield { - "status": "uploading", - "progress": 100 - * bytes_uploaded - / to_upload.total_size, - "current": bytes_uploaded, - } - else: - yield status + ) + + # Part is finished uploading, yield final progress + changed = True + bytes_uploaded += it.size + yield { + "status": "uploading", + "progress": 100 * bytes_uploaded / to_upload.total_size, + "current": bytes_uploaded, + } + # Upload the remaining files using single part upload while items_to_upload and retry_count <= max_retries: # Prepare upload requests for current items uploading = [it.upload_request() for it in items_to_upload] @@ -933,115 +933,6 @@ def _handle_failed_items_and_raise( raise failed_items[0][1] -def upload_zarr_file_multipart( - item: UploadItem, - zarr_id: str, - dandiset: RemoteDandiset, - jobs: int | None = None, -): - # Avoid heavy import by importing within function: - from dandi.support.digests import get_dandietag - - client = dandiset.client - - yield {"status": "calculating etag"} - etagger = get_dandietag(item.filepath) - filetag = etagger.as_str() - - yield {"status": "initiating upload"} - lgr.debug("%s: Beginning upload", item.filepath) - total_size = pre_upload_size_check(item.filepath) - - resp = client.post( - "/uploads/zarr/initialize/", - json={ - "contentSize": total_size, - "digest": { - "algorithm": "dandi:dandi-etag", - "value": filetag, - }, - "zarr": { - "chunk_key": item.entry_path, - "zarr_id": zarr_id, - }, - }, - ) - - try: - upload_id = resp["upload_id"] - parts = resp["parts"] - if len(parts) != etagger.part_qty: - raise RuntimeError( - f"Server and client disagree on number of parts for upload;" - f" server says {len(parts)}, client says {etagger.part_qty}" - ) - parts_out = [] - bytes_uploaded = 0 - lgr.debug("Uploading %s in %d parts", 
item.filepath, len(parts)) - with RESTFullAPIClient("http://nil.nil") as storage: - with item.filepath.open("rb") as fp: - with ThreadPoolExecutor(max_workers=jobs or 5) as executor: - lock = Lock() - futures = [ - executor.submit( - _upload_blob_part, - storage_session=storage, - fp=fp, - lock=lock, - etagger=etagger, - asset_path=item.entry_path, - part=part, - ) - for part in parts - ] - for fut in as_completed(futures): - out_part = fut.result() - bytes_uploaded += out_part["size"] - yield { - "status": "uploading", - "progress": 100 * bytes_uploaded / total_size, - "current": bytes_uploaded, - } - parts_out.append(out_part) - - lgr.debug("%s: Completing upload", item.entry_path) - resp = client.post( - f"/uploads/zarr/{upload_id}/complete/", - json={"parts": parts_out}, - ) - lgr.debug( - "%s: Announcing completion to %s", - item.entry_path, - resp["complete_url"], - ) - r = storage.post(resp["complete_url"], data=resp["body"], json_resp=False) - lgr.debug( - "%s: Upload completed. Response content: %s", - item.entry_path, - r.content, - ) - rxml = fromstring(r.text) - m = re.match(r"\{.+?\}", rxml.tag) - ns = m.group(0) if m else "" - final_etag = rxml.findtext(f"{ns}ETag") - if final_etag is not None: - final_etag = final_etag.strip('"') - if final_etag != filetag: - raise RuntimeError( - "Server and client disagree on final ETag of" - f" uploaded file; server says {final_etag}," - f" client says {filetag}" - ) - # else: Error? Warning? 
- resp = client.post(f"/uploads/zarr/{upload_id}/validate/") - yield {"status": "done"} - except Exception: - post_upload_size_check(item.filepath, total_size, True) - raise - else: - post_upload_size_check(item.filepath, total_size, False) - - def _upload_zarr_file( storage_session: RESTFullAPIClient, dandiset: RemoteDandiset, diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index 5977f2c67..cccf53d74 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -1,16 +1,17 @@ from __future__ import annotations -import os -import subprocess from operator import attrgetter +import os from pathlib import Path +import subprocess from unittest.mock import ANY, patch +from dandischema.models import get_schema_version import numpy as np import pytest import zarr -from dandischema.models import get_schema_version +from .fixtures import SampleDandiset from .. import get_logger from ..consts import ZARR_MIME_TYPE, dandiset_metadata_file from ..dandiapi import AssetType, RemoteZarrAsset @@ -28,7 +29,6 @@ dandi_file, find_dandi_files, ) -from .fixtures import SampleDandiset lgr = get_logger() @@ -538,33 +538,32 @@ def test_upload_zarr_entry_content_type(new_dandiset, tmp_path): @pytest.mark.ai_generated def test_upload_zarr_large_chunks(new_dandiset, tmp_path): - """Chunks above ZARR_LARGE_CHUNK_THRESHOLD are uploaded via upload_zarr_file_multipart.""" + """Chunks above ZARR_LARGE_CHUNK_THRESHOLD are uploaded via multipart upload.""" filepath = tmp_path / "example.zarr" zarr.save(filepath, np.arange(1000), np.arange(1000, 0, -1)) zf = dandi_file(filepath) assert isinstance(zf, ZarrAsset) - from ..files.zarr import upload_zarr_file_multipart + from ..files.bases import _multipart_upload as real_multipart_upload - real_upload_zarr_file_multipart = upload_zarr_file_multipart called_paths: list[str] = [] - def spy_upload_zarr_file_multipart(item, *args, **kwargs): - called_paths.append(item.entry_path) - yield from 
real_upload_zarr_file_multipart(item, *args, **kwargs) + def spy_multipart_upload(**kwargs): + called_paths.append(kwargs["asset_path"]) + yield from real_multipart_upload(**kwargs) # Set threshold to 0 so every chunk is treated as "large" with ( patch("dandi.files.zarr.ZARR_LARGE_CHUNK_THRESHOLD", 0), patch( - "dandi.files.zarr.upload_zarr_file_multipart", - spy_upload_zarr_file_multipart, + "dandi.files.zarr._multipart_upload", + spy_multipart_upload, ), ): asset = zf.upload(new_dandiset.dandiset, {}) assert isinstance(asset, RemoteZarrAsset) - # Every chunk file in the zarr should have been routed through upload_zarr_file_multipart + # Every chunk file in the zarr should have been routed through multipart upload remote_entries = {str(e) for e in asset.iterfiles()} assert remote_entries == set(called_paths) From 4490bbb79ed1229a6b0315290e2927425c7d098e Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Mon, 27 Apr 2026 13:15:08 -0400 Subject: [PATCH 03/16] Fix zarr params to multi-part upload function --- dandi/files/zarr.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index de0c29c93..e77a8270c 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -764,7 +764,8 @@ def mkzarr() -> str: asset_path=it.entry_path, upload_prefix="/uploads/zarr", extra_init_fields={ - "zarr": {"chunk_key": it.entry_path, "zarr_id": zarr_id} + "zarr_id": zarr_id, + "chunk_key": it.entry_path, }, jobs=jobs, ) From 3772cc40773130af0b68b99523c61c4865d3629d Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Mon, 27 Apr 2026 13:17:46 -0400 Subject: [PATCH 04/16] Fix linting --- dandi/files/bases.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dandi/files/bases.py b/dandi/files/bases.py index 42af8d51e..c984478bf 100644 --- a/dandi/files/bases.py +++ b/dandi/files/bases.py @@ -613,7 +613,7 @@ def _multipart_upload( expected_etag: str | None = None, jobs: int | None = None, ) -> Generator[dict, None, 
dict]: - """Perform a full multipart upload: etag calculation, initialization, part upload, and validation. + """Perform multipart upload: etag calculation, initialization, part upload, and validation. Yields progress dicts and returns the validate response dict. If ``expected_etag`` is provided and does not match the computed etag, raises From 0ded7f99acfe2b931d5cca1dfbef7e56daf2c3f3 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Mon, 27 Apr 2026 13:29:11 -0400 Subject: [PATCH 05/16] Fix typing error --- dandi/files/bases.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dandi/files/bases.py b/dandi/files/bases.py index c984478bf..22bfc6f77 100644 --- a/dandi/files/bases.py +++ b/dandi/files/bases.py @@ -354,6 +354,8 @@ def iter_upload( expected_etag=expected_etag, jobs=jobs, ) + if validate_resp is None: + raise RuntimeError("Expected upload response of type `dict` but received `None`") blob_id = validate_resp["blob_id"] except requests.HTTPError as e: if e.response is not None and e.response.status_code == 409: @@ -612,7 +614,7 @@ def _multipart_upload( extra_init_fields: dict, expected_etag: str | None = None, jobs: int | None = None, -) -> Generator[dict, None, dict]: +) -> Generator[dict, None, dict | None]: """Perform multipart upload: etag calculation, initialization, part upload, and validation. Yields progress dicts and returns the validate response dict. If @@ -708,7 +710,7 @@ def _multipart_upload( f" client says {filetag}" ) # else: Error? Warning? 
- validate_resp = client.post(f"{upload_prefix}/{upload_id}/validate/") + validate_resp: dict | None = client.post(f"{upload_prefix}/{upload_id}/validate/") except Exception: post_upload_size_check(filepath, total_size, True) raise From 6e9064b97a4f50e678995e330ad7672bc8bb7e32 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Mon, 27 Apr 2026 17:18:53 -0400 Subject: [PATCH 06/16] Move test import to top of file --- dandi/tests/test_files.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index cccf53d74..2fb67e082 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -16,6 +16,7 @@ from ..consts import ZARR_MIME_TYPE, dandiset_metadata_file from ..dandiapi import AssetType, RemoteZarrAsset from ..exceptions import UnknownAssetError +from ..files.bases import _multipart_upload as real_multipart_upload from ..files import ( BIDSDatasetDescriptionAsset, DandisetMetadataFile, @@ -544,8 +545,6 @@ def test_upload_zarr_large_chunks(new_dandiset, tmp_path): zf = dandi_file(filepath) assert isinstance(zf, ZarrAsset) - from ..files.bases import _multipart_upload as real_multipart_upload - called_paths: list[str] = [] def spy_multipart_upload(**kwargs): From 57b353f8d14accf0ef69b52babc5487ba5837489 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Tue, 28 Apr 2026 13:00:58 -0400 Subject: [PATCH 07/16] Use multi-part checksums for multi-part zarr uploads --- dandi/files/zarr.py | 10 ++++++++-- dandi/support/digests.py | 24 ++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index e77a8270c..0bbcc63fb 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -1037,9 +1037,15 @@ def register(self, e: LocalZarrEntry, digest: str | None = None) -> None: @staticmethod def _mkitem(e: LocalZarrEntry) -> UploadItem: # Avoid heavy import by importing within function: - from dandi.support.digests import 
md5file_nocache + from dandi.support.digests import md5file_nocache, multipart_md5file_nocache + + file_size = e.filepath.stat().st_size + digest = ( + md5file_nocache(e.filepath) + if file_size <= ZARR_LARGE_CHUNK_THRESHOLD + else multipart_md5file_nocache(e.filepath) + ) - digest = md5file_nocache(e.filepath) return UploadItem.from_entry(e, digest) def get_items(self, jobs: int = 5) -> Generator[UploadItem, None, None]: diff --git a/dandi/support/digests.py b/dandi/support/digests.py index 7a69a1629..018626f89 100644 --- a/dandi/support/digests.py +++ b/dandi/support/digests.py @@ -137,6 +137,30 @@ def md5file_nocache(filepath: str | Path) -> str: return Digester(["md5"])(filepath)["md5"] +def multipart_md5file_nocache(filepath: str | Path) -> str: + """ + Compute the S3 multipart ETag for a file. + + Splits the file into parts of ``part_size`` bytes, hashes each part with + MD5, then returns ``MD5(concat(part_md5s))-{num_parts}``, matching what S3 + stores as the ETag for a multipart upload. 
+ """ + if isinstance(filepath, str): + filepath = Path(filepath) + + part_size = DandiETag(filepath.stat().st_size)._part_gen.initial_part_size + part_md5s = b"" + num_parts = 0 + with open(filepath, "rb") as f: + while True: + chunk = f.read(part_size) + if not chunk: + break + part_md5s += hashlib.md5(chunk).digest() + num_parts += 1 + return f"{hashlib.md5(part_md5s).hexdigest()}-{num_parts}" + + def checksum_zarr_dir( files: dict[str, tuple[str, int]], directories: dict[str, tuple[str, int]] ) -> str: From 9bc29766b34341799b47a8e2d40ea3baca542ea1 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Tue, 28 Apr 2026 13:01:33 -0400 Subject: [PATCH 08/16] Add test for multi-part zarr upload --- dandi/tests/test_files.py | 42 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index 2fb67e082..9875d2087 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -567,6 +567,48 @@ def spy_multipart_upload(**kwargs): assert remote_entries == set(called_paths) +@pytest.mark.ai_generated +def test_upload_zarr_mixed_chunks(new_dandiset, tmp_path): + """Chunks above ZARR_LARGE_CHUNK_THRESHOLD go multipart; smaller ones use single-part upload.""" + filepath = tmp_path / "mixed.zarr" + store = zarr.open_group(str(filepath), mode="w") + # small array: 10 int64 elements, produces a ~96-byte chunk (compressed) + store.create_dataset("small", data=np.arange(10, dtype=np.int64), chunks=(10,)) + # large array: 200 int64 elements, produces a ~329-byte chunk (compressed) + store.create_dataset("large", data=np.arange(200, dtype=np.int64), chunks=(200,)) + + zf = dandi_file(filepath) + assert isinstance(zf, ZarrAsset) + + multipart_paths: list[str] = [] + + def spy_multipart_upload(**kwargs): + multipart_paths.append(kwargs["asset_path"]) + yield from real_multipart_upload(**kwargs) + + # Threshold sits between the two chunk sizes so only the large chunk goes multipart + 
mixed_threshold = 200 + with ( + patch("dandi.files.zarr.ZARR_LARGE_CHUNK_THRESHOLD", mixed_threshold), + patch("dandi.files.zarr._multipart_upload", spy_multipart_upload), + ): + asset = zf.upload(new_dandiset.dandiset, {}) + + assert isinstance(asset, RemoteZarrAsset) + + remote_entries = {str(e) for e in asset.iterfiles()} + # Only chunk files whose on-disk size exceeds the threshold should be multipart-uploaded + large_chunks = { + p + for p in remote_entries + if (filepath / p).stat().st_size > mixed_threshold + } + assert set(multipart_paths) == large_chunks + # At least one chunk must have gone each path so the test is meaningful + assert len(multipart_paths) > 0 + assert len(remote_entries) - len(multipart_paths) > 0 + + def test_validate_deep_zarr(tmp_path: Path) -> None: zarr_path = tmp_path / "foo.zarr" zarr.save(zarr_path, np.arange(1000), np.arange(1000, 0, -1)) From 6d996707ff027353bf39e21531ff397d3da6bbe7 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Tue, 28 Apr 2026 16:28:26 -0400 Subject: [PATCH 09/16] Merge multipart behavior into md5file_nocache func --- dandi/files/zarr.py | 10 ++-------- dandi/support/digests.py | 28 +++++++--------------------- dandi/tests/test_files.py | 2 ++ 3 files changed, 11 insertions(+), 29 deletions(-) diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index 0bbcc63fb..e77a8270c 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -1037,15 +1037,9 @@ def register(self, e: LocalZarrEntry, digest: str | None = None) -> None: @staticmethod def _mkitem(e: LocalZarrEntry) -> UploadItem: # Avoid heavy import by importing within function: - from dandi.support.digests import md5file_nocache, multipart_md5file_nocache - - file_size = e.filepath.stat().st_size - digest = ( - md5file_nocache(e.filepath) - if file_size <= ZARR_LARGE_CHUNK_THRESHOLD - else multipart_md5file_nocache(e.filepath) - ) + from dandi.support.digests import md5file_nocache + digest = md5file_nocache(e.filepath) return 
UploadItem.from_entry(e, digest) def get_items(self, jobs: int = 5) -> Generator[UploadItem, None, None]: diff --git a/dandi/support/digests.py b/dandi/support/digests.py index 018626f89..c35522984 100644 --- a/dandi/support/digests.py +++ b/dandi/support/digests.py @@ -26,6 +26,7 @@ from fscacher import PersistentCache from zarr_checksum.checksum import ZarrChecksum, ZarrChecksumManifest from zarr_checksum.tree import ZarrChecksumTree +from dandi.consts import ZARR_LARGE_CHUNK_THRESHOLD from .threaded_walk import threaded_walk from ..utils import Hasher, exclude_from_zarr @@ -133,32 +134,17 @@ def md5file_nocache(filepath: str | Path) -> str: Compute the MD5 digest of a file without caching with fscacher, which has been shown to slow things down for the large numbers of files typically present in Zarrs - """ - return Digester(["md5"])(filepath)["md5"] - -def multipart_md5file_nocache(filepath: str | Path) -> str: - """ - Compute the S3 multipart ETag for a file. - - Splits the file into parts of ``part_size`` bytes, hashes each part with - MD5, then returns ``MD5(concat(part_md5s))-{num_parts}``, matching what S3 - stores as the ETag for a multipart upload. + If the file is larger than `ZARR_LARGE_CHUNK_THRESHOLD`, the computed checksum is not a + traditional md5 checksum, but is instead an S3 multipart ETag. 
""" if isinstance(filepath, str): filepath = Path(filepath) - part_size = DandiETag(filepath.stat().st_size)._part_gen.initial_part_size - part_md5s = b"" - num_parts = 0 - with open(filepath, "rb") as f: - while True: - chunk = f.read(part_size) - if not chunk: - break - part_md5s += hashlib.md5(chunk).digest() - num_parts += 1 - return f"{hashlib.md5(part_md5s).hexdigest()}-{num_parts}" + if filepath.stat().st_size > ZARR_LARGE_CHUNK_THRESHOLD: + return get_dandietag(filepath).as_str() + + return Digester(["md5"])(filepath)["md5"] def checksum_zarr_dir( diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index 9875d2087..03ec6a213 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -554,6 +554,7 @@ def spy_multipart_upload(**kwargs): # Set threshold to 0 so every chunk is treated as "large" with ( patch("dandi.files.zarr.ZARR_LARGE_CHUNK_THRESHOLD", 0), + patch("dandi.support.digests.ZARR_LARGE_CHUNK_THRESHOLD", 0), patch( "dandi.files.zarr._multipart_upload", spy_multipart_upload, @@ -590,6 +591,7 @@ def spy_multipart_upload(**kwargs): mixed_threshold = 200 with ( patch("dandi.files.zarr.ZARR_LARGE_CHUNK_THRESHOLD", mixed_threshold), + patch("dandi.support.digests.ZARR_LARGE_CHUNK_THRESHOLD", mixed_threshold), patch("dandi.files.zarr._multipart_upload", spy_multipart_upload), ): asset = zf.upload(new_dandiset.dandiset, {}) From 871294b74d5b5672dd83eefc12a0f8f23da3bb96 Mon Sep 17 00:00:00 2001 From: Jacob Nesbitt Date: Wed, 29 Apr 2026 13:24:36 -0400 Subject: [PATCH 10/16] Ignore incorrect type checking error --- dandi/support/digests.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dandi/support/digests.py b/dandi/support/digests.py index c35522984..d57486a6e 100644 --- a/dandi/support/digests.py +++ b/dandi/support/digests.py @@ -142,7 +142,8 @@ def md5file_nocache(filepath: str | Path) -> str: filepath = Path(filepath) if filepath.stat().st_size > ZARR_LARGE_CHUNK_THRESHOLD: - return 
get_dandietag(filepath).as_str() + # For some reason the type checker treats this return as an Any type + return get_dandietag(filepath).as_str() # type: ignore [no-any-return] return Digester(["md5"])(filepath)["md5"] From a5bfdd79fc73471c38fa14e4f2d81562e09ee08a Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Wed, 29 Apr 2026 19:51:28 -0400 Subject: [PATCH 11/16] Rename zarr upload batch vars: large_items->multipart_items, items_to_upload->singlepart_items The split of a batch into "items above the 5GB threshold" and "items at or below" is really a split between items routed through the multipart upload endpoint and items uploaded with a single signed PUT. Naming them after the upload mechanism rather than the size makes the subsequent capability/gating logic read more naturally. Pure rename, no behavior change. Co-Authored-By: Claude Code 2.1.123 / Claude Opus 4.7 (1M context) --- dandi/files/zarr.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index e77a8270c..3df279b50 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -743,10 +743,10 @@ def mkzarr() -> str: # Items to upload in this batch (may be retried e.g. 
due to # 403 errors because of timed-out upload URLs) all_items = list(items) - large_items = [ + multipart_items = [ it for it in all_items if it.size > ZARR_LARGE_CHUNK_THRESHOLD ] - items_to_upload = [ + singlepart_items = [ it for it in all_items if it.size <= ZARR_LARGE_CHUNK_THRESHOLD ] max_retries = 5 @@ -756,7 +756,7 @@ def mkzarr() -> str: zcc.add_leaf(Path(it.entry_path), it.size, it.digest) # Upload chunks above 5GB individually via multipart upload - for it in large_items: + for it in multipart_items: # Yield uploading status yield from _multipart_upload( client=client, @@ -780,9 +780,9 @@ def mkzarr() -> str: } # Upload the remaining files using single part upload - while items_to_upload and retry_count <= max_retries: + while singlepart_items and retry_count <= max_retries: # Prepare upload requests for current items - uploading = [it.upload_request() for it in items_to_upload] + uploading = [it.upload_request() for it in singlepart_items] if retry_count == 0: lgr.debug( @@ -814,7 +814,7 @@ def mkzarr() -> str: upload_url=signed_url, item=it, ) - for (signed_url, it) in zip(r, items_to_upload) + for (signed_url, it) in zip(r, singlepart_items) ] changed = True @@ -846,20 +846,20 @@ def mkzarr() -> str: ) # Prepare for next iteration with retry items - if items_to_upload := retry_items: + if singlepart_items := retry_items: retry_count += 1 if retry_count <= max_retries: lgr.info( "%s: %s got 403 errors, requesting new URLs", asset_path, - pluralize(len(items_to_upload), "file"), + pluralize(len(singlepart_items), "file"), ) # Small delay before retry sleep(1 * retry_count) # Check if we exhausted retries - if items_to_upload: - nfiles_str = pluralize(len(items_to_upload), "file") + if singlepart_items: + nfiles_str = pluralize(len(singlepart_items), "file") raise UploadError( f"{asset_path}: failed to upload {nfiles_str} " f"after {max_retries} retries due to repeated 403 errors" From 4b8134051ccebc260f0c3ee1d0a13135ebf224f5 Mon Sep 17 00:00:00 2001 
From: Yaroslav Halchenko Date: Wed, 29 Apr 2026 20:53:37 -0400 Subject: [PATCH 12/16] Gate zarr multipart upload on server capability dandi-archive#2784 (still open) introduces the ``/uploads/zarr/initialize/`` endpoint required to upload zarr chunks larger than 5 GiB. Until that lands and is deployed, posting to the endpoint returns 404 and the multi-part code path here would surface that as an opaque ``HTTP404Error`` from inside ``_multipart_upload``. Add a lazy ``DandiAPIClient.supports_zarr_multipart_upload`` cached property that probes the endpoint once with an empty POST body: 404 means the server lacks the feature, anything else (400 for the bogus payload, 401/403 for auth) means it is present. In ``ZarrAsset.iter_upload``, right after the batch is split into ``multipart_items`` and ``singlepart_items``, raise an ``UploadError`` when there are multipart items and the server does not support them. The error reports the count and aggregate size of the oversized chunks plus the path of the largest one, mirroring the wording proposed in dandi/dandi-cli#1827. Marked with a TODO: the check (and the cached property) can be removed once all supported servers ship dandi-archive#2784 (> 0.23.0). 
Co-Authored-By: Claude Code 2.1.123 / Claude Opus 4.7 (1M context) --- dandi/dandiapi.py | 22 ++++++++++++++++++++++ dandi/files/zarr.py | 18 ++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/dandi/dandiapi.py b/dandi/dandiapi.py index 61566425b..f2d0cd49d 100644 --- a/dandi/dandiapi.py +++ b/dandi/dandiapi.py @@ -19,6 +19,7 @@ from datetime import datetime from enum import Enum from fnmatch import fnmatchcase +from functools import cached_property import json import os.path from pathlib import Path, PurePosixPath @@ -565,6 +566,27 @@ def _get_keyring_ids(self) -> tuple[str, str]: def _instance_id(self) -> str: return self.dandi_instance.name.upper() + @cached_property + def supports_zarr_multipart_upload(self) -> bool: + """ + Whether the server exposes the zarr multipart upload endpoints + introduced in dandi-archive#2784 (``/uploads/zarr/initialize/`` and + friends). + + Probed once per client by POSTing an empty body to + ``/uploads/zarr/initialize/``: if the route does not exist the server + returns 404; any other status (400 for the bad payload, 401/403 for + auth, etc.) means the route is present and the server supports + multipart zarr uploads. + """ + try: + self.post("/uploads/zarr/initialize/", json={}) + except HTTP404Error: + return False + except requests.HTTPError: + return True + return True + def get_dandiset( self, dandiset_id: str, version_id: str | None = None, lazy: bool = True ) -> RemoteDandiset: diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index 3df279b50..6f1a15bc2 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -749,6 +749,24 @@ def mkzarr() -> str: singlepart_items = [ it for it in all_items if it.size <= ZARR_LARGE_CHUNK_THRESHOLD ] + # TODO: remove once all servers are > 0.23.0 (i.e. ship + # dandi-archive#2784) and the multipart zarr upload + # endpoints are universally available; the capability + # check would then be unnecessary. 
+ if multipart_items and not client.supports_zarr_multipart_upload: + largest = max(multipart_items, key=lambda it: it.size) + total_large_size = sum(it.size for it in multipart_items) + raise UploadError( + f"{asset_path}:" + f" {pluralize(len(multipart_items), 'Zarr chunk')}" + f" totaling {total_large_size / 1024**3:.2f} GiB" + f" exceed the S3 single-part upload limit of" + f" {ZARR_LARGE_CHUNK_THRESHOLD / 1024**3:.0f} GiB" + f" (largest: {largest.entry_path}," + f" {largest.size / 1024**3:.2f} GiB);" + f" the server does not support multipart zarr" + f" uploads (dandi-archive#2784)." + ) max_retries = 5 retry_count = 0 # Add all items to checksum tree (only done once) From 39cc742d44a89c02d3be663ca345132fd97f7824 Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Wed, 29 Apr 2026 20:54:11 -0400 Subject: [PATCH 13/16] Translate per-file ``current`` from _multipart_upload to whole-zarr cumulative MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``_multipart_upload`` (used here to upload zarr chunks larger than 5 GiB) yields progress dicts whose ``current`` is the number of bytes uploaded *within the chunk currently being transferred*. The surrounding ``ZarrAsset.iter_upload`` loop reports ``current`` as the number of bytes uploaded *across the whole zarr*. Yielded as-is, the inner per-file values caused ``current`` to jump backwards each time a new multipart chunk started, breaking progress bars driven from that field. Wrap the inner generator: when an "uploading" status carries a ``current`` field, translate it by the cumulative bytes uploaded so far before re-yielding; pass other status dicts through unchanged. The trailing post-loop yield is dropped — the last per-part translated yield already reports the full cumulative size for that chunk. 
Co-Authored-By: Claude Code 2.1.123 / Claude Opus 4.7 (1M context) --- dandi/files/zarr.py | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py index 6f1a15bc2..91dbdda84 100644 --- a/dandi/files/zarr.py +++ b/dandi/files/zarr.py @@ -773,10 +773,14 @@ def mkzarr() -> str: for it in all_items: zcc.add_leaf(Path(it.entry_path), it.size, it.digest) - # Upload chunks above 5GB individually via multipart upload + # Upload chunks above 5GB individually via multipart upload. + # ``_multipart_upload`` reports ``current`` as bytes within + # the single chunk being uploaded; translate it to bytes + # uploaded across the whole zarr so progress reporting + # stays monotonic for downstream consumers. for it in multipart_items: - # Yield uploading status - yield from _multipart_upload( + cumulative_before = bytes_uploaded + for status in _multipart_upload( client=client, filepath=it.filepath, asset_path=it.entry_path, @@ -786,16 +790,21 @@ def mkzarr() -> str: "chunk_key": it.entry_path, }, jobs=jobs, - ) - - # Part is finished uploading, yield final progress + ): + if ( + status.get("status") == "uploading" + and "current" in status + ): + cumulative = cumulative_before + status["current"] + yield { + "status": "uploading", + "progress": 100 * cumulative / to_upload.total_size, + "current": cumulative, + } + else: + yield status changed = True bytes_uploaded += it.size - yield { - "status": "uploading", - "progress": 100 * bytes_uploaded / to_upload.total_size, - "current": bytes_uploaded, - } # Upload the remaining files using single part upload while singlepart_items and retry_count <= max_retries: From 684da5ccae0f2371fab2e8dec451e9595561785d Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Wed, 29 Apr 2026 21:15:40 -0400 Subject: [PATCH 14/16] Skip zarr multipart-upload tests when server lacks the endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 
Content-Transfer-Encoding: 8bit The two ``ai_generated`` tests added in 9bc29766 patch ``ZARR_LARGE_CHUNK_THRESHOLD`` down so chunks are routed through ``_multipart_upload`` against whatever server backs ``new_dandiset``. The local docker dandi-archive image does not yet ship dandi-archive#2784, so the new ``supports_zarr_multipart_upload`` capability check raises ``UploadError`` before ``_multipart_upload`` is ever called and the tests fail without exercising what they claim to. Probe the live client for capability and ``pytest.skip`` only when the endpoint is missing — keeps the tests effective on setups whose server already carries dandi-archive#2784. Co-Authored-By: Claude Code 2.1.123 / Claude Opus 4.7 (1M context) --- dandi/tests/test_files.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/dandi/tests/test_files.py b/dandi/tests/test_files.py index 03ec6a213..95e927960 100644 --- a/dandi/tests/test_files.py +++ b/dandi/tests/test_files.py @@ -16,7 +16,6 @@ from ..consts import ZARR_MIME_TYPE, dandiset_metadata_file from ..dandiapi import AssetType, RemoteZarrAsset from ..exceptions import UnknownAssetError -from ..files.bases import _multipart_upload as real_multipart_upload from ..files import ( BIDSDatasetDescriptionAsset, DandisetMetadataFile, @@ -30,6 +29,7 @@ dandi_file, find_dandi_files, ) +from ..files.bases import _multipart_upload as real_multipart_upload lgr = get_logger() @@ -540,6 +540,11 @@ def test_upload_zarr_entry_content_type(new_dandiset, tmp_path): @pytest.mark.ai_generated def test_upload_zarr_large_chunks(new_dandiset, tmp_path): """Chunks above ZARR_LARGE_CHUNK_THRESHOLD are uploaded via multipart upload.""" + if not new_dandiset.client.supports_zarr_multipart_upload: + pytest.skip( + "Server does not expose the zarr multipart upload endpoints" + " (dandi-archive#2784)" + ) filepath = tmp_path / "example.zarr" zarr.save(filepath, np.arange(1000), np.arange(1000, 0, -1)) zf = dandi_file(filepath) @@ 
-571,6 +576,11 @@ def spy_multipart_upload(**kwargs): @pytest.mark.ai_generated def test_upload_zarr_mixed_chunks(new_dandiset, tmp_path): """Chunks above ZARR_LARGE_CHUNK_THRESHOLD go multipart; smaller ones use single-part upload.""" + if not new_dandiset.client.supports_zarr_multipart_upload: + pytest.skip( + "Server does not expose the zarr multipart upload endpoints" + " (dandi-archive#2784)" + ) filepath = tmp_path / "mixed.zarr" store = zarr.open_group(str(filepath), mode="w") # small array: 10 int64 elements, produces a ~96-byte chunk (compressed) @@ -601,9 +611,7 @@ def spy_multipart_upload(**kwargs): remote_entries = {str(e) for e in asset.iterfiles()} # Only chunk files whose on-disk size exceeds the threshold should be multipart-uploaded large_chunks = { - p - for p in remote_entries - if (filepath / p).stat().st_size > mixed_threshold + p for p in remote_entries if (filepath / p).stat().st_size > mixed_threshold } assert set(multipart_paths) == large_chunks # At least one chunk must have gone each path so the test is meaningful From eb3c470ac868b47852a46157eae1a141fc2bdef6 Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Thu, 30 Apr 2026 14:55:12 -0400 Subject: [PATCH 15/16] CI: cross-repo PR tests workflow Add a standalone GitHub Actions workflow, `.github/workflows/cross-repo-pr-tests.yml`, that runs an extra integration matrix when a dandi-cli PR description includes lines such as dandi-archive PR: https://github.com/dandi/dandi-archive/pull/2784 dandi-schema PR: #321 The workflow has three jobs: * `discover-cross-repo-prs` parses the PR body and resolves each marker to repo+sha (handles forks via `gh api`). * `build-archive-image` checks out the dandi-archive PR head (or master if only a schema PR is given), optionally rewrites the pinned `dandischema` line in `pyproject.toml` to a git URL pointing at the schema PR head, builds with `dev/django-public.Dockerfile`, and exports the image as a workflow artifact. 
* `test-cross-repo` loads that image, force-installs `dandischema` from the PR head into the runner venv, sets `DANDI_TESTS_PULL_DOCKER_COMPOSE=0` so docker compose uses the loaded image instead of pulling Docker Hub, and runs the `--dandi-api` test suite. Triggered on `pull_request: types: [opened, edited, synchronize, reopened]` so adding or removing a marker re-fires *only* this workflow, not the full test matrix in `run-tests.yml`. A per-PR `concurrency:` group cancels older in-flight runs when the body or commits change. A PR template advertises the marker convention; the design notes under `.specify/specs/cross-repo-pr-matrix.md` cover the rationale and edge cases (forks, `pull_request` vs `pull_request_target`, schema-version pin coordination, future generalization to an organization-level reusable workflow). Pattern adapted from bids-standard/bids-examples `.github/workflows/validate_datasets.yml`. Co-Authored-By: Claude Code 2.1.114 / Claude Opus 4.7 (1M context) --- .github/PULL_REQUEST_TEMPLATE.md | 30 ++ .github/workflows/cross-repo-pr-tests.yml | 407 ++++++++++++++++++++++ .specify/specs/cross-repo-pr-matrix.md | 334 ++++++++++++++++++ 3 files changed, 771 insertions(+) create mode 100644 .github/PULL_REQUEST_TEMPLATE.md create mode 100644 .github/workflows/cross-repo-pr-tests.yml create mode 100644 .specify/specs/cross-repo-pr-matrix.md diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 000000000..aeddabcce --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,30 @@ + diff --git a/.github/workflows/cross-repo-pr-tests.yml b/.github/workflows/cross-repo-pr-tests.yml new file mode 100644 index 000000000..80d040df7 --- /dev/null +++ b/.github/workflows/cross-repo-pr-tests.yml @@ -0,0 +1,407 @@ +name: Cross-repo PR tests + +# ---------------------------------------------------------------------------- +# Cross-repo PR matrix. 
+# +# A PR description may include marker lines like: +# dandi-archive PR: #2784 +# dandi-archive PR: https://github.com/dandi/dandi-archive/pull/2784 +# dandi-schema PR: #321 +# dandi-schema PR: https://github.com/dandi/dandi-schema/pull/321 +# When such a marker is present, three jobs run: +# * `discover-cross-repo-prs` parses the body and resolves each marker to +# repo+sha (handles forks via `gh api`). +# * `build-archive-image` checks out the dandi-archive PR head (or master +# if only a schema PR is given), optionally rewrites the pinned +# `dandischema` line in `pyproject.toml` to a git URL pointing at the +# schema PR head, builds with `dev/django-public.Dockerfile`, and +# exports the image as a workflow artifact. +# * `test-cross-repo` loads that image, force-installs `dandischema` from +# the PR head into the runner venv, sets `DANDI_TESTS_PULL_DOCKER_COMPOSE=0` +# so docker compose uses the loaded image instead of pulling Docker Hub, +# and runs the `--dandi-api` test suite. +# +# Pattern adapted from bids-standard/bids-examples +# `.github/workflows/validate_datasets.yml`. +# See `.specify/specs/cross-repo-pr-matrix.md` for full design notes. +# +# This workflow is split out from `run-tests.yml` so it can react to PR-body +# edits (`pull_request: types: edited`) — adding or removing a marker +# re-fires only this workflow, not the full test matrix. +# +# TODO(generalize): lift this into an organization-level reusable workflow +# at e.g. `dandi/.github/.github/workflows/cross-repo-pr-matrix.yml` (or a +# composite action), so dandi-cli, dandi-schema, and dandi-archive can each +# `uses:` it to run the *same* dandi-cli-against-tandem integration test. 
+# Behaviour should depend on the calling repo: +# - inside dandi-cli PR: head of *this* PR + markers for archive/schema +# - inside dandi-schema PR: dandi-cli master (or `dandi-cli PR:` marker) +# against dandi-archive master (or `dandi-archive PR:` marker), with +# dandischema pinned to *this* PR's head automatically +# - inside dandi-archive PR: archive image built from *this* PR's head, +# dandi-cli master (or marker), dandi-schema master (or marker) +# Then `${{ github.repository }}` / `${{ github.event.pull_request.head.sha }}` +# become the implicit "self" override and the marker parsing only handles +# the *other* two. Could generalize further to an arbitrary list of +# `repo -> override` triples (Docker image override vs pip override) so +# e.g. NWB/HDMF stacks could reuse the pattern. Out of scope for the +# initial landing here — first prove the in-repo version works end-to-end +# against dandi-cli#1839 + dandi-archive#2784. +# ---------------------------------------------------------------------------- + +on: + pull_request: + # `edited` re-fires when the PR body / title / base is changed, so adding + # or removing a `dandi-archive PR:` / `dandi-schema PR:` marker re-runs + # the cross-repo flow without touching the rest of CI. + types: [opened, edited, synchronize, reopened] + workflow_dispatch: + inputs: + dandi_cli_pr: + description: >- + dandi-cli PR (number, #N, or full URL). Leave blank to test + against the dispatched ref of dandi/dandi-cli (master by + default). Mutually exclusive with dandi_cli_branch. + required: false + default: '' + dandi_cli_branch: + description: >- + dandi-cli branch in dandi/dandi-cli (e.g. "master" or a + feature branch). Mutually exclusive with dandi_cli_pr. + required: false + default: '' + dandi_schema_pr: + description: >- + dandi-schema PR (number, #N, or full URL). At least one + schema/archive override (PR or branch) must be supplied for + the run to do anything. 
+ required: false + default: '' + dandi_schema_branch: + description: >- + dandi-schema branch in dandi/dandi-schema. Mutually + exclusive with dandi_schema_pr. + required: false + default: '' + dandi_archive_pr: + description: >- + dandi-archive PR (number, #N, or full URL). At least one + schema/archive override (PR or branch) must be supplied for + the run to do anything. + required: false + default: '' + dandi_archive_branch: + description: >- + dandi-archive branch in dandi/dandi-archive (e.g. "master" + to test against unreleased master). Mutually exclusive with + dandi_archive_pr. + required: false + default: '' + +# Cancel an in-flight cross-repo run when the PR is updated again — picking +# up the newest body/commits matters more than finishing the old one. +concurrency: + group: cross-repo-pr-tests-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +defaults: + run: + shell: bash + +jobs: + discover-cross-repo-prs: + runs-on: ubuntu-latest + outputs: + cli_repo: ${{ steps.parse.outputs.cli_repo }} + cli_ref: ${{ steps.parse.outputs.cli_ref }} + archive_repo: ${{ steps.parse.outputs.archive_repo }} + archive_ref: ${{ steps.parse.outputs.archive_ref }} + schema_repo: ${{ steps.parse.outputs.schema_repo }} + schema_ref: ${{ steps.parse.outputs.schema_ref }} + should_run: ${{ steps.parse.outputs.should_run }} + steps: + - name: Resolve cross-repo refs + id: parse + env: + EVENT: ${{ github.event_name }} + BODY: ${{ github.event.pull_request.body }} + IN_CLI_PR: ${{ inputs.dandi_cli_pr }} + IN_CLI_BRANCH: ${{ inputs.dandi_cli_branch }} + IN_SCHEMA_PR: ${{ inputs.dandi_schema_pr }} + IN_SCHEMA_BRANCH: ${{ inputs.dandi_schema_branch }} + IN_ARCHIVE_PR: ${{ inputs.dandi_archive_pr }} + IN_ARCHIVE_BRANCH: ${{ inputs.dandi_archive_branch }} + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + set -euo pipefail + + # Two event sources: + # * pull_request: parse `dandi-{cli,schema,archive} PR:` + # and `... 
branch:` markers from the PR + # body. The dandi-cli source is implicit + # from the event ref unless an explicit + # `dandi-cli` marker is given. + # * workflow_dispatch: read the six dandi_*_pr / dandi_*_branch + # inputs. At least one schema/archive + # override is required for the run to + # do anything. + CLI_PR=; CLI_BRANCH= + SCHEMA_PR=; SCHEMA_BRANCH= + ARCH_PR=; ARCH_BRANCH= + + extract_num() { + # Accept "N", "#N", or a full GitHub PR URL; print bare number. + printf '%s' "${1:-}" \ + | grep -oiP '(https?://github\.com/dandi/dandi-[a-z]+/pulls?/|dandi/dandi-[a-z]+#|#)?\K[0-9]+' \ + | head -1 || true + } + + if [ "$EVENT" = "workflow_dispatch" ]; then + CLI_PR=$(extract_num "$IN_CLI_PR") + SCHEMA_PR=$(extract_num "$IN_SCHEMA_PR") + ARCH_PR=$(extract_num "$IN_ARCHIVE_PR") + CLI_BRANCH="${IN_CLI_BRANCH:-}" + SCHEMA_BRANCH="${IN_SCHEMA_BRANCH:-}" + ARCH_BRANCH="${IN_ARCHIVE_BRANCH:-}" + elif [ "$EVENT" = "pull_request" ]; then + printf '%s' "${BODY:-}" > /tmp/pr_body.txt + # Branch names accept word chars, dots, slashes, dashes. + CLI_PR=$(grep -oiP 'dandi-cli\s+PR:\s*(https://github\.com/dandi/dandi-cli/pulls?/|dandi/dandi-cli#)?\K[0-9]+' /tmp/pr_body.txt | head -1 || true) + CLI_BRANCH=$(grep -oiP 'dandi-cli\s+branch:\s*\K[\w./-]+' /tmp/pr_body.txt | head -1 || true) + SCHEMA_PR=$(grep -oiP 'dandi-schema\s+PR:\s*(https://github\.com/dandi/dandi-schema/pulls?/|dandi/dandi-schema#)?\K[0-9]+' /tmp/pr_body.txt | head -1 || true) + SCHEMA_BRANCH=$(grep -oiP 'dandi-schema\s+branch:\s*\K[\w./-]+' /tmp/pr_body.txt | head -1 || true) + ARCH_PR=$(grep -oiP 'dandi-archive\s+PR:\s*(https://github\.com/dandi/dandi-archive/pulls?/|dandi/dandi-archive#)?\K[0-9]+' /tmp/pr_body.txt | head -1 || true) + ARCH_BRANCH=$(grep -oiP 'dandi-archive\s+branch:\s*\K[\w./-]+' /tmp/pr_body.txt | head -1 || true) + fi + + # Mutual exclusion: PR *or* branch per repo, never both. 
+ assert_not_both() { + local repo=$1 pr=$2 branch=$3 + if [ -n "$pr" ] && [ -n "$branch" ]; then + echo "::error::$repo: specify either a PR or a branch, not both (got PR='$pr' branch='$branch')" + exit 1 + fi + } + assert_not_both dandi-cli "$CLI_PR" "$CLI_BRANCH" + assert_not_both dandi-schema "$SCHEMA_PR" "$SCHEMA_BRANCH" + assert_not_both dandi-archive "$ARCH_PR" "$ARCH_BRANCH" + + # The whole point of this workflow is validating against an + # *unreleased* dandi-schema or dandi-archive ref. If neither + # has any kind of pointer (PR or branch) the regular matrix + # already covers everything — skip with no `gh api` calls and + # no image rebuild. + if [ -z "$SCHEMA_PR$SCHEMA_BRANCH$ARCH_PR$ARCH_BRANCH" ]; then + echo "No dandi-schema / dandi-archive pointer supplied — skipping." + echo "should_run=false" >> "$GITHUB_OUTPUT" + exit 0 + fi + + # Security: refuse to build/run code from forks. + # + # The cross-repo workflow runs in a context that may have access + # to repository secrets (when triggered by a maintainer or by + # `workflow_dispatch`). Building a Docker image from an arbitrary + # fork would let a malicious user execute their code in that + # context — a classic supply-chain hole. So PR head repos are + # required to live under the dandi/ org. Branches are inherently + # safe because we only ever query `repos/dandi/dandi-X/branches/...`. + assert_dandi_org() { + local label=$1 repo=$2 + case "$repo" in + dandi/*) ;; + *) + echo "::error::$label: refusing to build from non-dandi repo '$repo' — cross-repo overrides must come from the dandi/ org to avoid running untrusted code with workflow secrets."
+ exit 1 + ;; + esac + } + + resolve_pr() { + # $1=output-key, $2=full-repo (dandi/dandi-X), $3=PR number + local key=$1 fullrepo=$2 num=$3 + [ -n "$num" ] || return 0 + echo "Resolving $fullrepo PR #$num" + read -r REPO SHA < <(gh api "repos/$fullrepo/pulls/$num" \ + --jq '[.head.repo.full_name, .head.sha] | @tsv') + assert_dandi_org "$fullrepo PR #$num" "$REPO" + echo "${key}_repo=$REPO" >> "$GITHUB_OUTPUT" + echo "${key}_ref=$SHA" >> "$GITHUB_OUTPUT" + echo " -> $REPO @ $SHA (PR #$num)" + } + resolve_branch() { + # $1=output-key, $2=full-repo (dandi/dandi-X), $3=branch name. + # We always resolve under dandi/, so the source is trusted + # by construction. Pin to a SHA for reproducibility. + local key=$1 fullrepo=$2 branch=$3 + [ -n "$branch" ] || return 0 + echo "Resolving $fullrepo branch '$branch'" + SHA=$(gh api "repos/$fullrepo/branches/$branch" --jq '.commit.sha' 2>/dev/null || true) + if [ -z "$SHA" ]; then + echo "::error::$fullrepo: branch '$branch' not found in dandi/ org repo" + exit 1 + fi + echo "${key}_repo=$fullrepo" >> "$GITHUB_OUTPUT" + echo "${key}_ref=$SHA" >> "$GITHUB_OUTPUT" + echo " -> $fullrepo @ $SHA (branch $branch)" + } + + resolve_pr cli dandi/dandi-cli "$CLI_PR" + resolve_branch cli dandi/dandi-cli "$CLI_BRANCH" + resolve_pr schema dandi/dandi-schema "$SCHEMA_PR" + resolve_branch schema dandi/dandi-schema "$SCHEMA_BRANCH" + resolve_pr archive dandi/dandi-archive "$ARCH_PR" + resolve_branch archive dandi/dandi-archive "$ARCH_BRANCH" + + # We've already early-exited above when neither schema nor + # archive has any pointer, so reaching this point means there's + # a real override and the build/test jobs should run. 
+ echo "should_run=true" >> "$GITHUB_OUTPUT" + + build-archive-image: + needs: discover-cross-repo-prs + if: needs.discover-cross-repo-prs.outputs.should_run == 'true' + runs-on: ubuntu-24.04 + steps: + - name: Check out dandi-archive (PR head, or master) + uses: actions/checkout@v6 + with: + repository: ${{ needs.discover-cross-repo-prs.outputs.archive_repo || 'dandi/dandi-archive' }} + ref: ${{ needs.discover-cross-repo-prs.outputs.archive_ref || 'master' }} + fetch-depth: 0 + + - name: Patch pyproject.toml to use dandi-schema PR head + if: needs.discover-cross-repo-prs.outputs.schema_repo != '' + env: + SCHEMA_REPO: ${{ needs.discover-cross-repo-prs.outputs.schema_repo }} + SCHEMA_REF: ${{ needs.discover-cross-repo-prs.outputs.schema_ref }} + run: | + set -euo pipefail + python - <<'PY' + import os, pathlib, re + repo = os.environ["SCHEMA_REPO"] + ref = os.environ["SCHEMA_REF"] + spec = f'"dandischema @ git+https://github.com/{repo}@{ref}"' + p = pathlib.Path("pyproject.toml") + before = p.read_text() + after = re.sub(r'"dandischema[^"]*"', spec, before, count=1) + if before == after: + raise SystemExit("Failed to find dandischema dependency line in pyproject.toml") + p.write_text(after) + print(f"Replaced dandischema with {spec}") + PY + # uv resolves direct refs at sync time; a stale uv.lock would block it. + if [ -f uv.lock ]; then + pip install --quiet uv + uv lock + fi + + - name: Build Docker image + run: | + docker build \ + -t dandiarchive/dandiarchive-api \ + -f dev/django-public.Dockerfile \ + . 
+ + - name: Export Docker image + run: docker image save -o dandiarchive-api.tgz dandiarchive/dandiarchive-api + + - name: Upload Docker image tarball + uses: actions/upload-artifact@v7 + with: + name: dandiarchive-api-cross-repo + path: dandiarchive-api.tgz + retention-days: 1 + + test-cross-repo: + needs: [discover-cross-repo-prs, build-archive-image] + if: needs.discover-cross-repo-prs.outputs.should_run == 'true' && needs.build-archive-image.result == 'success' + runs-on: ubuntu-latest + env: + NO_ET: 1 + DANDI_PAGINATION_DISABLE_FALLBACK: "1" + DANDI_TESTS_PERSIST_DOCKER_COMPOSE: "1" + # Skip `docker compose pull` so the locally loaded image is used. + DANDI_TESTS_PULL_DOCKER_COMPOSE: "0" + steps: + # Empty `repository` / `ref` fall back to actions/checkout's defaults + # (= the event-supplied refs for `pull_request`, the dispatched ref + # for `workflow_dispatch`). Setting them is what lets a manual run + # target a specific dandi-cli PR via the `dandi_cli_pr` input. + - name: Check out dandi-cli (PR head, dispatched ref, or master) + uses: actions/checkout@v6 + with: + repository: ${{ needs.discover-cross-repo-prs.outputs.cli_repo }} + ref: ${{ needs.discover-cross-repo-prs.outputs.cli_ref }} + fetch-depth: 0 + + - name: Set up Python 3.13 + uses: actions/setup-python@v6 + with: + python-version: '3.13' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip wheel + pip install ".[extras,test]" + + - name: Override dandischema with PR head + if: needs.discover-cross-repo-prs.outputs.schema_repo != '' + env: + SCHEMA_REPO: ${{ needs.discover-cross-repo-prs.outputs.schema_repo }} + SCHEMA_REF: ${{ needs.discover-cross-repo-prs.outputs.schema_ref }} + run: | + pip install --upgrade --force-reinstall \ + "dandischema @ git+https://github.com/${SCHEMA_REPO}@${SCHEMA_REF}" + + - name: Download prebuilt dandi-archive image + uses: actions/download-artifact@v8 + with: + name: dandiarchive-api-cross-repo + + - name: Load Docker image + run: 
docker image load -i dandiarchive-api.tgz + + - name: Show cross-repo overrides + run: | + echo "event=${{ github.event_name }}" + echo "cli_repo=${{ needs.discover-cross-repo-prs.outputs.cli_repo }}" + echo "cli_ref=${{ needs.discover-cross-repo-prs.outputs.cli_ref }}" + echo "archive_repo=${{ needs.discover-cross-repo-prs.outputs.archive_repo }}" + echo "archive_ref=${{ needs.discover-cross-repo-prs.outputs.archive_ref }}" + echo "schema_repo=${{ needs.discover-cross-repo-prs.outputs.schema_repo }}" + echo "schema_ref=${{ needs.discover-cross-repo-prs.outputs.schema_ref }}" + git rev-parse HEAD + python -c "import dandischema; print('dandischema', dandischema.__version__)" + docker image inspect dandiarchive/dandiarchive-api --format '{{.Id}} {{.Created}}' + + - name: Run DANDI API tests against PR archive image + run: | + export DANDI_TESTS_AUDIT_CSV=/tmp/audit.csv + python -m pytest -s -v --cov=dandi --cov-report=xml --dandi-api dandi + if [ ! -e /tmp/audit.csv ]; then + echo "Audit file not created" + exit 1 + fi + + - name: Dump Docker Compose logs + if: failure() + run: | + docker compose \ + -f dandi/tests/data/dandiarchive-docker/docker-compose.yml \ + logs --timestamps + + - name: Shut down Docker Compose + if: always() + run: | + docker compose \ + -f dandi/tests/data/dandiarchive-docker/docker-compose.yml \ + down -v + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v6 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: ./coverage.xml + flags: cross-repo + fail_ci_if_error: false diff --git a/.specify/specs/cross-repo-pr-matrix.md b/.specify/specs/cross-repo-pr-matrix.md new file mode 100644 index 000000000..e5e7e55b0 --- /dev/null +++ b/.specify/specs/cross-repo-pr-matrix.md @@ -0,0 +1,334 @@ +# Cross-repo PR matrix testing in dandi-cli CI + +## Goal + +Allow a dandi-cli PR description to declare it depends on a PR in +`dandi/dandi-schema` and/or `dandi/dandi-archive`. 
When such a marker is +detected, CI adds a new matrix run that exercises the dandi-cli changes +**against the referenced PR(s)** of those repos, in addition to the regular +matrix (which uses released `dandischema` from PyPI and the prebuilt +`dandiarchive/dandiarchive-api` image from Docker Hub). + +Reference pattern: bids-standard/bids-examples +[validate_datasets.yml](https://github.com/bids-standard/bids-examples/blob/master/.github/workflows/validate_datasets.yml#L35). + +Immediate target: +- dandi/dandi-cli#1839 (this branch) tested against dandi/dandi-archive#2784. + +## Background — what currently happens + +`.github/workflows/run-tests.yml` defines a single `test` job with a Python × +OS × `mode` matrix. Test runs that need a live archive use the +`dandi-api` mode. The archive is brought up in +`dandi/tests/fixtures.py::docker_compose_setup` via +`docker compose` against `dandi/tests/data/dandiarchive-docker/docker-compose.yml`, +which references the prebuilt image: + +```yaml +services: + django: + image: dandiarchive/dandiarchive-api + celery: + image: dandiarchive/dandiarchive-api +``` + +The image is built by `dandi/dandi-archive` from `dev/django-public.Dockerfile` +(`uv sync --extra development` over the checked-out tree) and pushed to Docker +Hub. `dandischema` is an ordinary PyPI dependency in **both** repos +(`dandi-cli/pyproject.toml`: `dandischema ~= 0.12.0`; +`dandi-archive/pyproject.toml`: `dandischema==0.12.1`). dandi-archive itself +already does the build-image-from-source dance for its own +`cli-integration.yml` workflow — we mirror that here. + +## PR-body marker syntax + +Two parallel marker forms per repo — pick one, never both. Labels are +matched case-insensitively, whitespace-tolerant. 
+ +PR markers (a `#NNN` number or a full URL): + +``` +dandi-archive PR: #2784 +dandi-archive PR: https://github.com/dandi/dandi-archive/pull/2784 +dandi-schema PR: #321 +dandi-schema PR: https://github.com/dandi/dandi-schema/pull/321 +dandi-cli PR: #1839 (rare; see "implicit cli source" below) +``` + +Branch markers (the branch name in `dandi/<repo>`): + +``` +dandi-archive branch: master +dandi-archive branch: feature/foo +dandi-schema branch: maint-0.12 +dandi-cli branch: master +``` + +Branches use `[\w./-]+`, so `feature/foo`, `maint-0.12.x`, etc. are accepted. +The discover step resolves the branch via `gh api repos/dandi/<repo>/branches/<branch>` +to pin it to a SHA so the build is reproducible. + +**Implicit cli source.** On `pull_request` events, the dandi-cli code under +test defaults to the PR's own head ref (via `actions/checkout`'s default +behaviour). The `dandi-cli` markers above only matter when you want to +deliberately *override* that — e.g. test a dandi-cli PR's archive interaction +against a *different* dandi-cli ref. Most users won't need them. + +## Fork rejection (security) + +PR resolution returns `head.repo.full_name`. The discover step refuses to +proceed if that's not under `dandi/`: + +``` +::error::dandi/dandi-archive PR #N: refusing to build from non-dandi repo 'attacker/dandi-archive' — cross-repo overrides must come from the dandi/ org to avoid running untrusted code with workflow secrets. +``` + +Why: `pull_request` triggered from a fork of dandi-cli already runs in +sandboxed mode (no secrets, read-only `GITHUB_TOKEN`). But `workflow_dispatch` +runs from the dispatched ref *with full secrets*, and a maintainer triggering +a manual run on a fork-PR pointer would otherwise be running attacker code. +Rejecting forks unconditionally — including on `pull_request` — is the +simplest invariant and avoids the +[`pull_request_target` foot-gun](https://securitylab.github.com/research/github-actions-preventing-pwn-requests/).
+Branches are inherently safe because we resolve them under `dandi/` only; +whether a fork's branch name happens to match an upstream branch is irrelevant. + +## Manual `workflow_dispatch` mode + +The same workflow is also runnable from the Actions tab with six string +inputs (one PR + one branch per repo, mutually exclusive within a repo): + +- `dandi_cli_pr` / `dandi_cli_branch` +- `dandi_schema_pr` / `dandi_schema_branch` +- `dandi_archive_pr` / `dandi_archive_branch` + +PR inputs accept a number, `#N`, or full GitHub URL. Branch inputs accept +the branch name as it appears in `dandi/<repo>`. If you supply both a PR +and a branch for the same repo, the discover step `::error::`s and +exits. + +If neither `dandi_schema_pr` nor `dandi_archive_pr` is supplied (so +there is no override that diverges from the regular test matrix), the +discover job sets `should_run=false` and the downstream `build-archive-image` +/ `test-cross-repo` jobs are silently skipped. A `dandi_cli_pr`-only +dispatch is treated the same way — that combination would just retest +released schema + released archive, which the regular `run-tests.yml` +matrix already covers. There is no PR body in this mode, so the +parsing branch is skipped entirely. + +The same skip semantics apply to `pull_request` events with no markers +in the body: the workflow runs `discover-cross-repo-prs` (cheap), sees +no pointers, and short-circuits — so adding this workflow doesn't add +any meaningful CI cost to PRs that don't opt in. + +## Workflow changes (`.github/workflows/run-tests.yml`) + +### 1. New `discover-cross-repo-prs` job (runs only on `pull_request`) + +Outputs: + +- `archive_repo` (e.g.
`dandi/dandi-archive` or fork) +- `archive_ref` (head ref / SHA of that PR) +- `schema_repo` +- `schema_ref` +- `extra_matrix` — a JSON list of matrix `include` entries (empty when no + markers found) + +Implementation sketch: + +```yaml +discover-cross-repo-prs: + if: github.event_name == 'pull_request' + runs-on: ubuntu-latest + outputs: + extra_matrix: ${{ steps.build.outputs.extra_matrix }} + archive_repo: ${{ steps.parse.outputs.archive_repo }} + archive_ref: ${{ steps.parse.outputs.archive_ref }} + schema_repo: ${{ steps.parse.outputs.schema_repo }} + schema_ref: ${{ steps.parse.outputs.schema_ref }} + steps: + - name: Parse PR body + id: parse + env: + BODY: ${{ github.event.pull_request.body }} + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + printf '%s' "$BODY" > /tmp/pr_body.txt + # numeric id, with or without URL prefix + ARCH_PR=$(grep -oiP 'dandi-archive\s+PR:\s*(https://github\.com/dandi/dandi-archive/pulls?/|dandi/dandi-archive#)*\K[0-9]+' /tmp/pr_body.txt | head -1 || true) + SCHEMA_PR=$(grep -oiP 'dandi-schema\s+PR:\s*(https://github\.com/dandi/dandi-schema/pulls?/|dandi/dandi-schema#)*\K[0-9]+' /tmp/pr_body.txt | head -1 || true) + + resolve() { # repo, prnum -> "owner/repo<TAB>ref<TAB>sha" + gh api "repos/$1/pulls/$2" \ + --jq '[.head.repo.full_name, .head.ref, .head.sha] | @tsv' + } + if [ -n "$ARCH_PR" ]; then + IFS=$'\t' read -r REPO REF SHA < <(resolve dandi/dandi-archive "$ARCH_PR") + echo "archive_repo=$REPO" >> "$GITHUB_OUTPUT" + echo "archive_ref=$SHA" >> "$GITHUB_OUTPUT" + fi + if [ -n "$SCHEMA_PR" ]; then + IFS=$'\t' read -r REPO REF SHA < <(resolve dandi/dandi-schema "$SCHEMA_PR") + echo "schema_repo=$REPO" >> "$GITHUB_OUTPUT" + echo "schema_ref=$SHA" >> "$GITHUB_OUTPUT" + fi + + - name: Build extra matrix entry + id: build + run: | + if [ -n "${{ steps.parse.outputs.archive_repo }}${{ steps.parse.outputs.schema_repo }}" ]; then + cat <<'EOF' >> "$GITHUB_OUTPUT" +
extra_matrix=[{"os":"ubuntu-latest","python":"3.13","mode":"dandi-api","cross_repo":"true"}] + EOF + else + echo 'extra_matrix=[]' >> "$GITHUB_OUTPUT" + fi +``` + +### 2. Pre-build the dandi-archive Docker image (only if archive PR was found) + +A second new job, gated on `discover-cross-repo-prs`, checks out the PR head +(possibly from a fork) and builds the image with `dev/django-public.Dockerfile`, +optionally swapping `dandischema` to the schema PR head, then exports it as a +tarball artifact (same trick dandi-archive's own `cli-integration.yml` uses): + +```yaml +build-archive-image: + needs: discover-cross-repo-prs + if: needs.discover-cross-repo-prs.outputs.archive_repo != '' + runs-on: ubuntu-24.04 + steps: + - uses: actions/checkout@v6 + with: + repository: ${{ needs.discover-cross-repo-prs.outputs.archive_repo }} + ref: ${{ needs.discover-cross-repo-prs.outputs.archive_ref }} + fetch-depth: 0 + - name: Patch pyproject for dandi-schema PR + if: needs.discover-cross-repo-prs.outputs.schema_repo != '' + run: | + # Replace the pinned dandischema with a git URL pointing at the PR head. 
+ REPO='${{ needs.discover-cross-repo-prs.outputs.schema_repo }}' + REF='${{ needs.discover-cross-repo-prs.outputs.schema_ref }}' + python - <"` after `pip install .[extras,test]` | +| `dandischema` inside the dandi-archive container | `/opt/django/.venv` populated by `uv sync` | rewrite `pyproject.toml` before `docker build` so `uv` resolves the git URL — same result, different toolchain | +| `dandi-archive` itself | the `dandiarchive/dandiarchive-api` image | replace the Docker Hub pull with a locally built image: `docker build -f dev/django-public.Dockerfile -t dandiarchive/dandiarchive-api .` against the PR's checkout, exported via `docker image save` and shared between jobs as an artifact | + +The "tag-collision trick" — keeping the same `image: dandiarchive/dandiarchive-api` +name in `docker-compose.yml` and just *loading* a same-named image into the +local Docker daemon before compose runs — means **no edits** to the +compose file are required. Compose will use the locally loaded image +because it already exists by that name and pull is disabled. + +## Edge cases / risks + +1. **Forks.** PRs from forks send `secrets.GITHUB_TOKEN` as read-only; we + only need read access to call `gh api` for the resolution step, so this + works. Pulling code from forks via `actions/checkout` with `repository:` + does not need a token for public repos. +2. **Untrusted code execution.** Building an image from an unmerged + dandi-archive PR runs that PR's code in CI. This is identical to the + exposure dandi-archive's own `cli-integration.yml` already accepts. No + new secrets are exposed because we use the default `GITHUB_TOKEN` and + the new job doesn't push images anywhere. +3. **`pull_request_target` is NOT used.** Stick with `pull_request` so PRs + from forks run against their own code, never against `master` with + write secrets. +4. 
**`uv.lock` drift in dandi-archive.** If dandi-archive ever starts + committing a `uv.lock`, the schema-override patch must regenerate it + (`uv lock`); otherwise `uv sync` will reject the URL spec. The plan + above includes `uv lock || true` defensively. +5. **`DJANGO_DANDI_SCHEMA_VERSION`.** The fixture at + `dandi/tests/fixtures.py:395` injects `DJANGO_DANDI_SCHEMA_VERSION` from + `dandi.dandi_schema_version`; if the schema PR bumps the version, the + new dandi-cli pin must be present (or the dandi-cli PR must include the + bump too). Document this in the PR-marker convention: when overriding + schema, the dandi-cli PR is responsible for any required pin bumps. +6. **Stale matrix.** If a contributor leaves the marker in the body after + the upstream PR merges, CI will keep building from the (now-merged) + branch ref — harmless but wasteful. Mitigation: also accept the marker + `dandi-archive PR: none` to explicitly disable, and delete markers in + reviewer checklists. +7. **macOS / Windows runners.** The cross-repo entry runs `ubuntu-latest` + only — Docker is unavailable on the others, matching today's + `dandi-api` rows. + +## Concrete steps to land this + +1. Add `.github/workflows/run-tests.yml` changes per §1–§3 above. +2. Update PR template (`.github/PULL_REQUEST_TEMPLATE.md` if present, else + create) to mention the optional `dandi-archive PR:` / + `dandi-schema PR:` markers. +3. Verify locally with `act` or by pushing a dummy PR with the marker. +4. Land 1839's body update so it carries + `dandi-archive PR: https://github.com/dandi/dandi-archive/pull/2784`, + confirm the new matrix row goes green (or surfaces real incompatibilities). + +## Out of scope + +- Triggering a *return* run on dandi-archive/dandi-schema PRs from a + dandi-cli PR. dandi-archive's `cli-integration.yml` already covers + release+master; symmetric cross-triggering can be a follow-up. 
+- Caching the built dandi-archive image across PR pushes (we rebuild each + run for now; the build is ~minutes, acceptable). From 39f51d9a826642472c9e9362ed277209d03421e8 Mon Sep 17 00:00:00 2001 From: Yaroslav Halchenko Date: Thu, 30 Apr 2026 14:56:02 -0400 Subject: [PATCH 16/16] CI [drop-before-merge]: disable existing test/lint/typing/codeql/docs jobs Set `if: false` on the top-level job of `run-tests.yml`, `lint.yml`, `typing.yml`, `codeql.yml`, and `docs.yml` so we don't burn CI minutes on the full PR check suite while iterating on the new `cross-repo-pr-tests.yml` workflow. The cross-repo workflow itself is unaffected. This commit is intended to be dropped before this PR (or its successor) lands on master. Co-Authored-By: Claude Code 2.1.114 / Claude Opus 4.7 (1M context) --- .github/workflows/codeql.yml | 3 +++ .github/workflows/docs.yml | 3 +++ .github/workflows/lint.yml | 3 +++ .github/workflows/run-tests.yml | 4 ++++ .github/workflows/typing.yml | 3 +++ 5 files changed, 16 insertions(+) diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index e2d0e80df..71271557e 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -11,6 +11,9 @@ on: jobs: analyze: name: Analyze + # TODO(remove-before-merge): temporarily disabled to expedite iteration + # on the cross-repo PR matrix workflow. + if: false runs-on: ubuntu-latest permissions: actions: read diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index fff16470b..7ef1f1431 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -8,6 +8,9 @@ on: jobs: docs: + # TODO(remove-before-merge): temporarily disabled to expedite iteration + # on the cross-repo PR matrix workflow. 
+ if: false runs-on: ubuntu-latest strategy: fail-fast: false diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 5b6ad2c62..f52602aed 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -6,6 +6,9 @@ on: jobs: lint: + # TODO(remove-before-merge): temporarily disabled to expedite iteration + # on the cross-repo PR matrix workflow. + if: false runs-on: ubuntu-latest steps: - name: Set up environment diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 15753f538..2e0db721f 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -19,6 +19,10 @@ defaults: jobs: test: + # TODO(remove-before-merge): temporarily disabled to expedite iteration + # on the cross-repo PR matrix workflow. Drop this `if: false` (and the + # whole commit that introduced it) before merging. + if: false runs-on: ${{ matrix.os }} env: NO_ET: 1 diff --git a/.github/workflows/typing.yml b/.github/workflows/typing.yml index c02787d41..e982b015f 100644 --- a/.github/workflows/typing.yml +++ b/.github/workflows/typing.yml @@ -6,6 +6,9 @@ on: jobs: typing: + # TODO(remove-before-merge): temporarily disabled to expedite iteration + # on the cross-repo PR matrix workflow. + if: false runs-on: ubuntu-latest steps: - name: Check out repository