Skip to content
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
77cae8c
feat: debug logs
abbrowne126 Sep 29, 2025
dbf9475
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 29, 2025
d1b88fc
Update lint.yml
abbrowne126 Sep 30, 2025
2908929
Update streaming_pull_manager.py
abbrowne126 Sep 30, 2025
ffaf3e7
fix tests
abbrowne126 Sep 30, 2025
9786cad
Update pytest.ini
abbrowne126 Oct 1, 2025
48157e1
Update pytest.ini
abbrowne126 Oct 1, 2025
def1167
Update pytest.ini
abbrowne126 Oct 1, 2025
748fc27
Merge branch 'main' into add-debug-logs
abbrowne126 Oct 3, 2025
92ebea1
re-enable mypy
abbrowne126 Oct 3, 2025
8af2d93
add temporary ignore for credential warning when creds file used in u…
abbrowne126 Oct 3, 2025
7954be6
fix typo
abbrowne126 Oct 3, 2025
23f099d
fix filter
abbrowne126 Oct 3, 2025
791e5f2
fix samples errors
abbrowne126 Oct 3, 2025
fb76483
remove comment
abbrowne126 Oct 3, 2025
1b0a175
fix wrap callback errors structure
abbrowne126 Oct 3, 2025
01a0acb
adjust how exactly once passed to callback loggers
abbrowne126 Oct 3, 2025
c2224e6
formatting
abbrowne126 Oct 3, 2025
7fbf91b
formatting
abbrowne126 Oct 3, 2025
b41d8ab
adjust logic to log ack requests
abbrowne126 Oct 3, 2025
4ea941d
lint
abbrowne126 Oct 3, 2025
c2f9287
format
abbrowne126 Oct 3, 2025
444ba08
keep mypy disabled for now
abbrowne126 Oct 3, 2025
c10a7e6
try reverting
abbrowne126 Oct 3, 2025
e4140d5
format
abbrowne126 Oct 3, 2025
de4eac6
adjust message logic
abbrowne126 Oct 3, 2025
07cf4f0
add mock message generator
abbrowne126 Oct 3, 2025
df81335
formatting
abbrowne126 Oct 3, 2025
0630f20
clean up unused imports
abbrowne126 Oct 3, 2025
9fb4a1a
add missing param
abbrowne126 Oct 3, 2025
dcfaeff
try reverting callback changes
abbrowne126 Oct 3, 2025
a6a3b22
fix mock generation
abbrowne126 Oct 3, 2025
a48f8fb
try message count reduction
abbrowne126 Oct 3, 2025
c69e9c9
see err
abbrowne126 Oct 3, 2025
aee206e
add req_type property to _process_requests
abbrowne126 Oct 3, 2025
1d12711
format
abbrowne126 Oct 3, 2025
821fe3e
fix typing constraint for py3.7
abbrowne126 Oct 3, 2025
fa53276
noxfile
abbrowne126 Oct 4, 2025
6902ca0
nf 2
abbrowne126 Oct 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class AckRequest(NamedTuple):
ordering_key: Optional[str]
future: Optional["futures.Future"]
opentelemetry_data: Optional[SubscribeOpenTelemetry] = None
message_id: Optional[str] = None


class DropRequest(NamedTuple):
Expand All @@ -52,6 +53,7 @@ class ModAckRequest(NamedTuple):
seconds: float
future: Optional["futures.Future"]
opentelemetry_data: Optional[SubscribeOpenTelemetry] = None
message_id: Optional[str] = None


class NackRequest(NamedTuple):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@


