Skip to content

feat(audio): audio file and fragment with streaming #1200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

shcheklein
Copy link
Member

@shcheklein shcheklein commented Jul 4, 2025

Add Audio and AudioFragment support, to make code like below work with full streaming (no file download happening):

from collections.abc import Iterator

import torch
from transformers import Pipeline, pipeline

import datachain as dc
from datachain import AudioFile, AudioFragment


def fragments(file: AudioFile) -> Iterator[AudioFragment]:
    yield file.get_fragment(300, 400)


def process(fragment: AudioFragment, pipeline: Pipeline) -> str:
    """Process audio fragment using direct numpy conversion."""
    audio_array, sample_rate = fragment.get_np()

    # Convert to mono if stereo (average the channels)
    if len(audio_array.shape) > 1 and audio_array.shape[1] > 1:
        audio_array = audio_array.mean(axis=1)

    # Pass the numpy array with exact sampling rate from fragment
    result = pipeline({
        "array": audio_array,
        "sampling_rate": sample_rate
    })
    return str(result)

(
    dc
    .read_storage("s3://ivan-test-versioned/sounds", type="audio")
    .settings(cache=False, prefetch=False)
    .gen(fragment=fragments)
    .setup(pipeline=lambda: pipeline(
        "audio-classification",
        model="superb/wav2vec2-base-superb-ks",
        torch_dtype=torch.float32,
        device="cpu"
    ))
    .map(type=process)
    .show()
)

TODO:

  • Add tests
  • Add example like above
  • Add FE support for AudioFragment model to show preview it from the middle point cc @yathomasi

Copy link
Contributor

sourcery-ai bot commented Jul 4, 2025

Reviewer's Guide

Implements end-to-end streaming support for audio files by extending the core File model with AudioFile and related DataModels, integrating torchaudio-based utilities, and ensuring seamless streaming within UDF pipelines.

Entity relationship diagram for audio data models

erDiagram
    AUDIOFILE ||--o{ AUDIOSEGMENT : contains
    AUDIOFILE ||--o{ AUDIOFRAGMENT : contains
    AUDIOFILE }|--|| AUDIO : has
    AUDIOSEGMENT }|--|| AUDIOFILE : references
    AUDIOFRAGMENT }|--|| AUDIOFILE : references
    AUDIO {
        int sample_rate
        int channels
        float duration
        int samples
        string format
        string codec
        int bit_rate
    }
Loading

Class diagram for new and updated audio-related types

classDiagram
    class File {
        +as_audio_file() AudioFile
    }
    class AudioFile {
        +get_info() Audio
        +get_segment(start: float, duration: float) AudioSegment
        +get_fragment(start: float, end: float) AudioFragment
        +get_fragments(duration: float, start: float, end: float) Iterator~AudioFragment~
    }
    class AudioSegment {
        +audio: AudioFile
        +start: float
        +end: float
        +get_np() tuple[ndarray, int]
        +read_bytes(format: str) bytes
    }
    class AudioFragment {
        +audio: AudioFile
        +start: float
        +end: float
        +get_np() tuple[ndarray, int]
        +read_bytes(format: str) bytes
        +save(output: str, format: str) AudioFile
    }
    class Audio {
        +sample_rate: int
        +channels: int
        +duration: float
        +samples: int
        +format: str
        +codec: str
        +bit_rate: int
    }
    File <|-- AudioFile
    AudioFile o-- AudioSegment : contains
    AudioFile o-- AudioFragment : contains
    AudioSegment o-- AudioFile : references
    AudioFragment o-- AudioFile : references
Loading

File-Level Changes

Change Details Files
Extend core File abstractions to support audio types and streaming
  • Add "audio" to FileType literal and get_file_type factory
  • Implement as_audio_file conversion method on File
  • Expose AudioFile and AudioFragment in package init
src/datachain/lib/file.py
src/datachain/__init__.py
Introduce AudioFile, AudioSegment, AudioFragment, and Audio metadata models
  • Define AudioFile with get_info, get_segment, get_fragment, get_fragments methods
  • Create AudioSegment and AudioFragment models with get_np, read_bytes, and save APIs
  • Add Audio DataModel for storing sample rate, channels, duration, format, codec, and bit rate
src/datachain/lib/file.py
Add torchaudio-based utilities for audio metadata and segment extraction
  • Implement audio_info to extract metadata via torchaudio.info
  • Implement audio_segment_np and audio_segment_bytes for numpy and byte extraction
  • Implement save_audio_fragment to write and upload fragment using soundfile
