Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 54 additions & 14 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from .backoff import NoBackoff
from .credentials import CredentialProvider, UsernamePasswordCredentialProvider
from .event import AfterConnectionReleasedEvent, EventDispatcher, OnErrorEvent, OnMaintenanceNotificationEvent, \
AfterConnectionCreatedEvent
AfterConnectionCreatedEvent, AfterConnectionAcquiredEvent, AfterConnectionClosedEvent
from .exceptions import (
AuthenticationError,
AuthenticationWrongNumberOfArgsError,
Expand All @@ -56,6 +56,7 @@
)
from .observability.attributes import AttributeBuilder, DB_CLIENT_CONNECTION_STATE, ConnectionState, \
DB_CLIENT_CONNECTION_POOL_NAME
from .observability.metrics import CloseReason
from .retry import Retry
from .utils import (
CRYPTOGRAPHY_AVAILABLE,
Expand Down Expand Up @@ -194,7 +195,7 @@ def on_connect(self):
pass

@abstractmethod
def disconnect(self, *args):
def disconnect(self, *args, **kwargs):
pass

@abstractmethod
Expand Down Expand Up @@ -385,7 +386,7 @@ def read_response(
pass

@abstractmethod
def disconnect(self, *args):
def disconnect(self, *args, **kwargs):
pass

def _configure_maintenance_notifications(
Expand Down Expand Up @@ -865,7 +866,7 @@ def connect_check_health(
if retry_socket_connect:
sock = self.retry.call_with_retry(
lambda: self._connect(),
lambda error, failure_count: self.disconnect(error, failure_count),
lambda error, failure_count: self.disconnect(error=error, failure_count=failure_count),
with_failure_count=True
)
else:
Expand Down Expand Up @@ -1045,7 +1046,7 @@ def on_connect_check_health(self, check_health: bool = True):
if str_if_bytes(self.read_response()) != "OK":
raise ConnectionError("Invalid Database")

def disconnect(self, *args):
def disconnect(self, *args, **kwargs):
"Disconnects from the Redis server"
self._parser.on_disconnect()

Expand All @@ -1067,17 +1068,39 @@ def disconnect(self, *args):
except OSError:
pass

if len(args) > 0 and isinstance(args[0], Exception):
error = kwargs.get('error')
failure_count = kwargs.get('failure_count')
health_check_failed = kwargs.get('health_check_failed')

if error:
if health_check_failed:
close_reason = CloseReason.HEALTHCHECK_FAILED
else:
close_reason = CloseReason.ERROR

if args[1] <= self.retry.get_retries():
self._event_dispatcher.dispatch(
OnErrorEvent(
error=args[0],
server_address=self.host,
server_port=self.port,
retry_attempts=args[1],
retry_attempts=failure_count,
)
)

self._event_dispatcher.dispatch(
AfterConnectionClosedEvent(
close_reason=close_reason,
error=error,
)
)
else:
self._event_dispatcher.dispatch(
AfterConnectionClosedEvent(
close_reason=CloseReason.APPLICATION_CLOSE
)
)

def mark_for_reconnect(self):
self._should_reconnect = True

Expand All @@ -1095,7 +1118,7 @@ def _send_ping(self):

def _ping_failed(self, error, failure_count):
"""Function to call when PING fails"""
self.disconnect(error, failure_count)
self.disconnect(error=error, failure_count=failure_count, health_check_failed=True)

def check_health(self):
"""Check the health of the connection with a PING/PONG"""
Expand Down Expand Up @@ -1451,10 +1474,10 @@ def connect(self):
def on_connect(self):
self._conn.on_connect()

def disconnect(self, *args):
def disconnect(self, *args, **kwargs):
with self._cache_lock:
self._cache.flush()
self._conn.disconnect(*args)
self._conn.disconnect(*args, **kwargs)

def check_health(self):
self._conn.check_health()
Expand Down Expand Up @@ -2648,6 +2671,8 @@ def _checkpid(self) -> None:
def get_connection(self, command_name=None, *keys, **options) -> "Connection":
"Get a connection from the pool"

# Start timing for observability
start_time_acquired = time.monotonic()
self._checkpid()
is_created = False

Expand All @@ -2656,7 +2681,7 @@ def get_connection(self, command_name=None, *keys, **options) -> "Connection":
connection = self._available_connections.pop()
except IndexError:
# Start timing for observability
start_time = time.monotonic()
start_time_created = time.monotonic()

connection = self.make_connection()
is_created = True
Expand Down Expand Up @@ -2691,9 +2716,16 @@ def get_connection(self, command_name=None, *keys, **options) -> "Connection":
self._event_dispatcher.dispatch(
AfterConnectionCreatedEvent(
connection_pool=self,
duration_seconds=time.monotonic() - start_time,
duration_seconds=time.monotonic() - start_time_created,
)
)

self._event_dispatcher.dispatch(
AfterConnectionAcquiredEvent(
connection_pool=self,
duration_seconds=time.monotonic() - start_time_acquired,
)
)
return connection

def get_encoder(self) -> Encoder:
Expand Down Expand Up @@ -2957,6 +2989,7 @@ def get_connection(self, command_name=None, *keys, **options):
create new connections when we need to, i.e.: the actual number of
connections will only increase in response to demand.
"""
start_time_acquired = time.monotonic()
# Make sure we haven't changed process.
self._checkpid()
is_created = False
Expand All @@ -2979,7 +3012,7 @@ def get_connection(self, command_name=None, *keys, **options):
# a new connection to add to the pool.
if connection is None:
# Start timing for observability
start_time = time.monotonic()
start_time_created = time.monotonic()
connection = self.make_connection()
is_created = True
finally:
Expand Down Expand Up @@ -3014,10 +3047,17 @@ def get_connection(self, command_name=None, *keys, **options):
self._event_dispatcher.dispatch(
AfterConnectionCreatedEvent(
connection_pool=self,
duration_seconds=time.monotonic() - start_time,
duration_seconds=time.monotonic() - start_time_created,
)
)

self._event_dispatcher.dispatch(
AfterConnectionAcquiredEvent(
connection_pool=self,
duration_seconds=time.monotonic() - start_time_acquired,
)
)

return connection

def release(self, connection):
Expand Down
45 changes: 44 additions & 1 deletion redis/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
from redis.observability.attributes import PubSubDirection
from redis.observability.recorder import record_operation_duration, record_error_count, record_maint_notification_count, \
record_connection_create_time, init_connection_count, record_connection_relaxed_timeout, record_connection_handoff, \
record_pubsub_message, record_streaming_lag
record_pubsub_message, record_streaming_lag, record_connection_wait_time, record_connection_use_time, \
record_connection_closed
from redis.utils import str_if_bytes


Expand Down Expand Up @@ -113,6 +114,12 @@ def __init__(
AfterConnectionHandoffEvent: [
ExportConnectionHandoffMetric(),
],
AfterConnectionAcquiredEvent: [
ExportConnectionWaitTimeMetric(),
],
AfterConnectionClosedEvent: [
ExportConnectionClosedMetric(),
],
OnPubSubMessageEvent: [
ExportPubSubMessageMetric(),
],
Expand Down Expand Up @@ -397,6 +404,22 @@ class AfterConnectionHandoffEvent:
"""
connection_pool: "ConnectionPoolInterface"

@dataclass
class AfterConnectionAcquiredEvent:
    """
    Event fired after a connection is acquired from a pool.

    Dispatched by the pools' ``get_connection`` just before the connection
    is returned to the caller.
    """

    # Pool that handed out the connection.
    connection_pool: "ConnectionPoolInterface"
    # Elapsed time in seconds, measured with ``time.monotonic()`` from the
    # start of ``get_connection`` until the event is dispatched.
    duration_seconds: float

@dataclass
class AfterConnectionClosedEvent:
    """
    Event fired after a connection is closed.
    """

    # Why the connection was closed (a ``CloseReason`` member).
    close_reason: "CloseReason"
    # Exception that triggered the close; left as ``None`` when the close
    # was not caused by an error.
    error: Optional[Exception] = None

class AsyncOnCommandsFailEvent(OnCommandsFailEvent):
pass

Expand Down Expand Up @@ -704,3 +727,23 @@ def listen(self, event: OnStreamMessageReceivedEvent):
consumer_group=event.consumer_group,
consumer_name=event.consumer_name,
)

class ExportConnectionWaitTimeMetric(EventListenerInterface):
    """
    Listener that exports the connection wait time metric.
    """

    def listen(self, event: AfterConnectionAcquiredEvent):
        """
        Record how long acquiring a connection took.

        Args:
            event: Acquisition event carrying the pool and the elapsed
                time in seconds.
        """
        # The pool is identified by its repr() when reported to the recorder.
        record_connection_wait_time(
            pool_name=repr(event.connection_pool),
            duration_seconds=event.duration_seconds,
        )

class ExportConnectionClosedMetric(EventListenerInterface):
    """
    Listener that exports the connection closed metric.
    """

    def listen(self, event: AfterConnectionClosedEvent):
        """
        Record that a connection was closed.

        Args:
            event: Close event carrying the close reason and, when the
                close was caused by a failure, the exception.
        """
        # The exception object itself (or None) is passed as ``error_type``.
        record_connection_closed(
            close_reason=event.close_reason,
            error_type=event.error,
        )
27 changes: 21 additions & 6 deletions redis/observability/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import logging
import time
from enum import Enum
from typing import Any, Dict, Optional, Callable, List

from redis.observability.attributes import AttributeBuilder, ConnectionState, REDIS_CLIENT_CONNECTION_NOTIFICATION, \
Expand All @@ -27,6 +28,22 @@
Meter = None
UpDownCounter = None

class CloseReason(Enum):
    """
    Enum representing the reason why a Redis client connection was closed.

    Values:
        APPLICATION_CLOSE: The connection was closed intentionally by the
            application (for example, during normal shutdown or explicit
            cleanup).
        ERROR: The connection was closed due to an unexpected error
            (for example, network failure or protocol error).
        HEALTHCHECK_FAILED: The connection was closed because a health check
            or liveness check for the connection failed.
    """

    APPLICATION_CLOSE = "application_close"
    ERROR = "error"
    HEALTHCHECK_FAILED = "healthcheck_failed"


class RedisMetricsCollector:
"""
Expand Down Expand Up @@ -401,24 +418,22 @@ def record_operation_duration(

def record_connection_closed(
self,
pool_name: str,
close_reason: Optional[str] = None,
close_reason: Optional[CloseReason] = None,
error_type: Optional[Exception] = None,
) -> None:
"""
Record a connection closed event.

Args:
pool_name: Connection pool name
close_reason: Reason for closing (e.g., 'idle_timeout', 'error', 'shutdown')
close_reason: Reason for closing (e.g. 'error', 'application_close')
error_type: Error type if closed due to error
"""
if not hasattr(self, "connection_closed"):
return

attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
attrs = self.attr_builder.build_connection_attributes()
if close_reason:
attrs[REDIS_CLIENT_CONNECTION_CLOSE_REASON] = close_reason
attrs[REDIS_CLIENT_CONNECTION_CLOSE_REASON] = close_reason.value

attrs.update(
self.attr_builder.build_error_attributes(
Expand Down
9 changes: 3 additions & 6 deletions redis/observability/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from typing import Optional, Callable

from redis.observability.attributes import PubSubDirection, ConnectionState
from redis.observability.metrics import RedisMetricsCollector
from redis.observability.metrics import RedisMetricsCollector, CloseReason
from redis.observability.providers import get_observability_instance

# Global metrics collector instance (lazy-initialized)
Expand Down Expand Up @@ -251,16 +251,14 @@ def record_connection_use_time(


def record_connection_closed(
pool_name: str,
close_reason: Optional[str] = None,
close_reason: Optional[CloseReason] = None,
error_type: Optional[Exception] = None,
) -> None:
"""
Record a connection closed event.

Args:
pool_name: Connection pool identifier
close_reason: Reason for closing (e.g., 'idle_timeout', 'error', 'shutdown')
close_reason: Reason for closing (e.g. 'error', 'application_close')
error_type: Error type if closed due to error

Example:
Expand All @@ -275,7 +273,6 @@ def record_connection_closed(

# try:
_metrics_collector.record_connection_closed(
pool_name=pool_name,
close_reason=close_reason,
error_type=error_type,
)
Expand Down
Loading