Skip to content

Commit f46b1f5

Browse files
Added export of connection basic metrics (#3891)
* Added export of connection basic metrics * Added new error category attribute * Update tests/test_observability/test_recorder.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update tests/test_observability/test_recorder.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Apply comments * Applied comments --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 2e7b2db commit f46b1f5

File tree

12 files changed

+785
-233
lines changed

12 files changed

+785
-233
lines changed

redis/connection.py

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
from .auth.token import TokenInterface
3636
from .backoff import NoBackoff
3737
from .credentials import CredentialProvider, UsernamePasswordCredentialProvider
38-
from .event import AfterConnectionReleasedEvent, EventDispatcher, OnErrorEvent, OnMaintenanceNotificationEvent
38+
from .event import AfterConnectionReleasedEvent, EventDispatcher, OnErrorEvent, OnMaintenanceNotificationEvent, \
39+
AfterConnectionCreatedEvent
3940
from .exceptions import (
4041
AuthenticationError,
4142
AuthenticationWrongNumberOfArgsError,
@@ -53,6 +54,8 @@
5354
MaintNotificationsConnectionHandler,
5455
MaintNotificationsPoolHandler, MaintenanceNotification,
5556
)
57+
from .observability.attributes import AttributeBuilder, DB_CLIENT_CONNECTION_STATE, ConnectionState, \
58+
DB_CLIENT_CONNECTION_POOL_NAME
5659
from .retry import Retry
5760
from .utils import (
5861
CRYPTOGRAPHY_AVAILABLE,
@@ -2060,6 +2063,13 @@ def set_retry(self, retry: Retry):
20602063
def re_auth_callback(self, token: TokenInterface):
20612064
pass
20622065

2066+
@abstractmethod
2067+
def get_connection_count(self) -> list[tuple[int, dict]]:
2068+
"""
2069+
Returns a connection count (both idle and in use).
2070+
"""
2071+
pass
2072+
20632073

20642074
class MaintNotificationsAbstractConnectionPool:
20652075
"""
@@ -2088,8 +2098,12 @@ def __init__(
20882098
"Maintenance notifications handlers on connection are only supported with RESP version 3"
20892099
)
20902100

2101+
self._event_dispatcher = kwargs.get("event_dispatcher", None)
2102+
if self._event_dispatcher is None:
2103+
self._event_dispatcher = EventDispatcher()
2104+
20912105
self._maint_notifications_pool_handler = MaintNotificationsPoolHandler(
2092-
self, maint_notifications_config
2106+
self, maint_notifications_config, self._event_dispatcher
20932107
)
20942108

20952109
self._update_connection_kwargs_for_maint_notifications(
@@ -2157,7 +2171,7 @@ def update_maint_notifications_config(
21572171
# first update pool settings
21582172
if not self._maint_notifications_pool_handler:
21592173
self._maint_notifications_pool_handler = MaintNotificationsPoolHandler(
2160-
self, maint_notifications_config
2174+
self, maint_notifications_config, self._event_dispatcher
21612175
)
21622176
else:
21632177
self._maint_notifications_pool_handler.config = maint_notifications_config
@@ -2635,11 +2649,17 @@ def get_connection(self, command_name=None, *keys, **options) -> "Connection":
26352649
"Get a connection from the pool"
26362650

26372651
self._checkpid()
2652+
is_created = False
2653+
26382654
with self._lock:
26392655
try:
26402656
connection = self._available_connections.pop()
26412657
except IndexError:
2658+
# Start timing for observability
2659+
start_time = time.monotonic()
2660+
26422661
connection = self.make_connection()
2662+
is_created = True
26432663
self._in_use_connections.add(connection)
26442664

26452665
try:
@@ -2666,6 +2686,14 @@ def get_connection(self, command_name=None, *keys, **options) -> "Connection":
26662686
# leak it
26672687
self.release(connection)
26682688
raise
2689+
2690+
if is_created:
2691+
self._event_dispatcher.dispatch(
2692+
AfterConnectionCreatedEvent(
2693+
connection_pool=self,
2694+
duration_seconds=time.monotonic() - start_time,
2695+
)
2696+
)
26692697
return connection
26702698

26712699
def get_encoder(self) -> Encoder:
@@ -2785,6 +2813,20 @@ async def _mock(self, error: RedisError):
27852813
"""
27862814
pass
27872815

2816+
def get_connection_count(self) -> List[tuple[int, dict]]:
2817+
attributes = AttributeBuilder.build_base_attributes()
2818+
attributes[DB_CLIENT_CONNECTION_POOL_NAME] = repr(self)
2819+
free_connections_attributes = attributes.copy()
2820+
in_use_connections_attributes = attributes.copy()
2821+
2822+
free_connections_attributes[DB_CLIENT_CONNECTION_STATE] = ConnectionState.IDLE.value
2823+
in_use_connections_attributes[DB_CLIENT_CONNECTION_STATE] = ConnectionState.USED.value
2824+
2825+
return [
2826+
(len(self._get_free_connections()), free_connections_attributes),
2827+
(len(self._get_in_use_connections()), in_use_connections_attributes),
2828+
]
2829+
27882830

27892831
class BlockingConnectionPool(ConnectionPool):
27902832
"""
@@ -2917,6 +2959,7 @@ def get_connection(self, command_name=None, *keys, **options):
29172959
"""
29182960
# Make sure we haven't changed process.
29192961
self._checkpid()
2962+
is_created = False
29202963

29212964
# Try and get a connection from the pool. If one isn't available within
29222965
# self.timeout then raise a ``ConnectionError``.
@@ -2935,7 +2978,10 @@ def get_connection(self, command_name=None, *keys, **options):
29352978
# If the ``connection`` is actually ``None`` then that's a cue to make
29362979
# a new connection to add to the pool.
29372980
if connection is None:
2981+
# Start timing for observability
2982+
start_time = time.monotonic()
29382983
connection = self.make_connection()
2984+
is_created = True
29392985
finally:
29402986
if self._locked:
29412987
try:
@@ -2964,6 +3010,14 @@ def get_connection(self, command_name=None, *keys, **options):
29643010
self.release(connection)
29653011
raise
29663012

3013+
if is_created:
3014+
self._event_dispatcher.dispatch(
3015+
AfterConnectionCreatedEvent(
3016+
connection_pool=self,
3017+
duration_seconds=time.monotonic() - start_time,
3018+
)
3019+
)
3020+
29673021
return connection
29683022

29693023
def release(self, connection):

redis/event.py

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
from abc import ABC, abstractmethod
44
from dataclasses import dataclass
55
from enum import Enum
6-
from typing import Dict, List, Optional, Type, Union
6+
from typing import Dict, List, Optional, Type, Union, Callable
77

88
from redis.auth.token import TokenInterface
99
from redis.credentials import CredentialProvider, StreamingCredentialProvider
10-
from redis.observability.recorder import record_operation_duration, record_error_count, record_maint_notification_count
10+
from redis.observability.recorder import record_operation_duration, record_error_count, record_maint_notification_count, \
11+
record_connection_create_time, init_connection_count, record_connection_relaxed_timeout, record_connection_handoff
1112

1213

1314
class EventListenerInterface(ABC):
@@ -85,7 +86,8 @@ def __init__(
8586
ReAuthConnectionListener(),
8687
],
8788
AfterPooledConnectionsInstantiationEvent: [
88-
RegisterReAuthForPooledConnections()
89+
RegisterReAuthForPooledConnections(),
90+
InitializeConnectionCountObservability()
8991
],
9092
AfterSingleConnectionInstantiationEvent: [
9193
RegisterReAuthForSingleConnection()
@@ -97,6 +99,16 @@ def __init__(
9799
AsyncReAuthConnectionListener(),
98100
],
99101
OnErrorEvent: [ExportErrorCountMetric()],
102+
OnMaintenanceNotificationEvent: [
103+
ExportMaintenanceNotificationCountMetric(),
104+
],
105+
AfterConnectionCreatedEvent: [ExportConnectionCreateTimeMetric()],
106+
AfterConnectionTimeoutUpdatedEvent: [
107+
ExportConnectionRelaxedTimeoutMetric(),
108+
],
109+
AfterConnectionHandoffEvent: [
110+
ExportConnectionHandoffMetric(),
111+
],
100112
}
101113

102114
self._lock = threading.Lock()
@@ -333,6 +345,30 @@ class OnMaintenanceNotificationEvent:
333345
notification: "MaintenanceNotification"
334346
connection: "MaintNotificationsAbstractConnection"
335347

348+
@dataclass
349+
class AfterConnectionCreatedEvent:
350+
"""
351+
Event fired after connection is created in pool.
352+
"""
353+
connection_pool: "ConnectionPoolInterface"
354+
duration_seconds: float
355+
356+
@dataclass
357+
class AfterConnectionTimeoutUpdatedEvent:
358+
"""
359+
Event fired after connection timeout is updated.
360+
"""
361+
connection: "MaintNotificationsAbstractConnection"
362+
notification: "MaintenanceNotification"
363+
relaxed: bool
364+
365+
@dataclass
366+
class AfterConnectionHandoffEvent:
367+
"""
368+
Event fired after connection is handed off.
369+
"""
370+
connection_pool: "ConnectionPoolInterface"
371+
336372
class AsyncOnCommandsFailEvent(OnCommandsFailEvent):
337373
pass
338374

@@ -547,4 +583,41 @@ def listen(self, event: OnMaintenanceNotificationEvent):
547583
network_peer_address=event.connection.host,
548584
network_peer_port=event.connection.port,
549585
maint_notification=repr(event.notification),
550-
)
586+
)
587+
588+
class ExportConnectionCreateTimeMetric(EventListenerInterface):
589+
"""
590+
Listener that exports connection create time metric.
591+
"""
592+
def listen(self, event: AfterConnectionCreatedEvent):
593+
record_connection_create_time(
594+
connection_pool=event.connection_pool,
595+
duration_seconds=event.duration_seconds,
596+
)
597+
598+
class InitializeConnectionCountObservability(EventListenerInterface):
599+
"""
600+
Listener that initializes connection count observability.
601+
"""
602+
def listen(self, event: AfterPooledConnectionsInstantiationEvent):
603+
init_connection_count(event.connection_pools)
604+
605+
class ExportConnectionRelaxedTimeoutMetric(EventListenerInterface):
606+
"""
607+
Listener that exports connection relaxed timeout metric.
608+
"""
609+
def listen(self, event: AfterConnectionTimeoutUpdatedEvent):
610+
record_connection_relaxed_timeout(
611+
connection_name=repr(event.connection),
612+
maint_notification=repr(event.notification),
613+
relaxed=event.relaxed,
614+
)
615+
616+
class ExportConnectionHandoffMetric(EventListenerInterface):
617+
"""
618+
Listener that exports connection handoff metric.
619+
"""
620+
def listen(self, event: AfterConnectionHandoffEvent):
621+
record_connection_handoff(
622+
pool_name=repr(event.connection_pool),
623+
)