src/datachain/lib/audio.py
Enhance UDF handling to propagate streaming on nested File objects
  • Replace direct File check with _set_stream_recursive helper
  • Traverse DataModel fields recursively to set catalog stream on nested File
src/datachain/lib/udf.py
Update project configuration to include audio dependencies
  • Add torchaudio under new "audio" extras
  • Include "audio" extra in test dependencies
pyproject.toml

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link

Deploying datachain-documentation with  Cloudflare Pages  Cloudflare Pages

Latest commit: 2ad01e2
Status: ✅  Deploy successful!
Preview URL: https://54a0aae4.datachain-documentation.pages.dev
Branch Preview URL: https://audio-fragments-decoder.datachain-documentation.pages.dev

View logs

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @shcheklein - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments

### Comment 1
<location> `src/datachain/lib/file.py:1058` </location>
<code_context>
+        duration = self.end - self.start
+        return audio_segment_bytes(self.audio, self.start, duration, format)
+
+    def save(self, output: str, format: Optional[str] = None) -> "AudioFile":
+        """
+        Saves the audio fragment as a new audio file.
</code_context>

<issue_to_address>
AudioFragment.save may overwrite files without warning.

Consider adding a check or warning if the output file already exists to prevent accidental overwrites.
</issue_to_address>

### Comment 2
<location> `src/datachain/lib/audio.py:152` </location>
<code_context>
+    import soundfile as sf
+
+    buffer = io.BytesIO()
+    sf.write(buffer, y, sr, format=format)
+    return buffer.getvalue()
+
</code_context>

<issue_to_address>
audio_segment_bytes assumes soundfile supports all requested formats.

Validate the format argument before calling sf.write, or catch exceptions to provide a clearer error message if the format is unsupported.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
    import soundfile as sf

    buffer = io.BytesIO()
    sf.write(buffer, y, sr, format=format)
    return buffer.getvalue()
=======
    import soundfile as sf

    buffer = io.BytesIO()

    # Validate format before writing
    supported_formats = set(sf.available_formats().keys())
    if format.upper() not in supported_formats:
        raise ValueError(f"Unsupported audio format '{format}'. Supported formats: {', '.join(sorted(supported_formats))}")

    try:
        sf.write(buffer, y, sr, format=format)
    except RuntimeError as e:
        raise ValueError(f"Failed to write audio with format '{format}': {e}")

    return buffer.getvalue()
>>>>>>> REPLACE

</suggested_fix>

### Comment 3
<location> `src/datachain/lib/audio.py:182` </location>
<code_context>
+    if start < 0 or end < 0 or start >= end:
+        raise ValueError(f"Invalid time range: ({start:.3f}, {end:.3f})")
+
+    if format is None:
+        format = audio.get_file_ext()
+
</code_context>

<issue_to_address>
save_audio_fragment infers format from file extension, which may be unreliable.

Consider adding validation or fallback logic to handle cases where the file extension is missing, non-standard, or does not match the actual audio format.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
    if format is None:
        format = audio.get_file_ext()
=======
    if format is None:
        format = audio.get_file_ext()
        # Validate the inferred format
        valid_formats = {"wav", "mp3", "flac", "ogg", "aac", "m4a"}
        if not format or format.lower() not in valid_formats:
            # Fallback to default format if extension is missing or non-standard
            import warnings
            warnings.warn(
                f"Could not reliably infer audio format from file extension '{format}'. "
                "Falling back to default format 'wav'."
            )
            format = "wav"
>>>>>>> REPLACE

</suggested_fix>

### Comment 4
<location> `src/datachain/lib/udf.py:277` </location>
<code_context>
+            self._set_stream_recursive(obj, catalog, cache, download_cb)
         return obj_row

+    def _set_stream_recursive(
+        self, obj: Any, catalog: "Catalog", cache: bool, download_cb: Callback
+    ) -> None:
+        """Recursively set the catalog stream on all File objects within an object."""
+        if isinstance(obj, File):
+            obj._set_stream(catalog, caching_enabled=cache, download_cb=download_cb)
+
+        # Check all fields for nested File objects, but only for DataModel objects
+        if isinstance(obj, DataModel):
+            for field_name in obj.model_fields:
+                field_value = getattr(obj, field_name, None)
+                if isinstance(field_value, DataModel):
+                    self._set_stream_recursive(field_value, catalog, cache, download_cb)
+
</code_context>

<issue_to_address>
The recursive stream setter does not handle lists or dicts of DataModels.

Please update the recursion logic to handle lists and dictionaries containing DataModels or Files, so all nested File objects are initialized correctly.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

duration = self.end - self.start
return audio_segment_bytes(self.audio, self.start, duration, format)

def save(self, output: str, format: Optional[str] = None) -> "AudioFile":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): AudioFragment.save may overwrite files without warning.

Consider adding a check or warning if the output file already exists to prevent accidental overwrites.

Comment on lines +149 to +153
import soundfile as sf

buffer = io.BytesIO()
sf.write(buffer, y, sr, format=format)
return buffer.getvalue()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): audio_segment_bytes assumes soundfile supports all requested formats.

Validate the format argument before calling sf.write, or catch exceptions to provide a clearer error message if the format is unsupported.

Suggested change
import soundfile as sf
buffer = io.BytesIO()
sf.write(buffer, y, sr, format=format)
return buffer.getvalue()
import soundfile as sf
buffer = io.BytesIO()
# Validate format before writing
supported_formats = set(sf.available_formats().keys())
if format.upper() not in supported_formats:
raise ValueError(f"Unsupported audio format '{format}'. Supported formats: {', '.join(sorted(supported_formats))}")
try:
sf.write(buffer, y, sr, format=format)
except RuntimeError as e:
raise ValueError(f"Failed to write audio with format '{format}': {e}")
return buffer.getvalue()

Comment on lines +182 to +183
if format is None:
format = audio.get_file_ext()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): save_audio_fragment infers format from file extension, which may be unreliable.

Consider adding validation or fallback logic to handle cases where the file extension is missing, non-standard, or does not match the actual audio format.

Suggested change
if format is None:
format = audio.get_file_ext()
if format is None:
format = audio.get_file_ext()
# Validate the inferred format
valid_formats = {"wav", "mp3", "flac", "ogg", "aac", "m4a"}
if not format or format.lower() not in valid_formats:
# Fallback to default format if extension is missing or non-standard
import warnings
warnings.warn(
f"Could not reliably infer audio format from file extension '{format}'. "
"Falling back to default format 'wav'."
)
format = "wav"

Comment on lines +277 to +286
def _set_stream_recursive(
self, obj: Any, catalog: "Catalog", cache: bool, download_cb: Callback
) -> None:
"""Recursively set the catalog stream on all File objects within an object."""
if isinstance(obj, File):
obj._set_stream(catalog, caching_enabled=cache, download_cb=download_cb)

# Check all fields for nested File objects, but only for DataModel objects
if isinstance(obj, DataModel):
for field_name in obj.model_fields:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: The recursive stream setter does not handle lists or dicts of DataModels.

Please update the recursion logic to handle lists and dictionaries containing DataModels or Files, so all nested File objects are initialized correctly.


try:
with audio.open() as f:
info = torchaudio.info(f)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (code-quality): We've found these issues:

raise ValueError(f"Invalid time range: ({start:.3f}, {end:.3f})")

if format is None:
format = audio.get_file_ext()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (code-quality): Don't assign to builtin variable format (avoid-builtin-shadow)


ExplanationPython has a number of builtin variables: functions and constants that
form a part of the language, such as list, getattr, and type
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:

list = [1, 2, 3]

However, this is considered poor practice.

  • It will confuse other developers.
  • It will confuse syntax highlighters and linters.
  • It means you can no longer use that builtin for its original purpose.

How can you solve this?

Rename the variable something more specific, such as integers.
In a pinch, my_list and similar names are colloquially-recognized
placeholders.

Copy link

codecov bot commented Jul 4, 2025

Codecov Report

Attention: Patch coverage is 23.84106% with 115 lines in your changes missing coverage. Please review.

Project coverage is 87.97%. Comparing base (dbd2c65) to head (2ad01e2).

Files with missing lines Patch % Lines
src/datachain/lib/audio.py 0.00% 68 Missing ⚠️
src/datachain/lib/file.py 38.35% 44 Missing and 1 partial ⚠️
src/datachain/lib/udf.py 80.00% 1 Missing and 1 partial ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1200      +/-   ##
==========================================
- Coverage   88.71%   87.97%   -0.75%     
==========================================
  Files         152      153       +1     
  Lines       13557    13704     +147     
  Branches     1884     1904      +20     
==========================================
+ Hits        12027    12056      +29     
- Misses       1088     1203     +115     
- Partials      442      445       +3     
Flag Coverage Δ
datachain 87.90% <23.84%> (-0.74%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
src/datachain/__init__.py 100.00% <ø> (ø)
src/datachain/lib/udf.py 92.77% <80.00%> (-0.62%) ⬇️
src/datachain/lib/file.py 83.52% <38.35%> (-7.27%) ⬇️
src/datachain/lib/audio.py 0.00% <0.00%> (ø)

... and 1 file with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant