Skip to content
6 changes: 3 additions & 3 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ numpy>=1.24.0,<2.0 ; platform_python_implementation == "PyPy"
redis-entraid==1.0.0
pybreaker>=1.4.0

opentelemetry-api>=1.18.0
opentelemetry-sdk>=1.18.0
opentelemetry-exporter-otlp>=1.18.0
opentelemetry-api>=1.30.0
opentelemetry-sdk>=1.30.0
opentelemetry-exporter-otlp>=1.30.0
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ circuit_breaker = [
"pybreaker>=1.4.0"
]
otel = [
"opentelemetry-api>=1.18.0",
"opentelemetry-sdk>=1.18.0",
"opentelemetry-exporter-otlp-http>=1.18.0",
"opentelemetry-api>=1.30.0",
"opentelemetry-sdk>=1.30.0",
"opentelemetry-exporter-otlp-proto-http>=1.30.0",
]

[project.urls]
Expand Down
21 changes: 0 additions & 21 deletions redis/asyncio/observability/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,27 +253,6 @@ async def record_connection_wait_time(
)


async def record_connection_use_time(
pool_name: str,
duration_seconds: float,
) -> None:
"""
Record time a connection was in use (borrowed from pool).

Args:
pool_name: Connection pool identifier
duration_seconds: Use time in seconds
"""
collector = await _get_or_create_collector()
if collector is None:
return

collector.record_connection_use_time(
pool_name=pool_name,
duration_seconds=duration_seconds,
)