redis/maint_notifications.py

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
from abc import ABC, abstractmethod
88
from typing import TYPE_CHECKING, Literal, Optional, Union
99

10-
from redis.event import OnMaintenanceNotificationEvent
10+
from redis.event import OnMaintenanceNotificationEvent, EventDispatcherInterface, EventDispatcher, \
11+
AfterConnectionTimeoutUpdatedEvent, AfterConnectionHandoffEvent
1112
from redis.typing import Number
1213

1314

@@ -560,21 +561,27 @@ def __init__(
560561
self,
561562
pool: "MaintNotificationsAbstractConnectionPool",
562563
config: MaintNotificationsConfig,
564+
event_dispatcher: Optional[EventDispatcherInterface] = None,
563565
) -> None:
564566
self.pool = pool
565567
self.config = config
566568
self._processed_notifications = set()
567569
self._lock = threading.RLock()
568570
self.connection = None
569571

572+
if event_dispatcher is not None:
573+
self.event_dispatcher = event_dispatcher
574+
else:
575+
self.event_dispatcher = EventDispatcher()
576+
570577
def set_connection(self, connection: "MaintNotificationsAbstractConnection"):
571578
self.connection = connection
572579

573580
def get_handler_for_connection(self):
574581
# Copy all data that should be shared between connections
575582
# but each connection should have its own pool handler
576583
# since each connection can be in a different state
577-
copy = MaintNotificationsPoolHandler(self.pool, self.config)
584+
copy = MaintNotificationsPoolHandler(self.pool, self.config, self.event_dispatcher)
578585
copy._processed_notifications = self._processed_notifications
579586
copy._lock = self._lock
580587
copy.connection = None
@@ -683,6 +690,12 @@ def handle_node_moving_notification(self, notification: NodeMovingNotification):
683690
args=(notification,),
684691
).start()
685692