_LOGGER = logging.getLogger(__name__)
_SLOW_ACK_LOGGER = logging.getLogger("slow-ack")
_STREAMS_LOGGER = logging.getLogger("subscriber-streams")
_FLOW_CONTROL_LOGGER = logging.getLogger("subscriber-flow-control")
_CALLBACK_DELIVERY_LOGGER = logging.getLogger("callback-delivery")
_CALLBACK_EXCEPTION_LOGGER = logging.getLogger("callback-exceptions")
_EXPIRY_LOGGER = logging.getLogger("expiry")
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
_RETRYABLE_STREAM_ERRORS = (
Expand Down Expand Up @@ -145,6 +151,14 @@ def _wrap_callback_errors(
callback: The user callback.
message: The Pub/Sub message.
"""
_CALLBACK_DELIVERY_LOGGER.debug(
"Message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s) received by subscriber callback",
message.message_id,
message.ack_id,
message.ordering_key,
message.exactly_once_enabled,
)

try:
if message.opentelemetry_data:
message.opentelemetry_data.end_subscribe_concurrency_control_span()
Expand All @@ -156,9 +170,15 @@ def _wrap_callback_errors(
# Note: the likelihood of this failing is extremely low. This just adds
# a message to a queue, so if this doesn't work the world is in an
# unrecoverable state and this thread should just bail.
_LOGGER.exception(
"Top-level exception occurred in callback while processing a message"

_CALLBACK_EXCEPTION_LOGGER.exception(
"Message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s)'s callback threw exception, nacking message.",
message.message_id,
message.ack_id,
message.ordering_key,
message.exactly_once_enabled,
)

message.nack()
on_callback_error(exc)

Expand Down Expand Up @@ -199,6 +219,7 @@ def _process_requests(
error_status: Optional["status_pb2.Status"],
ack_reqs_dict: Dict[str, requests.AckRequest],
errors_dict: Optional[Dict[str, str]],
ack_histogram: Optional[histogram.Histogram] = None,
):
"""Process requests when exactly-once delivery is enabled by referring to
error_status and errors_dict.
Expand All @@ -210,6 +231,16 @@ def _process_requests(
requests_completed = []
requests_to_retry = []
for ack_id in ack_reqs_dict:
# Debug logging: slow acks
if ack_histogram and ack_reqs_dict[
ack_id
].time_to_ack > ack_histogram.percentile(percent=99):
_SLOW_ACK_LOGGER.debug(
"Message (id=%s, ack_id=%s) ack duration of %s s is higher than the p99 ack duration",
ack_reqs_dict[ack_id].message_id,
ack_reqs_dict[ack_id].ack_id,
)

# Handle special errors returned for ack/modack RPCs via the ErrorInfo
# sidecar metadata when exactly-once delivery is enabled.
if errors_dict and ack_id in errors_dict:
Expand Down Expand Up @@ -560,8 +591,10 @@ def maybe_pause_consumer(self) -> None:
with self._pause_resume_lock:
if self.load >= _MAX_LOAD:
if self._consumer is not None and not self._consumer.is_paused:
_LOGGER.debug(
"Message backlog over load at %.2f, pausing.", self.load
_FLOW_CONTROL_LOGGER.debug(
"Message backlog over load at %.2f (threshold %.2f), initiating client-side flow control",
self.load,
_RESUME_THRESHOLD,
)
self._consumer.pause()

Expand All @@ -588,10 +621,18 @@ def maybe_resume_consumer(self) -> None:
self._maybe_release_messages()

if self.load < _RESUME_THRESHOLD:
_LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
_FLOW_CONTROL_LOGGER.debug(
"Current load is %.2f (threshold %.2f), suspending client-side flow control.",
self.load,
_RESUME_THRESHOLD,
)
self._consumer.resume()
else:
_LOGGER.debug("Did not resume, current load is %.2f.", self.load)
_FLOW_CONTROL_LOGGER.debug(
"Current load is %.2f (threshold %.2f), retaining client-side flow control.",
self.load,
_RESUME_THRESHOLD,
)

def _maybe_release_messages(self) -> None:
"""Release (some of) the held messages if the current load allows for it.
Expand Down Expand Up @@ -702,7 +743,7 @@ def send_unary_ack(

if self._exactly_once_delivery_enabled():
requests_completed, requests_to_retry = _process_requests(
error_status, ack_reqs_dict, ack_errors_dict
error_status, ack_reqs_dict, ack_errors_dict, self.ack_histogram
)
else:
requests_completed = []
Expand Down Expand Up @@ -796,7 +837,7 @@ def send_unary_modack(

if self._exactly_once_delivery_enabled():
requests_completed, requests_to_retry = _process_requests(
error_status, ack_reqs_dict, modack_errors_dict
error_status, ack_reqs_dict, modack_errors_dict, self.ack_histogram
)
else:
requests_completed = []
Expand Down Expand Up @@ -1239,6 +1280,11 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
receipt_modack=True,
)

if len(expired_ack_ids):
_EXPIRY_LOGGER.debug(
"ack ids %s were dropped as they have already expired."
)

with self._pause_resume_lock:
if self._scheduler is None or self._leaser is None:
_LOGGER.debug(
Expand Down Expand Up @@ -1304,9 +1350,13 @@ def _should_recover(self, exception: BaseException) -> bool:
# If this is in the list of idempotent exceptions, then we want to
# recover.
if isinstance(exception, _RETRYABLE_STREAM_ERRORS):
_LOGGER.debug("Observed recoverable stream error %s", exception)
_STREAMS_LOGGER.debug(
"Observed recoverable stream error %s, reopening stream", exception
)
return True
_LOGGER.debug("Observed non-recoverable stream error %s", exception)
_STREAMS_LOGGER.debug(
"Observed non-recoverable stream error %s, shutting down stream", exception
)
return False

def _should_terminate(self, exception: BaseException) -> bool:
Expand All @@ -1326,9 +1376,13 @@ def _should_terminate(self, exception: BaseException) -> bool:
is_api_error = isinstance(exception, exceptions.GoogleAPICallError)
# Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.)
if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS):
_LOGGER.debug("Observed terminating stream error %s", exception)
_STREAMS_LOGGER.debug(
"Observed terminating stream error %s, shutting down stream", exception
)
return True
_LOGGER.debug("Observed non-terminating stream error %s", exception)
_STREAMS_LOGGER.debug(
"Observed non-terminating stream error %s, attempting to reopen", exception
)
return False

def _on_rpc_done(self, future: Any) -> None:
Expand Down
30 changes: 30 additions & 0 deletions google/cloud/pubsub_v1/subscriber/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import datetime as dt
import json
import logging
import math
import time
import typing
Expand Down Expand Up @@ -43,6 +44,8 @@
attributes: {}
}}"""

_ACK_NACK_LOGGER = logging.getLogger("ack-nack")

_SUCCESS_FUTURE = futures.Future()
_SUCCESS_FUTURE.set_result(AcknowledgeStatus.SUCCESS)

Expand Down Expand Up @@ -274,6 +277,7 @@ def ack(self) -> None:
time_to_ack = math.ceil(time.time() - self._received_timestamp)
self._request_queue.put(
requests.AckRequest(
message_id=self.message_id,
ack_id=self._ack_id,
byte_size=self.size,
time_to_ack=time_to_ack,
Expand All @@ -282,6 +286,12 @@ def ack(self) -> None:
opentelemetry_data=self.opentelemetry_data,
)
)
_ACK_NACK_LOGGER.debug(
"Called ack for message (id=%s, ack_id=%s, ordering_key=%s)",
self.message_id,
self.ack_id,
self.ordering_key,
)

def ack_with_response(self) -> "futures.Future":
"""Acknowledge the given message.
Expand Down Expand Up @@ -322,6 +332,12 @@ def ack_with_response(self) -> "futures.Future":
pubsub_v1.subscriber.exceptions.AcknowledgeError exception
will be thrown.
"""
_ACK_NACK_LOGGER.debug(
"Called ack for message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=True)",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could users still use the ack_with_response interface when EOD is disabled? If so, it would be worth setting exactly_once=self.exactly_once_enabled

self.message_id,
self.ack_id,
self.ordering_key,
)
if self.opentelemetry_data:
self.opentelemetry_data.add_process_span_event("ack called")
self.opentelemetry_data.end_process_span()
Expand All @@ -335,6 +351,7 @@ def ack_with_response(self) -> "futures.Future":
time_to_ack = math.ceil(time.time() - self._received_timestamp)
self._request_queue.put(
requests.AckRequest(
message_id=self.message_id,
ack_id=self._ack_id,
byte_size=self.size,
time_to_ack=time_to_ack,
Expand Down Expand Up @@ -382,6 +399,7 @@ def modify_ack_deadline(self, seconds: int) -> None:
"""
self._request_queue.put(
requests.ModAckRequest(
message_id=self.message_id,
ack_id=self._ack_id,
seconds=seconds,
future=None,
Expand Down Expand Up @@ -445,6 +463,7 @@ def modify_ack_deadline_with_response(self, seconds: int) -> "futures.Future":

self._request_queue.put(
requests.ModAckRequest(
message_id=self.message_id,
ack_id=self._ack_id,
seconds=seconds,
future=req_future,
Expand All @@ -461,6 +480,13 @@ def nack(self) -> None:
may take place immediately or after a delay, and may arrive at this subscriber
or another.
"""
_ACK_NACK_LOGGER.debug(
"Called nack for message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s)",
self.message_id,
self.ack_id,
self.ordering_key,
self._exactly_once_delivery_enabled_func(),
)
if self.opentelemetry_data:
self.opentelemetry_data.add_process_span_event("nack called")
self.opentelemetry_data.end_process_span()
Expand Down Expand Up @@ -530,3 +556,7 @@ def nack_with_response(self) -> "futures.Future":
)

return future

@property
def exactly_once_enabled(self):
return self._exactly_once_delivery_enabled_func()
3 changes: 1 addition & 2 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ def mypy(session):
# TODO: Only check the hand-written layer, the generated code does not pass
# mypy checks yet.
# https://github.com/googleapis/gapic-generator-python/issues/1092
# TODO: Re-enable mypy checks once we merge, since incremental checks are failing due to protobuf upgrade
# session.run("mypy", "-p", "google.cloud", "--exclude", "google/pubsub_v1/")
session.run("mypy", "-p", "-i", "google.cloud", "--exclude", "google/pubsub_v1/")


@nox.session(python=DEFAULT_PYTHON_VERSION)
Expand Down
2 changes: 2 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ filterwarnings =
# Remove once https://github.com/googleapis/gapic-generator-python/issues/2303 is fixed
ignore:The python-bigquery library will stop supporting Python 3.7:PendingDeprecationWarning
# Remove once we move off credential files https://github.com/googleapis/google-auth-library-python/pull/1812
# Note that these are used in tests only
ignore:Your config file at [/home/kbuilder/.docker/config.json] contains these credential helper entries:DeprecationWarning
ignore:The `credentials_file` argument is deprecated because of a potential security risk:DeprecationWarning
24 changes: 21 additions & 3 deletions tests/unit/pubsub_v1/subscriber/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ def test_ack():
msg.ack()
put.assert_called_once_with(
requests.AckRequest(
message_id=msg.message_id,
ack_id="bogus_ack_id",
byte_size=30,
time_to_ack=mock.ANY,
Expand All @@ -305,6 +306,7 @@ def test_ack_with_response_exactly_once_delivery_disabled():
future = msg.ack_with_response()
put.assert_called_once_with(
requests.AckRequest(
message_id=msg.message_id,
ack_id="bogus_ack_id",
byte_size=30,
time_to_ack=mock.ANY,
Expand All @@ -325,6 +327,7 @@ def test_ack_with_response_exactly_once_delivery_enabled():
future = msg.ack_with_response()
put.assert_called_once_with(
requests.AckRequest(
message_id=msg.message_id,
ack_id="bogus_ack_id",
byte_size=30,
time_to_ack=mock.ANY,
Expand All @@ -350,7 +353,12 @@ def test_modify_ack_deadline():
with mock.patch.object(msg._request_queue, "put") as put:
msg.modify_ack_deadline(60)
put.assert_called_once_with(
requests.ModAckRequest(ack_id="bogus_ack_id", seconds=60, future=None)
requests.ModAckRequest(
message_id=msg.message_id,
ack_id="bogus_ack_id",
seconds=60,
future=None,
)
)
check_call_types(put, requests.ModAckRequest)

Expand All @@ -360,7 +368,12 @@ def test_modify_ack_deadline_with_response_exactly_once_delivery_disabled():
with mock.patch.object(msg._request_queue, "put") as put:
future = msg.modify_ack_deadline_with_response(60)
put.assert_called_once_with(
requests.ModAckRequest(ack_id="bogus_ack_id", seconds=60, future=None)
requests.ModAckRequest(
message_id=msg.message_id,
ack_id="bogus_ack_id",
seconds=60,
future=None,
)
)
assert future.result() == AcknowledgeStatus.SUCCESS
assert future == message._SUCCESS_FUTURE
Expand All @@ -374,7 +387,12 @@ def test_modify_ack_deadline_with_response_exactly_once_delivery_enabled():
with mock.patch.object(msg._request_queue, "put") as put:
future = msg.modify_ack_deadline_with_response(60)
put.assert_called_once_with(
requests.ModAckRequest(ack_id="bogus_ack_id", seconds=60, future=future)
requests.ModAckRequest(
message_id=msg.message_id,
ack_id="bogus_ack_id",
seconds=60,
future=future,
)
)
check_call_types(put, requests.ModAckRequest)

Expand Down
Loading
Loading