2121import logging
2222import threading
2323import typing
24- from typing import Any , Dict , Callable , Iterable , List , Optional , Set , Tuple
24+ from typing import (
25+ Any ,
26+ Dict ,
27+ Callable ,
28+ Iterable ,
29+ List ,
30+ Optional ,
31+ Set ,
32+ Tuple ,
33+ )
2534import uuid
2635
2736from opentelemetry import trace
6069
6170
6271_LOGGER = logging .getLogger (__name__ )
72+ _SLOW_ACK_LOGGER = logging .getLogger ("slow-ack" )
73+ _STREAMS_LOGGER = logging .getLogger ("subscriber-streams" )
74+ _FLOW_CONTROL_LOGGER = logging .getLogger ("subscriber-flow-control" )
75+ _CALLBACK_DELIVERY_LOGGER = logging .getLogger ("callback-delivery" )
76+ _CALLBACK_EXCEPTION_LOGGER = logging .getLogger ("callback-exceptions" )
77+ _EXPIRY_LOGGER = logging .getLogger ("expiry" )
6378_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
6479_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
6580_RETRYABLE_STREAM_ERRORS = (
@@ -145,6 +160,14 @@ def _wrap_callback_errors(
145160 callback: The user callback.
146161 message: The Pub/Sub message.
147162 """
163+ _CALLBACK_DELIVERY_LOGGER .debug (
164+ "Message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s) received by subscriber callback" ,
165+ message .message_id ,
166+ message .ack_id ,
167+ message .ordering_key ,
168+ message .exactly_once_enabled ,
169+ )
170+
148171 try :
149172 if message .opentelemetry_data :
150173 message .opentelemetry_data .end_subscribe_concurrency_control_span ()
@@ -156,9 +179,15 @@ def _wrap_callback_errors(
156179 # Note: the likelihood of this failing is extremely low. This just adds
157180 # a message to a queue, so if this doesn't work the world is in an
158181 # unrecoverable state and this thread should just bail.
159- _LOGGER .exception (
160- "Top-level exception occurred in callback while processing a message"
182+
183+ _CALLBACK_EXCEPTION_LOGGER .exception (
184+ "Message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s)'s callback threw exception, nacking message." ,
185+ message .message_id ,
186+ message .ack_id ,
187+ message .ordering_key ,
188+ message .exactly_once_enabled ,
161189 )
190+
162191 message .nack ()
163192 on_callback_error (exc )
164193
@@ -199,6 +228,9 @@ def _process_requests(
199228 error_status : Optional ["status_pb2.Status" ],
200229 ack_reqs_dict : Dict [str , requests .AckRequest ],
201230 errors_dict : Optional [Dict [str , str ]],
231+ ack_histogram : Optional [histogram .Histogram ] = None ,
232+ # TODO - Change this param to a Union of Literals when we drop p3.7 support
233+ req_type : str = "ack" ,
202234):
203235 """Process requests when exactly-once delivery is enabled by referring to
204236 error_status and errors_dict.
@@ -209,28 +241,40 @@ def _process_requests(
209241 """
210242 requests_completed = []
211243 requests_to_retry = []
212- for ack_id in ack_reqs_dict :
244+ for ack_id , ack_request in ack_reqs_dict .items ():
245+ # Debug logging: slow acks
246+ if (
247+ req_type == "ack"
248+ and ack_histogram
249+ and ack_request .time_to_ack > ack_histogram .percentile (percent = 99 )
250+ ):
251+ _SLOW_ACK_LOGGER .debug (
252+ "Message (id=%s, ack_id=%s) ack duration of %s s is higher than the p99 ack duration" ,
253+ ack_request .message_id ,
254+ ack_request .ack_id ,
255+ )
256+
213257 # Handle special errors returned for ack/modack RPCs via the ErrorInfo
214258 # sidecar metadata when exactly-once delivery is enabled.
215259 if errors_dict and ack_id in errors_dict :
216260 exactly_once_error = errors_dict [ack_id ]
217261 if exactly_once_error .startswith ("TRANSIENT_" ):
218- requests_to_retry .append (ack_reqs_dict [ ack_id ] )
262+ requests_to_retry .append (ack_request )
219263 else :
220264 if exactly_once_error == "PERMANENT_FAILURE_INVALID_ACK_ID" :
221265 exc = AcknowledgeError (AcknowledgeStatus .INVALID_ACK_ID , info = None )
222266 else :
223267 exc = AcknowledgeError (AcknowledgeStatus .OTHER , exactly_once_error )
224- future = ack_reqs_dict [ ack_id ] .future
268+ future = ack_request .future
225269 if future is not None :
226270 future .set_exception (exc )
227- requests_completed .append (ack_reqs_dict [ ack_id ] )
271+ requests_completed .append (ack_request )
228272 # Temporary GRPC errors are retried
229273 elif (
230274 error_status
231275 and error_status .code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS
232276 ):
233- requests_to_retry .append (ack_reqs_dict [ ack_id ] )
277+ requests_to_retry .append (ack_request )
234278 # Other GRPC errors are NOT retried
235279 elif error_status :
236280 if error_status .code == code_pb2 .PERMISSION_DENIED :
@@ -239,20 +283,20 @@ def _process_requests(
239283 exc = AcknowledgeError (AcknowledgeStatus .FAILED_PRECONDITION , info = None )
240284 else :
241285 exc = AcknowledgeError (AcknowledgeStatus .OTHER , str (error_status ))
242- future = ack_reqs_dict [ ack_id ] .future
286+ future = ack_request .future
243287 if future is not None :
244288 future .set_exception (exc )
245- requests_completed .append (ack_reqs_dict [ ack_id ] )
289+ requests_completed .append (ack_request )
246290 # Since no error occurred, requests with futures are completed successfully.
247- elif ack_reqs_dict [ ack_id ] .future :
248- future = ack_reqs_dict [ ack_id ] .future
291+ elif ack_request .future :
292+ future = ack_request .future
249293 # success
250294 assert future is not None
251295 future .set_result (AcknowledgeStatus .SUCCESS )
252- requests_completed .append (ack_reqs_dict [ ack_id ] )
296+ requests_completed .append (ack_request )
253297 # All other requests are considered completed.
254298 else :
255- requests_completed .append (ack_reqs_dict [ ack_id ] )
299+ requests_completed .append (ack_request )
256300
257301 return requests_completed , requests_to_retry
258302
@@ -560,8 +604,10 @@ def maybe_pause_consumer(self) -> None:
560604 with self ._pause_resume_lock :
561605 if self .load >= _MAX_LOAD :
562606 if self ._consumer is not None and not self ._consumer .is_paused :
563- _LOGGER .debug (
564- "Message backlog over load at %.2f, pausing." , self .load
607+ _FLOW_CONTROL_LOGGER .debug (
608+ "Message backlog over load at %.2f (threshold %.2f), initiating client-side flow control" ,
609+ self .load ,
610+ _RESUME_THRESHOLD ,
565611 )
566612 self ._consumer .pause ()
567613
@@ -588,10 +634,18 @@ def maybe_resume_consumer(self) -> None:
588634 self ._maybe_release_messages ()
589635
590636 if self .load < _RESUME_THRESHOLD :
591- _LOGGER .debug ("Current load is %.2f, resuming consumer." , self .load )
637+ _FLOW_CONTROL_LOGGER .debug (
638+ "Current load is %.2f (threshold %.2f), suspending client-side flow control." ,
639+ self .load ,
640+ _RESUME_THRESHOLD ,
641+ )
592642 self ._consumer .resume ()
593643 else :
594- _LOGGER .debug ("Did not resume, current load is %.2f." , self .load )
644+ _FLOW_CONTROL_LOGGER .debug (
645+ "Current load is %.2f (threshold %.2f), retaining client-side flow control." ,
646+ self .load ,
647+ _RESUME_THRESHOLD ,
648+ )
595649
596650 def _maybe_release_messages (self ) -> None :
597651 """Release (some of) the held messages if the current load allows for it.
@@ -702,7 +756,7 @@ def send_unary_ack(
702756
703757 if self ._exactly_once_delivery_enabled ():
704758 requests_completed , requests_to_retry = _process_requests (
705- error_status , ack_reqs_dict , ack_errors_dict
759+ error_status , ack_reqs_dict , ack_errors_dict , self . ack_histogram , "ack"
706760 )
707761 else :
708762 requests_completed = []
@@ -796,7 +850,11 @@ def send_unary_modack(
796850
797851 if self ._exactly_once_delivery_enabled ():
798852 requests_completed , requests_to_retry = _process_requests (
799- error_status , ack_reqs_dict , modack_errors_dict
853+ error_status ,
854+ ack_reqs_dict ,
855+ modack_errors_dict ,
856+ self .ack_histogram ,
857+ "modack" ,
800858 )
801859 else :
802860 requests_completed = []
@@ -1239,6 +1297,11 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
12391297 receipt_modack = True ,
12401298 )
12411299
1300+ if len (expired_ack_ids ):
1301+ _EXPIRY_LOGGER .debug (
1302+ "ack ids %s were dropped as they have already expired." , expired_ack_ids
1303+ )
1304+
12421305 with self ._pause_resume_lock :
12431306 if self ._scheduler is None or self ._leaser is None :
12441307 _LOGGER .debug (
@@ -1304,9 +1367,13 @@ def _should_recover(self, exception: BaseException) -> bool:
13041367 # If this is in the list of idempotent exceptions, then we want to
13051368 # recover.
13061369 if isinstance (exception , _RETRYABLE_STREAM_ERRORS ):
1307- _LOGGER .debug ("Observed recoverable stream error %s" , exception )
1370+ _STREAMS_LOGGER .debug (
1371+ "Observed recoverable stream error %s, reopening stream" , exception
1372+ )
13081373 return True
1309- _LOGGER .debug ("Observed non-recoverable stream error %s" , exception )
1374+ _STREAMS_LOGGER .debug (
1375+ "Observed non-recoverable stream error %s, shutting down stream" , exception
1376+ )
13101377 return False
13111378
13121379 def _should_terminate (self , exception : BaseException ) -> bool :
@@ -1326,9 +1393,13 @@ def _should_terminate(self, exception: BaseException) -> bool:
13261393 is_api_error = isinstance (exception , exceptions .GoogleAPICallError )
13271394 # Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.)
13281395 if not is_api_error or isinstance (exception , _TERMINATING_STREAM_ERRORS ):
1329- _LOGGER .debug ("Observed terminating stream error %s" , exception )
1396+ _STREAMS_LOGGER .debug (
1397+ "Observed terminating stream error %s, shutting down stream" , exception
1398+ )
13301399 return True
1331- _LOGGER .debug ("Observed non-terminating stream error %s" , exception )
1400+ _STREAMS_LOGGER .debug (
1401+ "Observed non-terminating stream error %s, attempting to reopen" , exception
1402+ )
13321403 return False
13331404
13341405 def _on_rpc_done (self , future : Any ) -> None :
0 commit comments