Skip to content

Commit dcac933

Browse files
authored
Added k8s resource attribute logic. Old tests pass. Need new tests (#41130)
* Added k8s resource attribute logic. Old tests pass. Need new tests * new tests pass * lint * lint * lint * Add unknown_service use case and tests for invalid values * lint * comments * typo
1 parent a3c2812 commit dcac933

File tree

3 files changed

+226
-13
lines changed

3 files changed

+226
-13
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66

77
### Features Added
88

9+
- Implement the spec for deriving cloud role and cloud role instance from Kubernetes (k8s) resource attributes.
10+
([#41130](https://github.com/Azure/azure-sdk-for-python/pull/41130))
11+
912
### Breaking Changes
1013

1114
### Bugs Fixed

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_utils.py

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import warnings
1212
from typing import Callable, Dict, Any
1313

14-
from opentelemetry.semconv.attributes.service_attributes import SERVICE_NAME
1514
from opentelemetry.semconv.resource import ResourceAttributes
1615
from opentelemetry.sdk.resources import Resource
1716
from opentelemetry.sdk.util import ns_to_iso_str
@@ -230,22 +229,12 @@ def _create_telemetry_item(timestamp: int) -> TelemetryItem:
230229
def _populate_part_a_fields(resource: Resource):
231230
tags = {}
232231
if resource and resource.attributes:
233-
service_name = resource.attributes.get(SERVICE_NAME)
234-
service_namespace = resource.attributes.get(ResourceAttributes.SERVICE_NAMESPACE)
235-
service_instance_id = resource.attributes.get(ResourceAttributes.SERVICE_INSTANCE_ID)
236232
device_id = resource.attributes.get(ResourceAttributes.DEVICE_ID)
237233
device_model = resource.attributes.get(ResourceAttributes.DEVICE_MODEL_NAME)
238234
device_make = resource.attributes.get(ResourceAttributes.DEVICE_MANUFACTURER)
239235
app_version = resource.attributes.get(ResourceAttributes.SERVICE_VERSION)
240-
if service_name:
241-
if service_namespace:
242-
tags[ContextTagKeys.AI_CLOUD_ROLE] = str(service_namespace) + "." + str(service_name)
243-
else:
244-
tags[ContextTagKeys.AI_CLOUD_ROLE] = service_name # type: ignore
245-
if service_instance_id:
246-
tags[ContextTagKeys.AI_CLOUD_ROLE_INSTANCE] = service_instance_id # type: ignore
247-
else:
248-
tags[ContextTagKeys.AI_CLOUD_ROLE_INSTANCE] = platform.node() # hostname default
236+
tags[ContextTagKeys.AI_CLOUD_ROLE] = _get_cloud_role(resource)
237+
tags[ContextTagKeys.AI_CLOUD_ROLE_INSTANCE] = _get_cloud_role_instance(resource)
249238
tags[ContextTagKeys.AI_INTERNAL_NODE_NAME] = tags[ContextTagKeys.AI_CLOUD_ROLE_INSTANCE]
250239
if device_id:
251240
tags[ContextTagKeys.AI_DEVICE_ID] = device_id # type: ignore
@@ -259,6 +248,51 @@ def _populate_part_a_fields(resource: Resource):
259248
return tags
260249

261250

251+
# pylint:disable=too-many-return-statements
252+
def _get_cloud_role(resource: Resource) -> str:
    """Derive the cloud role name from the attributes of a resource.

    Resolution order:

    1. ``service.name`` (prefixed with ``service.namespace`` and a "." when a
       namespace is present), provided it does not start with
       "unknown_service".
    2. The first non-empty Kubernetes workload name, checked in this order:
       deployment, replicaset, statefulset, job, cronjob, daemonset.
    3. The "unknown_service"-prefixed service name (with namespace, if any)
       when no Kubernetes workload name is present, or an empty string when
       ``service.name`` is not set at all.

    :param resource: The resource whose attributes are inspected.
    :type resource: ~opentelemetry.sdk.resources.Resource
    :return: The resolved cloud role name; may be an empty string.
    :rtype: str
    """
    attributes = resource.attributes
    cloud_role = ""
    service_name = attributes.get(ResourceAttributes.SERVICE_NAME)
    if service_name:
        service_namespace = attributes.get(ResourceAttributes.SERVICE_NAMESPACE)
        if service_namespace:
            cloud_role = str(service_namespace) + "." + str(service_name)
        else:
            cloud_role = str(service_name)
        # A service name starting with "unknown_service" is only a fallback:
        # it is used solely when no Kubernetes workload name is available.
        if not str(service_name).startswith("unknown_service"):
            return cloud_role
    # Order matters: the first populated workload attribute wins.
    k8s_workload_keys = (
        ResourceAttributes.K8S_DEPLOYMENT_NAME,
        ResourceAttributes.K8S_REPLICASET_NAME,
        ResourceAttributes.K8S_STATEFULSET_NAME,
        ResourceAttributes.K8S_JOB_NAME,
        ResourceAttributes.K8S_CRONJOB_NAME,
        ResourceAttributes.K8S_DAEMONSET_NAME,
    )
    for key in k8s_workload_keys:
        workload_name = attributes.get(key)
        if workload_name:
            # Coerce so the declared ``str`` return type always holds.
            return str(workload_name)
    # No Kubernetes workload name: fall back to the "unknown_service" based
    # role computed above, or "" when service.name was absent.
    return cloud_role
284+
285+
286+
def _get_cloud_role_instance(resource: Resource) -> str:
    """Derive the cloud role instance from the attributes of a resource.

    The first non-empty value among ``service.instance.id`` and
    ``k8s.pod.name`` (in that order) is used; when neither is set the local
    hostname reported by ``platform.node()`` is returned.

    :param resource: The resource whose attributes are inspected.
    :type resource: ~opentelemetry.sdk.resources.Resource
    :return: The resolved cloud role instance.
    :rtype: str
    """
    # Order matters: an explicit service instance id beats the pod name.
    for key in (ResourceAttributes.SERVICE_INSTANCE_ID, ResourceAttributes.K8S_POD_NAME):
        value = resource.attributes.get(key)
        if value:
            # Coerce so the declared ``str`` return type always holds.
            return str(value)
    return platform.node()  # hostname default
294+
295+
262296
def _is_synthetic_source(properties: Attributes) -> bool:
263297
# TODO: Use semconv symbol when released in upstream
264298
synthetic_type = properties.get("user_agent.synthetic.type") # type: ignore

sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_utils.py

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,29 @@ def test_populate_part_a_fields(self):
8282
self.assertEqual(tags.get("ai.device.oemName"), "testDeviceMake")
8383
self.assertEqual(tags.get("ai.application.ver"), "testApplicationVer")
8484

85+
# Default service.name fields should be used when kubernetes values are not present
86+
def test_populate_part_a_fields_unknown_service(self):
    """An "unknown_service"-prefixed service.name is still used for the role when no k8s attributes exist."""
    attributes = {
        "service.name": "unknown_servicefoobar",
        "service.namespace": "testServiceNamespace",
        "service.instance.id": "testServiceInstanceId",
        "device.id": "testDeviceId",
        "device.model.name": "testDeviceModel",
        "device.manufacturer": "testDeviceMake",
        "service.version": "testApplicationVer",
    }
    part_a_tags = _utils._populate_part_a_fields(Resource(attributes))
    self.assertIsNotNone(part_a_tags)
    # Expected tag values, checked in the listed order.
    expected_tags = {
        "ai.cloud.role": "testServiceNamespace.unknown_servicefoobar",
        "ai.cloud.roleInstance": "testServiceInstanceId",
        "ai.internal.nodeName": "testServiceInstanceId",
        "ai.device.id": "testDeviceId",
        "ai.device.model": "testDeviceModel",
        "ai.device.oemName": "testDeviceMake",
        "ai.application.ver": "testApplicationVer",
    }
    for tag_key, expected_value in expected_tags.items():
        self.assertEqual(part_a_tags.get(tag_key), expected_value)
107+
85108
def test_populate_part_a_fields_default(self):
86109
resource = Resource({"service.name": "testServiceName"})
87110
tags = _utils._populate_part_a_fields(resource)
@@ -90,6 +113,159 @@ def test_populate_part_a_fields_default(self):
90113
self.assertEqual(tags.get("ai.cloud.roleInstance"), platform.node())
91114
self.assertEqual(tags.get("ai.internal.nodeName"), tags.get("ai.cloud.roleInstance"))
92115

116+
def test_populate_part_a_fields_aks(self):
    """With every k8s workload attribute set, the deployment name is the role and the pod name the instance."""
    resource = Resource(
        {
            "k8s.deployment.name": "testDeploymentName",
            "k8s.replicaset.name": "testReplicaSetName",
            "k8s.statefulset.name": "testStatefulSetName",
            "k8s.job.name": "testJobName",
            # Lower-case key, matching the spelling used by the sibling tests.
            "k8s.cronjob.name": "testCronJobName",
            "k8s.daemonset.name": "testDaemonSetName",
            "k8s.pod.name": "testPodName",
        }
    )
    tags = _utils._populate_part_a_fields(resource)
    self.assertIsNotNone(tags)
    self.assertEqual(tags.get("ai.cloud.role"), "testDeploymentName")
    self.assertEqual(tags.get("ai.cloud.roleInstance"), "testPodName")
    self.assertEqual(tags.get("ai.internal.nodeName"), tags.get("ai.cloud.roleInstance"))
133+
134+
def test_populate_part_a_fields_aks_replica(self):
    """Without a deployment name, the replicaset name is expected as the cloud role."""
    attributes = {
        "k8s.replicaset.name": "testReplicaSetName",
        "k8s.statefulset.name": "testStatefulSetName",
        "k8s.job.name": "testJobName",
        "k8s.cronjob.name": "testCronJobName",
        "k8s.daemonset.name": "testDaemonSetName",
        "k8s.pod.name": "testPodName",
    }
    part_a_tags = _utils._populate_part_a_fields(Resource(attributes))
    self.assertIsNotNone(part_a_tags)
    self.assertEqual(part_a_tags.get("ai.cloud.role"), "testReplicaSetName")
    self.assertEqual(part_a_tags.get("ai.cloud.roleInstance"), "testPodName")
    # The internal node name mirrors the role instance.
    role_instance = part_a_tags.get("ai.cloud.roleInstance")
    self.assertEqual(part_a_tags.get("ai.internal.nodeName"), role_instance)
150+
151+
def test_populate_part_a_fields_aks_stateful(self):
    """Without deployment or replicaset names, the statefulset name is expected as the cloud role."""
    attributes = {
        "k8s.statefulset.name": "testStatefulSetName",
        "k8s.job.name": "testJobName",
        "k8s.cronjob.name": "testCronJobName",
        "k8s.daemonset.name": "testDaemonSetName",
        "k8s.pod.name": "testPodName",
    }
    part_a_tags = _utils._populate_part_a_fields(Resource(attributes))
    self.assertIsNotNone(part_a_tags)
    self.assertEqual(part_a_tags.get("ai.cloud.role"), "testStatefulSetName")
    self.assertEqual(part_a_tags.get("ai.cloud.roleInstance"), "testPodName")
    # The internal node name mirrors the role instance.
    role_instance = part_a_tags.get("ai.cloud.roleInstance")
    self.assertEqual(part_a_tags.get("ai.internal.nodeName"), role_instance)
166+
167+
def test_populate_part_a_fields_aks_job(self):
    """With only job, cronjob, daemonset and pod names set, the job name is expected as the cloud role."""
    attributes = {
        "k8s.job.name": "testJobName",
        "k8s.cronjob.name": "testCronJobName",
        "k8s.daemonset.name": "testDaemonSetName",
        "k8s.pod.name": "testPodName",
    }
    part_a_tags = _utils._populate_part_a_fields(Resource(attributes))
    self.assertIsNotNone(part_a_tags)
    self.assertEqual(part_a_tags.get("ai.cloud.role"), "testJobName")
    self.assertEqual(part_a_tags.get("ai.cloud.roleInstance"), "testPodName")
    # The internal node name mirrors the role instance.
    role_instance = part_a_tags.get("ai.cloud.roleInstance")
    self.assertEqual(part_a_tags.get("ai.internal.nodeName"), role_instance)
181+
182+
def test_populate_part_a_fields_aks_cronjob(self):
    """With only cronjob, daemonset and pod names set, the cronjob name is expected as the cloud role."""
    attributes = {
        "k8s.cronjob.name": "testCronJobName",
        "k8s.daemonset.name": "testDaemonSetName",
        "k8s.pod.name": "testPodName",
    }
    part_a_tags = _utils._populate_part_a_fields(Resource(attributes))
    self.assertIsNotNone(part_a_tags)
    self.assertEqual(part_a_tags.get("ai.cloud.role"), "testCronJobName")
    self.assertEqual(part_a_tags.get("ai.cloud.roleInstance"), "testPodName")
    # The internal node name mirrors the role instance.
    role_instance = part_a_tags.get("ai.cloud.roleInstance")
    self.assertEqual(part_a_tags.get("ai.internal.nodeName"), role_instance)
195+
196+
def test_populate_part_a_fields_aks_daemon(self):
    """With only daemonset and pod names set, the daemonset name is expected as the cloud role."""
    attributes = {
        "k8s.daemonset.name": "testDaemonSetName",
        "k8s.pod.name": "testPodName",
    }
    part_a_tags = _utils._populate_part_a_fields(Resource(attributes))
    self.assertIsNotNone(part_a_tags)
    self.assertEqual(part_a_tags.get("ai.cloud.role"), "testDaemonSetName")
    self.assertEqual(part_a_tags.get("ai.cloud.roleInstance"), "testPodName")
    # The internal node name mirrors the role instance.
    role_instance = part_a_tags.get("ai.cloud.roleInstance")
    self.assertEqual(part_a_tags.get("ai.internal.nodeName"), role_instance)
208+
209+
# Test that undefined fields are ignored.
210+
def test_populate_part_a_fields_aks_undefined(self):
    """Empty-string and None k8s values are expected to be ignored; the first populated name (daemonset) wins."""
    resource = Resource(
        {
            "k8s.deployment.name": "",
            "k8s.replicaset.name": None,
            "k8s.statefulset.name": "",
            "k8s.job.name": None,
            # Lower-case key, matching the spelling used by the sibling tests,
            # so the empty cronjob value is the one actually exercised.
            "k8s.cronjob.name": "",
            "k8s.daemonset.name": "testDaemonSetName",
            "k8s.pod.name": "testPodName",
        }
    )
    tags = _utils._populate_part_a_fields(resource)
    self.assertIsNotNone(tags)
    self.assertEqual(tags.get("ai.cloud.role"), "testDaemonSetName")
    self.assertEqual(tags.get("ai.cloud.roleInstance"), "testPodName")
    self.assertEqual(tags.get("ai.internal.nodeName"), tags.get("ai.cloud.roleInstance"))
227+
228+
229+
def test_populate_part_a_fields_aks_with_service(self):
    """An explicit service.name and service.instance.id are expected to win over all k8s attributes."""
    attributes = {
        "service.name": "testServiceName",
        "service.instance.id": "testServiceInstanceId",
        "k8s.deployment.name": "testDeploymentName",
        "k8s.replicaset.name": "testReplicaSetName",
        "k8s.statefulset.name": "testStatefulSetName",
        "k8s.job.name": "testJobName",
        "k8s.cronjob.name": "testCronJobName",
        "k8s.daemonset.name": "testDaemonSetName",
        "k8s.pod.name": "testPodName",
    }
    part_a_tags = _utils._populate_part_a_fields(Resource(attributes))
    self.assertIsNotNone(part_a_tags)
    self.assertEqual(part_a_tags.get("ai.cloud.role"), "testServiceName")
    self.assertEqual(part_a_tags.get("ai.cloud.roleInstance"), "testServiceInstanceId")
    # The internal node name mirrors the role instance.
    role_instance = part_a_tags.get("ai.cloud.roleInstance")
    self.assertEqual(part_a_tags.get("ai.internal.nodeName"), role_instance)
248+
249+
# Default service.name fields should be ignored when kubernetes values are present
250+
def test_populate_part_a_fields_aks_with_unknown_service(self):
    """An "unknown_service"-prefixed service.name is expected to yield to the k8s deployment name."""
    attributes = {
        "service.name": "unknown_servicefoobar",
        "k8s.deployment.name": "testDeploymentName",
        "k8s.replicaset.name": "testReplicaSetName",
        "k8s.statefulset.name": "testStatefulSetName",
        "k8s.job.name": "testJobName",
        "k8s.cronjob.name": "testCronJobName",
        "k8s.daemonset.name": "testDaemonSetName",
        "k8s.pod.name": "testPodName",
    }
    part_a_tags = _utils._populate_part_a_fields(Resource(attributes))
    self.assertIsNotNone(part_a_tags)
    self.assertEqual(part_a_tags.get("ai.cloud.role"), "testDeploymentName")
    self.assertEqual(part_a_tags.get("ai.cloud.roleInstance"), "testPodName")
    # The internal node name mirrors the role instance.
    role_instance = part_a_tags.get("ai.cloud.roleInstance")
    self.assertEqual(part_a_tags.get("ai.internal.nodeName"), role_instance)
268+
93269
@patch("azure.monitor.opentelemetry.exporter._utils.ns_to_iso_str", return_value=TEST_TIME)
94270
@patch("azure.monitor.opentelemetry.exporter._utils.azure_monitor_context", TEST_AZURE_MONITOR_CONTEXT)
95271
def test_create_telemetry_item(self, mock_ns_to_iso_str):

0 commit comments

Comments
 (0)