Skip to content

Commit bb5f3d1

Browse files
feat: Open Telemetry Publish Side Support (#1241)
1 parent eafc1c8 commit bb5f3d1

File tree

17 files changed

+1220
-143
lines changed

17 files changed

+1220
-143
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright 2024, Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Copyright 2024, Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from opentelemetry.propagators.textmap import Setter
16+
17+
from google.pubsub_v1 import PubsubMessage
18+
19+
20+
class OpenTelemetryContextSetter(Setter):
21+
"""
22+
Used by Open Telemetry for context propagation.
23+
"""
24+
25+
def set(self, carrier: PubsubMessage, key: str, value: str) -> None:
26+
"""
27+
Injects trace context into Pub/Sub message attributes with
28+
"googclient_" prefix.
29+
30+
Args:
31+
carrier(PubsubMessage): The Pub/Sub message which is the carrier of Open Telemetry
32+
data.
33+
key(str): The key for which the Open Telemetry context data needs to be set.
34+
value(str): The Open Telemetry context value to be set.
35+
36+
Returns:
37+
None
38+
"""
39+
carrier.attributes["googclient_" + key] = value
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
# Copyright 2017, Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import sys
16+
from datetime import datetime
17+
from typing import Optional
18+
19+
from opentelemetry import trace
20+
from opentelemetry.trace.propagation import set_span_in_context
21+
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
22+
23+
from google.pubsub_v1 import types as gapic_types
24+
from google.cloud.pubsub_v1.open_telemetry.context_propagation import (
25+
OpenTelemetryContextSetter,
26+
)
27+
28+
29+
class PublishMessageWrapper:
30+
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
31+
_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"
32+
_OPEN_TELEMETRY_PUBLISHER_BATCHING = "publisher batching"
33+
34+
_PUBLISH_START_EVENT: str = "publish start"
35+
_PUBLISH_FLOW_CONTROL: str = "publisher flow control"
36+
37+
def __init__(self, message: gapic_types.PubsubMessage):
38+
self._message: gapic_types.PubsubMessage = message
39+
self._create_span: Optional[trace.Span] = None
40+
self._flow_control_span: Optional[trace.Span] = None
41+
self._batching_span: Optional[trace.Span] = None
42+
43+
@property
44+
def message(self):
45+
return self._message
46+
47+
@message.setter # type: ignore[no-redef] # resetting message value is intentional here
48+
def message(self, message: gapic_types.PubsubMessage):
49+
self._message = message
50+
51+
@property
52+
def create_span(self):
53+
return self._create_span
54+
55+
def __eq__(self, other): # pragma: NO COVER
56+
"""Used for pytest asserts to compare two PublishMessageWrapper objects with the same message"""
57+
if isinstance(self, other.__class__):
58+
return self.message == other.message
59+
return False
60+
61+
def start_create_span(self, topic: str, ordering_key: str) -> None:
62+
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
63+
assert len(topic.split("/")) == 4
64+
topic_short_name = topic.split("/")[3]
65+
with tracer.start_as_current_span(
66+
name=f"{topic_short_name} create",
67+
attributes={
68+
"messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
69+
"messaging.destination.name": topic_short_name,
70+
"code.function": "publish",
71+
"messaging.gcp_pubsub.message.ordering_key": ordering_key,
72+
"messaging.operation": "create",
73+
"gcp.project_id": topic.split("/")[1],
74+
"messaging.message.body.size": sys.getsizeof(
75+
self._message.data
76+
), # sys.getsizeof() used since the attribute expects size of message body in bytes
77+
},
78+
kind=trace.SpanKind.PRODUCER,
79+
end_on_exit=False,
80+
) as create_span:
81+
create_span.add_event(
82+
name=self._PUBLISH_START_EVENT,
83+
attributes={
84+
"timestamp": str(datetime.now()),
85+
},
86+
)
87+
self._create_span = create_span
88+
TraceContextTextMapPropagator().inject(
89+
carrier=self._message,
90+
setter=OpenTelemetryContextSetter(),
91+
)
92+
93+
def end_create_span(self, exc: Optional[BaseException] = None) -> None:
94+
assert self._create_span is not None
95+
if exc:
96+
self._create_span.record_exception(exception=exc)
97+
self._create_span.set_status(
98+
trace.Status(status_code=trace.StatusCode.ERROR)
99+
)
100+
self._create_span.end()
101+
102+
def start_publisher_flow_control_span(self) -> None:
103+
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
104+
assert self._create_span is not None
105+
with tracer.start_as_current_span(
106+
name=self._PUBLISH_FLOW_CONTROL,
107+
kind=trace.SpanKind.INTERNAL,
108+
context=set_span_in_context(self._create_span),
109+
end_on_exit=False,
110+
) as flow_control_span:
111+
self._flow_control_span = flow_control_span
112+
113+
def end_publisher_flow_control_span(
114+
self, exc: Optional[BaseException] = None
115+
) -> None:
116+
assert self._flow_control_span is not None
117+
if exc:
118+
self._flow_control_span.record_exception(exception=exc)
119+
self._flow_control_span.set_status(
120+
trace.Status(status_code=trace.StatusCode.ERROR)
121+
)
122+
self._flow_control_span.end()
123+
124+
def start_publisher_batching_span(self) -> None:
125+
assert self._create_span is not None
126+
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
127+
with tracer.start_as_current_span(
128+
name=self._OPEN_TELEMETRY_PUBLISHER_BATCHING,
129+
kind=trace.SpanKind.INTERNAL,
130+
context=set_span_in_context(self._create_span),
131+
end_on_exit=False,
132+
) as batching_span:
133+
self._batching_span = batching_span
134+
135+
def end_publisher_batching_span(self, exc: Optional[BaseException] = None) -> None:
136+
assert self._batching_span is not None
137+
if exc:
138+
self._batching_span.record_exception(exception=exc)
139+
self._batching_span.set_status(
140+
trace.Status(status_code=trace.StatusCode.ERROR)
141+
)
142+
self._batching_span.end()

google/cloud/pubsub_v1/publisher/_batch/base.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919
import typing
2020
from typing import Optional, Sequence
2121

22+
from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import (
23+
PublishMessageWrapper,
24+
)
25+
2226

2327
if typing.TYPE_CHECKING: # pragma: NO COVER
2428
from google.cloud import pubsub_v1
@@ -54,7 +58,7 @@ class Batch(metaclass=abc.ABCMeta):
5458

5559
def __len__(self):
5660
"""Return the number of messages currently in the batch."""
57-
return len(self.messages)
61+
return len(self.message_wrappers)
5862

5963
@staticmethod
6064
@abc.abstractmethod
@@ -68,7 +72,7 @@ def make_lock(): # pragma: NO COVER
6872

6973
@property
7074
@abc.abstractmethod
71-
def messages(self) -> Sequence["gapic_types.PubsubMessage"]: # pragma: NO COVER
75+
def message_wrappers(self) -> Sequence[PublishMessageWrapper]: # pragma: NO COVER
7276
"""Return the messages currently in the batch.
7377
7478
Returns:

0 commit comments

Comments
 (0)