Skip to content

Replace pydub as dependency and use multithreaded resampling (exports 20x faster) #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
*.zip
*.prof
*.mp3
*.wav
*.nbs

# ffmpeg binaries
*.exe

Expand Down
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit"
}
}
},
"python.analysis.typeCheckingMode": "basic"
}
229 changes: 167 additions & 62 deletions nbswave/audio.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,154 @@
import math
from typing import Dict, Optional
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Optional, Sequence

import numpy as np
from pydub import AudioSegment
import samplerate as sr
import soundfile as sf


def load_sound(path: str) -> AudioSegment:
return AudioSegment.from_file(path)
def key_to_pitch(key: int) -> float:
    """Pitch ratio for a note `key` semitones from the reference (12 semitones doubles the pitch)."""
    return 2.0 ** (key / 12.0)


def sync(
sound: AudioSegment,
channels: int = 2,
frame_rate: int = 44100,
sample_width: int = 2,
) -> AudioSegment:
return (
sound.set_channels(channels)
.set_frame_rate(frame_rate)
.set_sample_width(sample_width)
)
def vol_to_gain(vol: float) -> float:
    """Convert a linear volume factor to a gain in decibels.

    A volume of 0.0 maps to -inf dB (silence); 1.0 maps to 0 dB.
    """
    if vol == 0:
        return -float("inf")
    # math.log10 is the direct (and more precise) form of math.log(vol, 10).
    return math.log10(vol) * 20


def change_speed(sound: AudioSegment, speed: float = 1.0) -> AudioSegment:
if speed == 1.0:
return sound
def gain_to_vol(gain: float) -> float:
    """Convert a gain in decibels back to a linear volume factor (0 dB -> 1.0)."""
    return math.pow(10.0, gain / 20.0)

new = sound._spawn(
sound.raw_data, overrides={"frame_rate": round(sound.frame_rate * speed)}
)
return new.set_frame_rate(sound.frame_rate)

def panning_to_vol(panning: float) -> tuple[float, float]:
    """Convert a pan position in [-1.0, 1.0] into (left, right) volume factors.

    Simplified panning algorithm from pydub to operate on numpy arrays:
    https://github.com/jiaaro/pydub/blob/0c26b10619ee6e31c2b0ae26a8e99f461f694e5f/pydub/effects.py#L284
    """
    max_boost_db = gain_to_vol(2.0)
    boost_db = abs(panning) * max_boost_db

    boost_factor = gain_to_vol(boost_db)
    reduce_factor = gain_to_vol(max_boost_db) - boost_factor

    boost_factor /= 2.0

    # A negative pan favors the left channel, a positive one the right.
    if panning < 0:
        return boost_factor, reduce_factor
    return reduce_factor, boost_factor


@dataclass
class OverlayOperation:
    """Parameters describing one deferred overlay of a sound into the mix."""

    position: int   # offset into the output, in milliseconds
    volume: float   # linear volume multiplier
    panning: float  # stereo pan position, -1.0 (left) .. 1.0 (right)


