Skip to content

Commit 9dc37a8

Browse files
Added export of connection advanced metrics (#3910)
* Added export of connection advanced metrics * Update tests/test_connection_pool.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update redis/observability/metrics.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Refactored kwargs instead of args --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 26a11c2 commit 9dc37a8

File tree

7 files changed

+472
-38
lines changed

7 files changed

+472
-38
lines changed

redis/connection.py

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from .backoff import NoBackoff
3737
from .credentials import CredentialProvider, UsernamePasswordCredentialProvider
3838
from .event import AfterConnectionReleasedEvent, EventDispatcher, OnErrorEvent, OnMaintenanceNotificationEvent, \
39-
AfterConnectionCreatedEvent
39+
AfterConnectionCreatedEvent, AfterConnectionAcquiredEvent, AfterConnectionClosedEvent
4040
from .exceptions import (
4141
AuthenticationError,
4242
AuthenticationWrongNumberOfArgsError,
@@ -56,6 +56,7 @@
5656
)
5757
from .observability.attributes import AttributeBuilder, DB_CLIENT_CONNECTION_STATE, ConnectionState, \
5858
DB_CLIENT_CONNECTION_POOL_NAME
59+
from .observability.metrics import CloseReason
5960
from .retry import Retry
6061
from .utils import (
6162
CRYPTOGRAPHY_AVAILABLE,
@@ -194,7 +195,7 @@ def on_connect(self):
194195
pass
195196

196197
@abstractmethod
197-
def disconnect(self, *args):
198+
def disconnect(self, *args, **kwargs):
198199
pass
199200

200201
@abstractmethod
@@ -385,7 +386,7 @@ def read_response(
385386
pass
386387

387388
@abstractmethod
388-
def disconnect(self, *args):
389+
def disconnect(self, *args, **kwargs):
389390
pass
390391

391392
def _configure_maintenance_notifications(
@@ -865,7 +866,7 @@ def connect_check_health(
865866
if retry_socket_connect:
866867
sock = self.retry.call_with_retry(
867868
lambda: self._connect(),
868-
lambda error, failure_count: self.disconnect(error, failure_count),
869+
lambda error, failure_count: self.disconnect(error=error, failure_count=failure_count),
869870
with_failure_count=True
870871
)
871872
else:
@@ -1045,7 +1046,7 @@ def on_connect_check_health(self, check_health: bool = True):
10451046
if str_if_bytes(self.read_response()) != "OK":
10461047
raise ConnectionError("Invalid Database")
10471048

1048-
def disconnect(self, *args):
1049+
def disconnect(self, *args, **kwargs):
10491050
"Disconnects from the Redis server"
10501051
self._parser.on_disconnect()
10511052

@@ -1067,17 +1068,39 @@ def disconnect(self, *args):
10671068
except OSError:
10681069
pass
10691070

1070-
if len(args) > 0 and isinstance(args[0], Exception):
1071+
error = kwargs.get('error')
1072+
failure_count = kwargs.get('failure_count')
1073+
health_check_failed = kwargs.get('health_check_failed')
1074+
1075+
if error:
1076+
if health_check_failed:
1077+
close_reason = CloseReason.HEALTHCHECK_FAILED
1078+
else:
1079+
close_reason = CloseReason.ERROR
1080+
10711081
if args[1] <= self.retry.get_retries():
10721082
self._event_dispatcher.dispatch(
10731083
OnErrorEvent(
10741084
error=args[0],
10751085
server_address=self.host,
10761086
server_port=self.port,
1077-
retry_attempts=args[1],
1087+
retry_attempts=failure_count,
10781088
)
10791089
)
10801090

1091+
self._event_dispatcher.dispatch(
1092+
AfterConnectionClosedEvent(
1093+
close_reason=close_reason,
1094+
error=error,
1095+
)
1096+
)
1097+
else:
1098+
self._event_dispatcher.dispatch(
1099+
AfterConnectionClosedEvent(
1100+
close_reason=CloseReason.APPLICATION_CLOSE
1101+
)
1102+
)
1103+
10811104
def mark_for_reconnect(self):
10821105
self._should_reconnect = True
10831106

@@ -1095,7 +1118,7 @@ def _send_ping(self):
10951118

10961119
def _ping_failed(self, error, failure_count):
10971120
"""Function to call when PING fails"""
1098-
self.disconnect(error, failure_count)
1121+
self.disconnect(error=error, failure_count=failure_count, health_check_failed=True)
10991122

11001123
def check_health(self):
11011124
"""Check the health of the connection with a PING/PONG"""
@@ -1451,10 +1474,10 @@ def connect(self):
14511474
def on_connect(self):
14521475
self._conn.on_connect()
14531476

1454-
def disconnect(self, *args):
1477+
def disconnect(self, *args, **kwargs):
14551478
with self._cache_lock:
14561479
self._cache.flush()
1457-
self._conn.disconnect(*args)
1480+
self._conn.disconnect(*args, **kwargs)
14581481

14591482
def check_health(self):
14601483
self._conn.check_health()
@@ -2648,6 +2671,8 @@ def _checkpid(self) -> None:
26482671
def get_connection(self, command_name=None, *keys, **options) -> "Connection":
26492672
"Get a connection from the pool"
26502673

2674+
# Start timing for observability
2675+
start_time_acquired = time.monotonic()
26512676
self._checkpid()
26522677
is_created = False
26532678

@@ -2656,7 +2681,7 @@ def get_connection(self, command_name=None, *keys, **options) -> "Connection":
26562681
connection = self._available_connections.pop()
26572682
except IndexError:
26582683
# Start timing for observability
2659-
start_time = time.monotonic()
2684+
start_time_created = time.monotonic()
26602685

26612686
connection = self.make_connection()
26622687
is_created = True
@@ -2691,9 +2716,16 @@ def get_connection(self, command_name=None, *keys, **options) -> "Connection":
26912716
self._event_dispatcher.dispatch(
26922717
AfterConnectionCreatedEvent(
26932718
connection_pool=self,
2694-
duration_seconds=time.monotonic() - start_time,
2719+
duration_seconds=time.monotonic() - start_time_created,
26952720
)
26962721
)
2722+
2723+
self._event_dispatcher.dispatch(
2724+
AfterConnectionAcquiredEvent(
2725+
connection_pool=self,
2726+
duration_seconds=time.monotonic() - start_time_acquired,
2727+
)
2728+
)
26972729
return connection
26982730

26992731
def get_encoder(self) -> Encoder:
@@ -2957,6 +2989,7 @@ def get_connection(self, command_name=None, *keys, **options):
29572989
create new connections when we need to, i.e.: the actual number of
29582990
connections will only increase in response to demand.
29592991
"""
2992+
start_time_acquired = time.monotonic()
29602993
# Make sure we haven't changed process.
29612994
self._checkpid()
29622995
is_created = False
@@ -2979,7 +3012,7 @@ def get_connection(self, command_name=None, *keys, **options):
29793012
# a new connection to add to the pool.
29803013
if connection is None:
29813014
# Start timing for observability
2982-
start_time = time.monotonic()
3015+
start_time_created = time.monotonic()
29833016
connection = self.make_connection()
29843017
is_created = True
29853018
finally:
@@ -3014,10 +3047,17 @@ def get_connection(self, command_name=None, *keys, **options):
30143047
self._event_dispatcher.dispatch(
30153048
AfterConnectionCreatedEvent(
30163049
connection_pool=self,
3017-
duration_seconds=time.monotonic() - start_time,
3050+
duration_seconds=time.monotonic() - start_time_created,
30183051
)
30193052
)
30203053

3054+
self._event_dispatcher.dispatch(
3055+
AfterConnectionAcquiredEvent(
3056+
connection_pool=self,
3057+
duration_seconds=time.monotonic() - start_time_acquired,
3058+
)
3059+
)
3060+
30213061
return connection
30223062

30233063
def release(self, connection):

redis/event.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
from redis.observability.attributes import PubSubDirection
1212
from redis.observability.recorder import record_operation_duration, record_error_count, record_maint_notification_count, \
1313
record_connection_create_time, init_connection_count, record_connection_relaxed_timeout, record_connection_handoff, \
14-
record_pubsub_message, record_streaming_lag
14+
record_pubsub_message, record_streaming_lag, record_connection_wait_time, record_connection_use_time, \
15+
record_connection_closed
1516
from redis.utils import str_if_bytes
1617

1718

@@ -113,6 +114,12 @@ def __init__(
113114
AfterConnectionHandoffEvent: [
114115
ExportConnectionHandoffMetric(),
115116
],
117+
AfterConnectionAcquiredEvent: [
118+
ExportConnectionWaitTimeMetric(),
119+
],
120+
AfterConnectionClosedEvent: [
121+
ExportConnectionClosedMetric(),
122+
],
116123
OnPubSubMessageEvent: [
117124
ExportPubSubMessageMetric(),
118125
],
@@ -397,6 +404,22 @@ class AfterConnectionHandoffEvent:
397404
"""
398405
connection_pool: "ConnectionPoolInterface"
399406

407+
@dataclass
408+
class AfterConnectionAcquiredEvent:
409+
"""
410+
Event fired after connection is acquired from pool.
411+
"""
412+
connection_pool: "ConnectionPoolInterface"
413+
duration_seconds: float
414+
415+
@dataclass
416+
class AfterConnectionClosedEvent:
417+
"""
418+
Event fired after connection is closed.
419+
"""
420+
close_reason: "CloseReason"
421+
error: Optional[Exception] = None
422+
400423
class AsyncOnCommandsFailEvent(OnCommandsFailEvent):
401424
pass
402425

@@ -704,3 +727,23 @@ def listen(self, event: OnStreamMessageReceivedEvent):
704727
consumer_group=event.consumer_group,
705728
consumer_name=event.consumer_name,
706729
)
730+
731+
class ExportConnectionWaitTimeMetric(EventListenerInterface):
732+
"""
733+
Listener that exports connection wait time metric.
734+
"""
735+
def listen(self, event: AfterConnectionAcquiredEvent):
736+
record_connection_wait_time(
737+
pool_name=repr(event.connection_pool),
738+
duration_seconds=event.duration_seconds,
739+
)
740+
741+
class ExportConnectionClosedMetric(EventListenerInterface):
742+
"""
743+
Listener that exports connection closed metric.
744+
"""
745+
def listen(self, event: AfterConnectionClosedEvent):
746+
record_connection_closed(
747+
close_reason=event.close_reason,
748+
error_type=event.error,
749+
)

redis/observability/metrics.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import logging
99
import time
10+
from enum import Enum
1011
from typing import Any, Dict, Optional, Callable, List
1112

1213
from redis.observability.attributes import AttributeBuilder, ConnectionState, REDIS_CLIENT_CONNECTION_NOTIFICATION, \
@@ -27,6 +28,22 @@
2728
Meter = None
2829
UpDownCounter = None
2930

31+
class CloseReason(Enum):
32+
"""
33+
Enum representing the reason why a Redis client connection was closed.
34+
35+
Values:
36+
APPLICATION_CLOSE: The connection was closed intentionally by the application
37+
(for example, during normal shutdown or explicit cleanup).
38+
ERROR: The connection was closed due to an unexpected error
39+
(for example, network failure or protocol error).
40+
HEALTHCHECK_FAILED: The connection was closed because a health check
41+
or liveness check for the connection failed.
42+
"""
43+
APPLICATION_CLOSE = "application_close"
44+
ERROR = "error"
45+
HEALTHCHECK_FAILED = "healthcheck_failed"
46+
3047

3148
class RedisMetricsCollector:
3249
"""
@@ -401,24 +418,22 @@ def record_operation_duration(
401418

402419
def record_connection_closed(
403420
self,
404-
pool_name: str,
405-
close_reason: Optional[str] = None,
421+
close_reason: Optional[CloseReason] = None,
406422
error_type: Optional[Exception] = None,
407423
) -> None:
408424
"""
409425
Record a connection closed event.
410426
411427
Args:
412-
pool_name: Connection pool name
413-
close_reason: Reason for closing (e.g., 'idle_timeout', 'error', 'shutdown')
428+
close_reason: Reason for closing (e.g. 'error', 'application_close')
414429
error_type: Error type if closed due to error
415430
"""
416431
if not hasattr(self, "connection_closed"):
417432
return
418433

419-
attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
434+
attrs = self.attr_builder.build_connection_attributes()
420435
if close_reason:
421-
attrs[REDIS_CLIENT_CONNECTION_CLOSE_REASON] = close_reason
436+
attrs[REDIS_CLIENT_CONNECTION_CLOSE_REASON] = close_reason.value
422437

423438
attrs.update(
424439
self.attr_builder.build_error_attributes(

redis/observability/recorder.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from typing import Optional, Callable
2424

2525
from redis.observability.attributes import PubSubDirection, ConnectionState
26-
from redis.observability.metrics import RedisMetricsCollector
26+
from redis.observability.metrics import RedisMetricsCollector, CloseReason
2727
from redis.observability.providers import get_observability_instance
2828

2929
# Global metrics collector instance (lazy-initialized)
@@ -251,16 +251,14 @@ def record_connection_use_time(
251251

252252

253253
def record_connection_closed(
254-
pool_name: str,
255-
close_reason: Optional[str] = None,
254+
close_reason: Optional[CloseReason] = None,
256255
error_type: Optional[Exception] = None,
257256
) -> None:
258257
"""
259258
Record a connection closed event.
260259
261260
Args:
262-
pool_name: Connection pool identifier
263-
close_reason: Reason for closing (e.g., 'idle_timeout', 'error', 'shutdown')
261+
close_reason: Reason for closing (e.g. 'error', 'application_close')
264262
error_type: Error type if closed due to error
265263
266264
Example:
@@ -275,7 +273,6 @@ def record_connection_closed(
275273

276274
# try:
277275
_metrics_collector.record_connection_closed(
278-
pool_name=pool_name,
279276
close_reason=close_reason,
280277
error_type=error_type,
281278
)

0 commit comments

Comments
 (0)