Skip to content

Commit 50d2068

Browse files
committed
Added exception handling
1 parent 3425939 commit 50d2068

File tree

2 files changed

+149
-108
lines changed

2 files changed

+149
-108
lines changed

redis/asyncio/observability/recorder.py

Lines changed: 144 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,6 @@ async def _get_or_create_collector() -> Optional[RedisMetricsCollector]:
5252
"""
5353
global _async_metrics_collector
5454

55-
if _async_metrics_collector is not None:
56-
return _async_metrics_collector
57-
58-
# Double-check after acquiring lock
5955
if _async_metrics_collector is not None:
6056
return _async_metrics_collector
6157

@@ -133,19 +129,22 @@ async def record_operation_duration(
133129
if collector is None:
134130
return
135131

136-
collector.record_operation_duration(
137-
command_name=command_name,
138-
duration_seconds=duration_seconds,
139-
server_address=server_address,
140-
server_port=server_port,
141-
db_namespace=db_namespace,
142-
error_type=error,
143-
network_peer_address=server_address,
144-
network_peer_port=server_port,
145-
is_blocking=is_blocking,
146-
batch_size=batch_size,
147-
retry_attempts=retry_attempts,
148-
)
132+
try:
133+
collector.record_operation_duration(
134+
command_name=command_name,
135+
duration_seconds=duration_seconds,
136+
server_address=server_address,
137+
server_port=server_port,
138+
db_namespace=db_namespace,
139+
error_type=error,
140+
network_peer_address=server_address,
141+
network_peer_port=server_port,
142+
is_blocking=is_blocking,
143+
batch_size=batch_size,
144+
retry_attempts=retry_attempts,
145+
)
146+
except Exception:
147+
pass
149148

150149

151150
async def record_connection_create_time(
@@ -163,10 +162,13 @@ async def record_connection_create_time(
163162
if collector is None:
164163
return
165164

166-
collector.record_connection_create_time(
167-
connection_pool=connection_pool,
168-
duration_seconds=duration_seconds,
169-
)
165+
try:
166+
collector.record_connection_create_time(
167+
connection_pool=connection_pool,
168+
duration_seconds=duration_seconds,
169+
)
170+
except Exception:
171+
pass
170172

171173

172174
async def init_connection_count() -> None:
@@ -187,9 +189,12 @@ def observable_callback(__):
187189

188190
return observations
189191

190-
collector.init_connection_count(
191-
callback=observable_callback,
192-
)
192+
try:
193+
collector.init_connection_count(
194+
callback=observable_callback,
195+
)
196+
except Exception:
197+
pass
193198

194199

195200
async def register_pools_connection_count(
@@ -212,10 +217,13 @@ def connection_count_callback():
212217
observations.append(Observation(count, attributes=attributes))
213218
return observations
214219

215-
observables_registry = get_observables_registry_instance()
216-
observables_registry.register(
217-
CONNECTION_COUNT_REGISTRY_KEY, connection_count_callback
218-
)
220+
try:
221+
observables_registry = get_observables_registry_instance()
222+
observables_registry.register(
223+
CONNECTION_COUNT_REGISTRY_KEY, connection_count_callback
224+
)
225+
except Exception:
226+
pass
219227

220228

221229
async def record_connection_timeout(
@@ -231,9 +239,12 @@ async def record_connection_timeout(
231239
if collector is None:
232240
return
233241

234-
collector.record_connection_timeout(
235-
pool_name=pool_name,
236-
)
242+
try:
243+
collector.record_connection_timeout(
244+
pool_name=pool_name,
245+
)
246+
except Exception:
247+
pass
237248

238249

239250
async def record_connection_wait_time(
@@ -251,10 +262,13 @@ async def record_connection_wait_time(
251262
if collector is None:
252263
return
253264

254-
collector.record_connection_wait_time(
255-
pool_name=pool_name,
256-
duration_seconds=duration_seconds,
257-
)
265+
try:
266+
collector.record_connection_wait_time(
267+
pool_name=pool_name,
268+
duration_seconds=duration_seconds,
269+
)
270+
except Exception:
271+
pass
258272

259273

260274
async def record_connection_closed(
@@ -272,10 +286,13 @@ async def record_connection_closed(
272286
if collector is None:
273287
return
274288

275-
collector.record_connection_closed(
276-
close_reason=close_reason,
277-
error_type=error_type,
278-
)
289+
try:
290+
collector.record_connection_closed(
291+
close_reason=close_reason,
292+
error_type=error_type,
293+
)
294+
except Exception:
295+
pass
279296

280297

281298
async def record_connection_relaxed_timeout(
@@ -295,11 +312,14 @@ async def record_connection_relaxed_timeout(
295312
if collector is None:
296313
return
297314

298-
collector.record_connection_relaxed_timeout(
299-
connection_name=connection_name,
300-
maint_notification=maint_notification,
301-
relaxed=relaxed,
302-
)
315+
try:
316+
collector.record_connection_relaxed_timeout(
317+
connection_name=connection_name,
318+
maint_notification=maint_notification,
319+
relaxed=relaxed,
320+
)
321+
except Exception:
322+
pass
303323

304324

305325
async def record_connection_handoff(
@@ -315,9 +335,12 @@ async def record_connection_handoff(
315335
if collector is None:
316336
return
317337

318-
collector.record_connection_handoff(
319-
pool_name=pool_name,
320-
)
338+
try:
339+
collector.record_connection_handoff(
340+
pool_name=pool_name,
341+
)
342+
except Exception:
343+
pass
321344

322345

323346
async def record_error_count(
@@ -345,15 +368,18 @@ async def record_error_count(
345368
if collector is None:
346369
return
347370

348-
collector.record_error_count(
349-
server_address=server_address,
350-
server_port=server_port,
351-
network_peer_address=network_peer_address,
352-
network_peer_port=network_peer_port,
353-
error_type=error_type,
354-
retry_attempts=retry_attempts,
355-
is_internal=is_internal,
356-
)
371+
try:
372+
collector.record_error_count(
373+
server_address=server_address,
374+
server_port=server_port,
375+
network_peer_address=network_peer_address,
376+
network_peer_port=network_peer_port,
377+
error_type=error_type,
378+
retry_attempts=retry_attempts,
379+
is_internal=is_internal,
380+
)
381+
except Exception:
382+
pass
357383

358384

359385
async def record_pubsub_message(
@@ -383,11 +409,14 @@ async def record_pubsub_message(
383409
# Normalize bytes to str for OTel attributes
384410
effective_channel = str_if_bytes(channel)
385411

386-
collector.record_pubsub_message(
387-
direction=direction,
388-
channel=effective_channel,
389-
sharded=sharded,
390-
)
412+
try:
413+
collector.record_pubsub_message(
414+
direction=direction,
415+
channel=effective_channel,
416+
sharded=sharded,
417+
)
418+
except Exception:
419+
pass
391420

392421

393422
async def record_streaming_lag(
@@ -416,12 +445,15 @@ async def record_streaming_lag(
416445
if config is not None and config.hide_stream_names:
417446
effective_stream_name = None
418447

419-
collector.record_streaming_lag(
420-
lag_seconds=lag_seconds,
421-
stream_name=effective_stream_name,
422-
consumer_group=consumer_group,
423-
consumer_name=consumer_name,
424-
)
448+
try:
449+
collector.record_streaming_lag(
450+
lag_seconds=lag_seconds,
451+
stream_name=effective_stream_name,
452+
consumer_group=consumer_group,
453+
consumer_name=consumer_name,
454+
)
455+
except Exception:
456+
pass
425457

426458

427459
async def record_streaming_lag_from_response(
@@ -446,20 +478,40 @@ async def record_streaming_lag_from_response(
446478
if not response:
447479
return
448480

449-
now = datetime.now().timestamp()
481+
try:
482+
now = datetime.now().timestamp()
483+
484+
# Check if stream names should be hidden
485+
config = await _get_config()
486+
hide_stream_names = config is not None and config.hide_stream_names
450487

451-
# Check if stream names should be hidden
452-
config = await _get_config()
453-
hide_stream_names = config is not None and config.hide_stream_names
454-
455-
# RESP3 format: dict
456-
if isinstance(response, dict):
457-
for stream_name, stream_messages in response.items():
458-
effective_stream_name = (
459-
None if hide_stream_names else str_if_bytes(stream_name)
460-
)
461-
for messages in stream_messages:
462-
for message in messages:
488+
# RESP3 format: dict
489+
if isinstance(response, dict):
490+
for stream_name, stream_messages in response.items():
491+
effective_stream_name = (
492+
None if hide_stream_names else str_if_bytes(stream_name)
493+
)
494+
for messages in stream_messages:
495+
for message in messages:
496+
message_id, _ = message
497+
message_id = str_if_bytes(message_id)
498+
timestamp, _ = message_id.split("-")
499+
# Ensure lag is non-negative (clock skew can cause negative values)
500+
lag_seconds = max(0.0, now - int(timestamp) / 1000)
501+
502+
collector.record_streaming_lag(
503+
lag_seconds=lag_seconds,
504+
stream_name=effective_stream_name,
505+
consumer_group=consumer_group,
506+
consumer_name=consumer_name,
507+
)
508+
else:
509+
# RESP2 format: list
510+
for stream_entry in response:
511+
stream_name = str_if_bytes(stream_entry[0])
512+
effective_stream_name = None if hide_stream_names else stream_name
513+
514+
for message in stream_entry[1]:
463515
message_id, _ = message
464516
message_id = str_if_bytes(message_id)
465517
timestamp, _ = message_id.split("-")
@@ -472,25 +524,8 @@ async def record_streaming_lag_from_response(
472524
consumer_group=consumer_group,
473525
consumer_name=consumer_name,
474526
)
475-
else:
476-
# RESP2 format: list
477-
for stream_entry in response:
478-
stream_name = str_if_bytes(stream_entry[0])
479-
effective_stream_name = None if hide_stream_names else stream_name
480-
481-
for message in stream_entry[1]:
482-
message_id, _ = message
483-
message_id = str_if_bytes(message_id)
484-
timestamp, _ = message_id.split("-")
485-
# Ensure lag is non-negative (clock skew can cause negative values)
486-
lag_seconds = max(0.0, now - int(timestamp) / 1000)
487-
488-
collector.record_streaming_lag(
489-
lag_seconds=lag_seconds,
490-
stream_name=effective_stream_name,
491-
consumer_group=consumer_group,
492-
consumer_name=consumer_name,
493-
)
527+
except Exception:
528+
pass
494529

495530

496531
async def record_maint_notification_count(
@@ -514,13 +549,16 @@ async def record_maint_notification_count(
514549
if collector is None:
515550
return
516551

517-
collector.record_maint_notification_count(
518-
server_address=server_address,
519-
server_port=server_port,
520-
network_peer_address=network_peer_address,
521-
network_peer_port=network_peer_port,
522-
maint_notification=maint_notification,
523-
)
552+
try:
553+
collector.record_maint_notification_count(
554+
server_address=server_address,
555+
server_port=server_port,
556+
network_peer_address=network_peer_address,
557+
network_peer_port=network_peer_port,
558+
maint_notification=maint_notification,
559+
)
560+
except Exception:
561+
pass
524562

525563

526564
async def record_geo_failover(

tests/test_asyncio/test_observability/test_recorder.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ async def test_record_connection_create_time(self, setup_async_recorder):
226226
mock_pool = MagicMock()
227227
mock_pool.__class__.__name__ = "ConnectionPool"
228228
mock_pool.connection_kwargs = {"host": "localhost", "port": 6379, "db": 0}
229+
mock_pool._pool_id = "a1b2c3d4" # Mock the unique pool ID
229230

230231
await recorder.record_connection_create_time(
231232
connection_pool=mock_pool,
@@ -235,10 +236,12 @@ async def test_record_connection_create_time(self, setup_async_recorder):
235236
instruments.connection_create_time.record.assert_called_once()
236237
call_args = instruments.connection_create_time.record.call_args
237238

239+
# Verify duration value
238240
assert call_args[0][0] == 0.050
241+
242+
# Verify attributes
239243
attrs = call_args[1]["attributes"]
240-
# Pool name is generated from class name and connection kwargs
241-
assert "localhost:6379/0" in attrs[DB_CLIENT_CONNECTION_POOL_NAME]
244+
assert attrs[DB_CLIENT_CONNECTION_POOL_NAME] == "localhost:6379_a1b2c3d4"
242245

243246

244247
@pytest.mark.asyncio

0 commit comments

Comments
 (0)