class AudioSegment:
    """A chunk of audio held as a numpy array, one frame per row.

    Largely inspired by pydub.AudioSegment:
    https://github.com/jiaaro/pydub/blob/v0.25.1/pydub/audio_segment.py

    ``data`` layout: shape (frames, channels) for multi-channel audio
    (see ``load_sound``'s ``always_2d=True`` read), or a 1-D (frames,)
    array after a stereo->mono downmix. Samples are typically float32
    in [-1.0, 1.0] -- TODO confirm for all call sites.
    """

    def __init__(
        self, data: np.ndarray, frame_rate: int, sample_width: int, channels: int
    ):
        self.data = data                  # sample frames (see class docstring)
        self.frame_rate = frame_rate      # frames per second
        self.sample_width = sample_width  # bytes per sample (export target)
        self.channels = channels          # channel count described by `data`

    def _spawn(self, data: np.ndarray, overrides: Dict[str, int]) -> "AudioSegment":
        """Build a sibling segment from `data`, inheriting this segment's
        metadata; `overrides` replaces any of sample_width / frame_rate /
        channels. The data is copied so siblings never alias each other."""
        metadata = {
            "sample_width": self.sample_width,
            "frame_rate": self.frame_rate,
            "channels": self.channels,
        }
        metadata.update(overrides)
        return self.__class__(data=data.copy(), **metadata)

    def set_sample_width(self, sample_width: int) -> "AudioSegment":
        """Return a copy whose samples are cast to `sample_width` bytes.

        NOTE(review): casting float data straight to an int dtype truncates
        everything in [-1, 1] to -1/0/1 -- confirm callers only use this on
        integer-typed data or treat sample_width as export metadata.
        """
        if sample_width == self.sample_width:
            return self

        new_data = self.data.astype(f"int{sample_width * 8}")
        return self._spawn(new_data, {"sample_width": sample_width})

    def set_frame_rate(self, frame_rate: int) -> "AudioSegment":
        """Return a copy resampled to `frame_rate` with the best sinc converter."""
        if frame_rate == self.frame_rate:
            return self

        ratio = frame_rate / self.frame_rate
        # https://libsndfile.github.io/libsamplerate/api_misc.html#converters
        new_data = sr.resample(self.data, ratio, "sinc_best")
        return self._spawn(new_data, {"frame_rate": frame_rate})

    def set_channels(self, channels: int) -> "AudioSegment":
        """Return a copy with `channels` channels.

        Only mono<->stereo conversions are supported; anything else raises
        ValueError.
        """
        if channels == self.channels:
            return self

        if channels == 1 and self.channels == 2:
            new_data = np.mean(self.data, axis=1)
        elif channels == 2 and self.channels == 1:
            # Mono data may be 1-D (e.g. produced by the stereo->mono branch
            # above); promote it to a column before duplicating, otherwise
            # np.repeat(..., axis=1) raises on a 1-D array.
            mono = self.data.reshape(-1, 1) if self.data.ndim == 1 else self.data
            new_data = np.repeat(mono, 2, axis=1)
        else:
            raise ValueError("Unsupported channel conversion")

        return self._spawn(new_data, {"channels": channels})

    @property
    def duration_seconds(self) -> float:
        # `data` holds one frame per row (or one sample per entry when 1-D
        # mono), so the first-axis length is already the frame count.
        # Dividing by `channels` as well -- correct only for interleaved 1-D
        # buffers like pydub's raw_data -- halved the reported duration of
        # stereo segments.
        return self.data.shape[0] / self.frame_rate

    @property
    def raw_data(self) -> np.ndarray:
        return self.data

    def __len__(self) -> int:
        """Duration in whole milliseconds."""
        return round(self.duration_seconds * 1000)

    def set_speed(
        self, speed: float = 1.0, frame_rate: int | None = None
    ) -> "AudioSegment":
        """Speed up (>1) or slow down (<1) playback by resampling once.

        When `frame_rate` is given, the output-rate conversion is folded
        into the same resample pass. NOTE(review): the returned segment is
        still labeled with self.frame_rate even when a different target
        `frame_rate` was requested -- confirm downstream mixing assumes the
        mixer's rate regardless.
        """
        if frame_rate is not None and frame_rate != self.frame_rate:
            speed *= self.frame_rate / frame_rate

        if speed == 1.0:
            return self

        # Relabel the data at a scaled rate, then resample back: classic
        # speed/pitch shift.
        new = self._spawn(
            self.raw_data, overrides={"frame_rate": round(self.frame_rate * speed)}
        )
        return new.set_frame_rate(self.frame_rate)

    def set_volume(self, volume: float) -> "AudioSegment":
        """Return a copy scaled by the linear factor `volume`."""
        return self._spawn(self.raw_data * volume, {})

    def apply_volume_stereo(self, left_vol: float, right_vol: float) -> "AudioSegment":
        """Return a copy with independent per-channel gains (expects 2-D stereo data)."""
        left = self.data[:, 0] * left_vol
        right = self.data[:, 1] * right_vol

        return self._spawn(np.stack([left, right], axis=1), {})

    def set_panning(self, panning: float) -> "AudioSegment":
        """Return a copy panned by `panning` in [-1.0, 1.0]."""
        # Simplified panning algorithm from pydub to operate on numpy arrays
        # https://github.com/jiaaro/pydub/blob/0c26b10619ee6e31c2b0ae26a8e99f461f694e5f/pydub/effects.py#L284
        left_vol, right_vol = panning_to_vol(panning)
        return self.apply_volume_stereo(left_vol, right_vol)