693+
self.event_dispatcher.dispatch(
694+
AfterConnectionHandoffEvent(
695+
connection_pool=self.pool,
696+
)
697+
)
698+
686699
self._processed_notifications.add(notification)
687700

688701
def run_proactive_reconnect(self, moving_address_src: Optional[str] = None):
@@ -784,12 +797,12 @@ def handle_notification(self, notification: MaintenanceNotification):
784797
return
785798

786799
if notification_type:
787-
self.handle_maintenance_start_notification(MaintenanceState.MAINTENANCE)
800+
self.handle_maintenance_start_notification(MaintenanceState.MAINTENANCE, notification=notification)
788801
else:
789-
self.handle_maintenance_completed_notification()
802+
self.handle_maintenance_completed_notification(notification=notification)
790803

791804
def handle_maintenance_start_notification(
792-
self, maintenance_state: MaintenanceState
805+
self, maintenance_state: MaintenanceState, **kwargs
793806
):
794807
if (
795808
self.connection.maintenance_state == MaintenanceState.MOVING
@@ -804,7 +817,16 @@ def handle_maintenance_start_notification(
804817
# extend the timeout for all created connections
805818
self.connection.update_current_socket_timeout(self.config.relaxed_timeout)
806819

807-
def handle_maintenance_completed_notification(self):
820+
if kwargs.get('notification'):
821+
self.connection.event_dispatcher.dispatch(
822+
AfterConnectionTimeoutUpdatedEvent(
823+
connection=self.connection,
824+
notification=kwargs.get('notification'),
825+
relaxed=True,
826+
)
827+
)
828+
829+
def handle_maintenance_completed_notification(self, **kwargs):
808830
# Only reset timeouts if state is not MOVING and relaxed timeouts are enabled
809831
if (
810832
self.connection.maintenance_state == MaintenanceState.MOVING
@@ -816,3 +838,12 @@ def handle_maintenance_completed_notification(self):
816838
# timeouts by providing -1 as the relaxed timeout
817839
self.connection.update_current_socket_timeout(-1)
818840
self.connection.maintenance_state = MaintenanceState.NONE
841+
842+
if kwargs.get('notification'):
843+
self.connection.event_dispatcher.dispatch(
844+
AfterConnectionTimeoutUpdatedEvent(
845+
connection=self.connection,
846+
notification=kwargs.get('notification'),
847+
relaxed=False,
848+
)
849+
)

0 commit comments

Comments
 (0)