async def record_connection_closed(
close_reason: Optional[CloseReason] = None,
error_type: Optional[Exception] = None,
Expand Down
29 changes: 22 additions & 7 deletions redis/observability/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import os
from typing import Dict, List, Optional, Union
from typing import Dict, List, Optional, Union, Sequence
from enum import IntFlag, auto

"""
OpenTelemetry configuration for redis-py.

This module handles configuration for OTel observability features,
including parsing environment variables and validating settings.
"""

class MetricGroup(IntFlag):
"""Metric groups that can be enabled/disabled."""
Expand All @@ -17,13 +23,11 @@ class TelemetryOption(IntFlag):
"""Telemetry options to export."""
METRICS = auto()

def default_operation_duration_buckets() -> Sequence[float]:
return [0.0001, 0.00025, 0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5]

"""
OpenTelemetry configuration for redis-py.

This module handles configuration for OTel observability features,
including parsing environment variables and validating settings.
"""
def default_histogram_buckets() -> Sequence[float]:
return [0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10]


class OTelConfig:
Expand Down Expand Up @@ -90,6 +94,11 @@ def __init__(
# Privacy controls
hide_pubsub_channel_names: bool = False,
hide_stream_names: bool = False,
# Bucket sizes
buckets_operation_duration: Sequence[float] = default_operation_duration_buckets(),
buckets_stream_processing_duration: Sequence[float] = default_histogram_buckets(),
buckets_connection_create_time: Sequence[float] = default_histogram_buckets(),
buckets_connection_wait_time: Sequence[float] = default_histogram_buckets(),
):
# Core enablement
if enabled_telemetry is None:
Expand All @@ -115,6 +124,12 @@ def __init__(
self.hide_pubsub_channel_names = hide_pubsub_channel_names
self.hide_stream_names = hide_stream_names

# Bucket sizes
self.buckets_operation_duration = buckets_operation_duration
self.buckets_stream_processing_duration = buckets_stream_processing_duration
self.buckets_connection_create_time = buckets_connection_create_time
self.buckets_connection_wait_time = buckets_connection_wait_time

def is_enabled(self) -> bool:
"""Check if any observability feature is enabled."""
return bool(self.enabled_telemetry)
Expand Down
30 changes: 5 additions & 25 deletions redis/observability/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def _init_connection_basic_metrics(self) -> None:
name="db.client.connection.create_time",
unit="{seconds}",
description="Time to create a new connection",
explicit_bucket_boundaries_advisory=self.config.buckets_connection_create_time,
)

self.connection_relaxed_timeout = self.meter.create_up_down_counter(
Expand All @@ -144,12 +145,7 @@ def _init_connection_advanced_metrics(self) -> None:
name="db.client.connection.wait_time",
unit="{seconds}",
description="Time to obtain an open connection from the pool",
)

self.connection_use_time = self.meter.create_histogram(
name="db.client.connection.use_time",
unit="{seconds}",
description="Time between borrowing and returning a connection",
explicit_bucket_boundaries_advisory=self.config.buckets_connection_wait_time,
)

self.connection_closed = self.meter.create_counter(
Expand All @@ -165,6 +161,7 @@ def _init_command_metrics(self) -> None:
name="db.client.operation.duration",
unit="{seconds}",
description="Command execution duration",
explicit_bucket_boundaries_advisory=self.config.buckets_operation_duration,
)

def _init_pubsub_metrics(self) -> None:
Expand All @@ -180,7 +177,8 @@ def _init_streaming_metrics(self) -> None:
self.stream_lag = self.meter.create_histogram(
name="redis.client.stream.lag",
unit="{seconds}",
description="End-to-end lag per message, showing how stale are the messages when the application starts processing them."
description="End-to-end lag per message, showing how stale are the messages when the application starts processing them.",
explicit_bucket_boundaries_advisory=self.config.buckets_stream_processing_duration,
)

def _init_csc_metrics(self) -> None:
Expand Down Expand Up @@ -378,24 +376,6 @@ def record_connection_wait_time(
attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
self.connection_wait_time.record(duration_seconds, attributes=attrs)

def record_connection_use_time(
self,
pool_name: str,
duration_seconds: float,
) -> None:
"""
Record time a connection was in use (borrowed from pool).

Args:
pool_name: Connection pool name
duration_seconds: Use time in seconds
"""
if not hasattr(self, "connection_use_time"):
return

attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
self.connection_use_time.record(duration_seconds, attributes=attrs)

# Command execution metric recording methods

def record_operation_duration(
Expand Down
33 changes: 0 additions & 33 deletions redis/observability/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,39 +247,6 @@ def record_connection_wait_time(
# except Exception:
# pass


def record_connection_use_time(
pool_name: str,
duration_seconds: float,
) -> None:
"""
Record time a connection was in use (borrowed from pool).

Args:
pool_name: Connection pool identifier
duration_seconds: Use time in seconds

Example:
>>> start = time.monotonic()
>>> # ... use connection ...
>>> record_connection_use_time('ConnectionPool<localhost:6379>', time.monotonic() - start)
"""
global _metrics_collector

if _metrics_collector is None:
_metrics_collector = _get_or_create_collector()
if _metrics_collector is None:
return

# try:
_metrics_collector.record_connection_use_time(
pool_name=pool_name,
duration_seconds=duration_seconds,
)
# except Exception:
# pass


def record_connection_closed(
close_reason: Optional[CloseReason] = None,
error_type: Optional[Exception] = None,
Expand Down
22 changes: 0 additions & 22 deletions tests/test_asyncio/test_observability/test_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,27 +274,6 @@ async def test_record_connection_wait_time(self, setup_async_recorder):
assert attrs[DB_CLIENT_CONNECTION_POOL_NAME] == "test_pool"


@pytest.mark.asyncio
class TestRecordConnectionUseTime:
"""Tests for record_connection_use_time - verifies Histogram.record() calls."""

async def test_record_connection_use_time(self, setup_async_recorder):
"""Test that connection use time is recorded correctly."""
instruments = setup_async_recorder

await recorder.record_connection_use_time(
pool_name="test_pool",
duration_seconds=0.100,
)

instruments.connection_use_time.record.assert_called_once()
call_args = instruments.connection_use_time.record.call_args

assert call_args[0][0] == 0.100
attrs = call_args[1]["attributes"]
assert attrs[DB_CLIENT_CONNECTION_POOL_NAME] == "test_pool"


@pytest.mark.asyncio
class TestRecordConnectionClosed:
"""Tests for record_connection_closed - verifies Counter.add() calls."""
Expand Down Expand Up @@ -818,7 +797,6 @@ async def test_all_record_functions_safe_when_disabled(self):
await recorder.record_connection_create_time(mock_pool, 0.1)
await recorder.record_connection_timeout("pool")
await recorder.record_connection_wait_time("pool", 0.1)
await recorder.record_connection_use_time("pool", 0.1)
await recorder.record_connection_closed()
await recorder.record_connection_relaxed_timeout("pool", "MOVING", True)
await recorder.record_connection_handoff("pool")
Expand Down
Loading