# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.

+ import enum
+ import logging
import os
import time
- import logging
- import ujson as json
from datetime import datetime, timedelta

+ import ujson as json
+
from datadog_lambda.extension import should_use_extension
- from datadog_lambda.tags import get_enhanced_metrics_tags, dd_lambda_layer_tag
+ from datadog_lambda.fips import fips_mode_enabled
+ from datadog_lambda.tags import dd_lambda_layer_tag, get_enhanced_metrics_tags

logger = logging.getLogger(__name__)

- lambda_stats = None
- extension_thread_stats = None

- flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true"
+ class MetricsHandler(enum.Enum):
+     EXTENSION = "extension"
+     FORWARDER = "forwarder"
+     DATADOG_API = "datadog_api"
+     NO_METRICS = "no_metrics"
+

- if should_use_extension:
+ def _select_metrics_handler():
+     if should_use_extension:
+         return MetricsHandler.EXTENSION
+     if os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true":
+         return MetricsHandler.FORWARDER
+
+     if fips_mode_enabled:
+         logger.debug(
+             "With FIPS mode enabled, the Datadog API metrics handler is unavailable."
+         )
+         return MetricsHandler.NO_METRICS
+
+     return MetricsHandler.DATADOG_API
+
+
+ metrics_handler = _select_metrics_handler()
+ logger.debug("identified primary metrics handler as %s", metrics_handler)
+
+
+ lambda_stats = None
+ if metrics_handler == MetricsHandler.EXTENSION:
    from datadog_lambda.statsd_writer import StatsDWriter

    lambda_stats = StatsDWriter()
- else:
+
+ elif metrics_handler == MetricsHandler.DATADOG_API:
    # Periodical flushing in a background thread is NOT guaranteed to succeed
    # and leads to data loss. When disabled, metrics are only flushed at the
    # end of invocation. To make metrics submitted from a long-running Lambda
    # function available sooner, consider using the Datadog Lambda extension.
-     from datadog_lambda.thread_stats_writer import ThreadStatsWriter
    from datadog_lambda.api import init_api
+     from datadog_lambda.thread_stats_writer import ThreadStatsWriter

+     flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true"
    init_api()
    lambda_stats = ThreadStatsWriter(flush_in_thread)

+
enhanced_metrics_enabled = (
    os.environ.get("DD_ENHANCED_METRICS", "true").lower() == "true"
)
@@ -44,16 +73,19 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal
    Submit a data point to Datadog distribution metrics.
    https://docs.datadoghq.com/graphing/metrics/distributions/

-     When DD_FLUSH_TO_LOG is True, write metric to log, and
-     wait for the Datadog Log Forwarder Lambda function to submit
-     the metrics asynchronously.
+     If the Datadog Lambda Extension is present, metrics are submitted to its
+     dogstatsd endpoint.
+
+     When DD_FLUSH_TO_LOG is True or force_async is True, the metric is written
+     to the log, and the Datadog Log Forwarder Lambda function submits the
+     metrics asynchronously.

    Otherwise, the metrics will be submitted to the Datadog API
    periodically and at the end of the function execution in a
    background thread.

-     Note that if the extension is present, it will override the DD_FLUSH_TO_LOG value
-     and always use the layer to send metrics to the extension
+     Note that if the extension is present, it takes precedence over the
+     DD_FLUSH_TO_LOG value and the layer always sends metrics to the extension.
    """
    if not metric_name or not isinstance(metric_name, str):
        logger.warning(
@@ -71,56 +103,54 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal
        )
        return

-     flush_to_logs = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true"
    tags = [] if tags is None else list(tags)
    tags.append(dd_lambda_layer_tag)

-     if should_use_extension and timestamp is not None:
-         # The extension does not support timestamps for distributions so we create a
-         # a thread stats writer to submit metrics with timestamps to the API
-         timestamp_ceiling = int(
-             (datetime.now() - timedelta(hours=4)).timestamp()
-         )  # 4 hours ago
-         if isinstance(timestamp, datetime):
-             timestamp = int(timestamp.timestamp())
-         if timestamp_ceiling > timestamp:
-             logger.warning(
-                 "Timestamp %s is older than 4 hours, not submitting metric %s",
-                 timestamp,
-                 metric_name,
-             )
-             return
-         global extension_thread_stats
-         if extension_thread_stats is None:
-             from datadog_lambda.thread_stats_writer import ThreadStatsWriter
-             from datadog_lambda.api import init_api
-
-             init_api()
-             extension_thread_stats = ThreadStatsWriter(flush_in_thread)
-
-         extension_thread_stats.distribution(
-             metric_name, value, tags=tags, timestamp=timestamp
-         )
-         return
+     if metrics_handler == MetricsHandler.EXTENSION:
+         if timestamp is not None:
+             if isinstance(timestamp, datetime):
+                 timestamp = int(timestamp.timestamp())
+
+             timestamp_floor = int((datetime.now() - timedelta(hours=4)).timestamp())
+             if timestamp < timestamp_floor:
+                 logger.warning(
+                     "Timestamp %s is older than 4 hours, not submitting metric %s",
+                     timestamp,
+                     metric_name,
+                 )
+                 return

-     if should_use_extension:
        logger.debug(
            "Sending metric %s value %s to Datadog via extension", metric_name, value
        )
        lambda_stats.distribution(metric_name, value, tags=tags, timestamp=timestamp)
+
+     elif force_async or (metrics_handler == MetricsHandler.FORWARDER):
+         write_metric_point_to_stdout(metric_name, value, timestamp=timestamp, tags=tags)
+
+     elif metrics_handler == MetricsHandler.DATADOG_API:
+         lambda_stats.distribution(metric_name, value, tags=tags, timestamp=timestamp)
+
+     elif metrics_handler == MetricsHandler.NO_METRICS:
+         logger.debug(
+             "Metric %s cannot be submitted because the metrics handler is disabled",
+             metric_name,
+         )
+
    else:
-         if flush_to_logs or force_async:
-             write_metric_point_to_stdout(
-                 metric_name, value, timestamp=timestamp, tags=tags
-             )
-         else:
-             lambda_stats.distribution(
-                 metric_name, value, tags=tags, timestamp=timestamp
-             )
+         # This should be quite impossible, but let's at least log a message if
+         # it somehow happens.
+         logger.debug(
+             "Metric %s cannot be submitted because the metrics handler is not configured: %s",
+             metric_name,
+             metrics_handler,
+         )


- def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
+ def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=None):
    """Writes the specified metric point to standard output"""
+     tags = tags or []
+
    logger.debug(
        "Sending metric %s value %s to Datadog via log forwarder", metric_name, value
    )
@@ -138,19 +168,8 @@ def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):


def flush_stats(lambda_context=None):
-     lambda_stats.flush()
-
-     if extension_thread_stats is not None:
-         tags = None
-         if lambda_context is not None:
-             tags = get_enhanced_metrics_tags(lambda_context)
-             split_arn = lambda_context.invoked_function_arn.split(":")
-             if len(split_arn) > 7:
-                 # Get rid of the alias
-                 split_arn.pop()
-             arn = ":".join(split_arn)
-             tags.append("function_arn:" + arn)
-         extension_thread_stats.flush(tags)
+     if lambda_stats is not None:
+         lambda_stats.flush()


def submit_enhanced_metric(metric_name, lambda_context):
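
For context, a minimal usage sketch of the routing introduced above. It assumes this diff targets the datadog_lambda.metric module and that lambda_metric keeps the signature shown in the hunk headers; the handler function, metric names, and tag values are illustrative only and not part of the change.

# Hypothetical caller, not part of the diff.
from datetime import datetime, timezone

from datadog_lambda.metric import lambda_metric


def handler(event, context):
    # Routed by metrics_handler: the extension's dogstatsd endpoint when the
    # extension is present, stdout for the log forwarder when
    # DD_FLUSH_TO_LOG=true, the Datadog API writer otherwise, and dropped with
    # a debug log when FIPS mode disables the API handler.
    lambda_metric("example.requests", 1, tags=["service:example"])

    # force_async=True falls through to the forwarder path (stdout) whenever
    # the extension is not handling metrics.
    lambda_metric("example.queue_depth", 7, force_async=True)

    # In the extension branch, datetime timestamps are converted to epoch
    # seconds and points older than 4 hours are rejected with a warning.
    lambda_metric("example.backfill", 1, timestamp=datetime.now(timezone.utc))

    return {"status": "ok"}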