def load_sound(path: str) -> AudioSegment:
    """Read an audio file from `path` into a float32 AudioSegment.

    Mono files are duplicated to two channels so downstream stereo code
    (panning, per-channel gain) can index both columns.
    """
    data, sample_rate = sf.read(path, dtype="float32", always_2d=True)
    channels = data.shape[1]

    # TODO: remove channel count coercion
    if channels == 1:
        data = np.repeat(data, 2, axis=1)
        # Keep the metadata in sync with the coerced data: the old code kept
        # channels=1 for a now-stereo array, so a later set_channels(2)
        # would have doubled the columns again.
        channels = 2

    # NOTE(review): sample_width is hard-coded to 2 (16-bit) although the
    # samples are float32 -- confirm it is only used as an export target.
    return AudioSegment(data, sample_rate, 2, channels)


class Mixer:
Expand All @@ -47,68 +158,69 @@ def __init__(
frame_rate: int = 44100,
channels: int = 2,
length: float = 0,
max_workers: int = 8,
):
self.sample_width = sample_width
self.frame_rate = frame_rate
self.channels = channels
self.output = np.zeros(self._get_array_size(length), dtype="int32")
self.output = np.zeros(
(self._get_array_size(length), self.channels), dtype="float32"
)
self.max_workers = max_workers

def _get_array_size(self, length_in_ms: float) -> int:
    """Convert a duration in milliseconds to a frame count (truncated toward zero)."""
    frames_per_ms = self.frame_rate / 1000.0
    return int(length_in_ms * frames_per_ms)

def overlay(self, sound: AudioSegment, position: int = 0):
    """Mix `sound` into the output buffer starting at `position` milliseconds.

    `sound` is assumed to already match the mixer's frame rate and channel
    count; its frames are summed in place into `self.output`, growing the
    buffer when the sound runs past the current end. Returns self so calls
    can be chained.
    """
    samples = sound.raw_data

    frame_offset = int(self.frame_rate * position / 1000.0)

    start = frame_offset
    end = start + len(samples)

    output_size = len(self.output)
    if end > output_size:
        # Grow by exactly the frame deficit. The deficit is already a frame
        # count, so converting it with _get_array_size (which expects
        # milliseconds) over-allocated by ~frame_rate/1000x. The stray debug
        # print is dropped as well.
        pad_length = end - output_size
        self.output = np.pad(
            self.output, ((0, pad_length), (0, 0)), mode="constant"
        )

    self.output[start:end] += samples

    return self

def batch_resample(self, tasks: Iterable[tuple[AudioSegment, float, Any]]):
    """Resample many segments concurrently, yielding (segment, context) pairs.

    Results arrive in completion order, not submission order; the caller's
    opaque `context` value identifies which task each result belongs to.
    """

    def _resample(task):
        segment, speed, context = task
        return segment.set_speed(speed, self.frame_rate), context

    with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
        pending = [pool.submit(_resample, task) for task in tasks]
        for finished in as_completed(pending):
            yield finished.result()

def __len__(self):
    """Length of the mixed output in whole milliseconds.

    The buffer is shaped (frames, channels), so its first-axis length is
    already a frame count -- the old `/ self.channels` halved the reported
    duration for stereo. The `len()` protocol also requires an int, so the
    old float return raised TypeError whenever len(mixer) was called
    (e.g. from append); round instead.
    """
    frames = self.output.shape[0]
    return round(frames / (self.frame_rate / 1000.0))

def append(self, sound: AudioSegment):
    """Overlay `sound` at the current end of the mix (len(self) is in ms)."""
    self.overlay(sound, position=len(self))

def to_audio_segment(self):
peak = np.abs(self.output).max()
clipping_factor = peak / (2**15 - 1)
clipping_factor = peak / 1.0

if clipping_factor > 1:
print(
f"The output is clipping by {clipping_factor:.2f}x. Normalizing to 0dBFS"
)
normalized_signal = np.rint(self.output / clipping_factor).astype("int16")
normalized_signal = self.output / clipping_factor
else:
normalized_signal = self.output

Expand All @@ -132,7 +244,7 @@ def __init__(self, *args, **kwargs):
@classmethod
def from_audio_segment(cls, segment: AudioSegment):
return cls(
segment.get_array_of_samples(),
segment.raw_data,
sample_width=segment.sample_width,
frame_rate=segment.frame_rate,
channels=segment.channels,
Expand All @@ -158,13 +270,6 @@ def save(
else:
bitrate = target_bitrate

output_segment = sync(self, channels, frame_rate, sample_width)

outfile = output_segment.export(
filename,
format=format,
bitrate="{}k".format(bitrate),
tags=tags,
)
output_segment = self # sync(self, channels, frame_rate, sample_width)

outfile.close()
sf.write(filename, output_segment.raw_data, samplerate=frame_rate)
Loading