@@ -77,7 +77,6 @@ if Code.ensure_loaded?(Broadway) do
7777 alias Broadway . { BatchInfo , Options }
7878 alias PromEx.Utils
7979
80- @ millisecond_duration_buckets [ 10 , 100 , 500 , 1_000 , 10_000 , 30_000 , 60_000 ]
8180 @ message_batch_size_buckets [ 1 , 5 , 10 , 20 , 50 , 100 ]
8281
8382 @ init_topology_event [ :broadway , :topology , :init ]
@@ -92,6 +91,7 @@ if Code.ensure_loaded?(Broadway) do
9291 def event_metrics ( opts ) do
9392 otp_app = Keyword . fetch! ( opts , :otp_app )
9493 metric_prefix = Keyword . get ( opts , :metric_prefix , PromEx . metric_prefix ( otp_app , :broadway ) )
94+ duration_unit = Keyword . get ( opts , :duration_unit , :millisecond )
9595
9696 # Telemetry metrics will emit warnings if multiple handlers with the same names are defined.
9797 # As a result, this plugin supports gathering metrics on multiple processors and batches, but needs
@@ -102,13 +102,15 @@ if Code.ensure_loaded?(Broadway) do
102102
103103 # Event metrics definitions
104104 [
105- topology_init_events ( metric_prefix ) ,
106- handle_message_events ( metric_prefix ) ,
107- handle_batch_events ( metric_prefix )
105+ topology_init_events ( metric_prefix , duration_unit ) ,
106+ handle_message_events ( metric_prefix , duration_unit ) ,
107+ handle_batch_events ( metric_prefix , duration_unit )
108108 ]
109109 end
110110
111- defp topology_init_events ( metric_prefix ) do
111+ defp topology_init_events ( metric_prefix , duration_unit ) do
112+ duration_unit_plural = Utils . make_plural_atom ( duration_unit )
113+
112114 Event . build (
113115 :broadway_init_event_metrics ,
114116 [
@@ -121,29 +123,29 @@ if Code.ensure_loaded?(Broadway) do
121123 tag_values: & extract_init_tag_values / 1
122124 ) ,
123125 last_value (
124- metric_prefix ++ [ :init , :hibernate_after , :default , :milliseconds ] ,
126+ metric_prefix ++ [ :init , :hibernate_after , :default , duration_unit_plural ] ,
125127 event_name: @ init_topology_event ,
126128 description: "The Broadway supervisor's hibernate after default value." ,
127129 measurement: extract_default_config_measurement ( :hibernate_after ) ,
128130 tags: [ :name ] ,
129131 tag_values: & extract_init_tag_values / 1
130132 ) ,
131133 last_value (
132- metric_prefix ++ [ :init , :resubscribe_interval , :default , :milliseconds ] ,
134+ metric_prefix ++ [ :init , :resubscribe_interval , :default , duration_unit_plural ] ,
133135 event_name: @ init_topology_event ,
134136 description: "The Broadway supervisor's resubscribe interval default value." ,
135137 measurement: extract_default_config_measurement ( :resubscribe_interval ) ,
136138 tags: [ :name ] ,
137139 tag_values: & extract_init_tag_values / 1
138140 ) ,
139141 last_value (
140- metric_prefix ++ [ :init , :max , :duration , :default , :milliseconds ] ,
142+ metric_prefix ++ [ :init , :max , :duration , :default , duration_unit_plural ] ,
141143 event_name: @ init_topology_event ,
142- description: "The Broadway supervisor's max seconds default value (in milliseconds )." ,
144+ description: "The Broadway supervisor's max seconds default value (in #{ duration_unit_plural } )." ,
143145 measurement: extract_default_config_measurement ( :max_seconds ) ,
144146 tags: [ :name ] ,
145147 tag_values: & extract_init_tag_values / 1 ,
146- unit: { :second , :millisecond }
148+ unit: { :second , duration_unit }
147149 ) ,
148150 last_value (
149151 metric_prefix ++ [ :init , :max_restarts , :default , :value ] ,
@@ -154,15 +156,15 @@ if Code.ensure_loaded?(Broadway) do
154156 tag_values: & extract_init_tag_values / 1
155157 ) ,
156158 last_value (
157- metric_prefix ++ [ :init , :shutdown , :default , :milliseconds ] ,
159+ metric_prefix ++ [ :init , :shutdown , :default , duration_unit_plural ] ,
158160 event_name: @ init_topology_event ,
159161 description: "The Broadway supervisor's shutdown default value." ,
160162 measurement: extract_default_config_measurement ( :shutdown ) ,
161163 tags: [ :name ] ,
162164 tag_values: & extract_init_tag_values / 1
163165 ) ,
164166 last_value (
165- metric_prefix ++ [ :init , :processor , :hibernate_after , :milliseconds ] ,
167+ metric_prefix ++ [ :init , :processor , :hibernate_after , duration_unit_plural ] ,
166168 event_name: @ init_topology_processors_proxy_event ,
167169 description: "The Broadway processors hibernate after value." ,
168170 measurement: fn _measurements , % { hibernate_after: hibernate_after } -> hibernate_after end ,
@@ -183,7 +185,7 @@ if Code.ensure_loaded?(Broadway) do
183185 tags: [ :name , :processor ]
184186 ) ,
185187 last_value (
186- metric_prefix ++ [ :init , :batcher , :hibernate_after , :milliseconds ] ,
188+ metric_prefix ++ [ :init , :batcher , :hibernate_after , duration_unit_plural ] ,
187189 event_name: @ init_topology_batchers_proxy_event ,
188190 description: "The Broadway batchers hibernate after value." ,
189191 measurement: fn _measurements , % { hibernate_after: hibernate_after } -> hibernate_after end ,
@@ -204,7 +206,7 @@ if Code.ensure_loaded?(Broadway) do
204206 tags: [ :name , :batcher ]
205207 ) ,
206208 last_value (
207- metric_prefix ++ [ :init , :batcher , :batch_timeout , :milliseconds ] ,
209+ metric_prefix ++ [ :init , :batcher , :batch_timeout , duration_unit_plural ] ,
208210 event_name: @ init_topology_batchers_proxy_event ,
209211 description: "The Broadway batchers timeout value." ,
210212 measurement: fn _measurements , % { batch_timeout: batch_timeout } -> batch_timeout end ,
@@ -267,53 +269,57 @@ if Code.ensure_loaded?(Broadway) do
267269 end
268270 end
269271
270- defp handle_message_events ( metric_prefix ) do
272+ defp handle_message_events ( metric_prefix , duration_unit ) do
273+ duration_unit_plural = Utils . make_plural_atom ( duration_unit )
274+
271275 Event . build (
272276 :broadway_message_event_metrics ,
273277 [
274278 distribution (
275- metric_prefix ++ [ :process , :message , :duration , :milliseconds ] ,
279+ metric_prefix ++ [ :process , :message , :duration , duration_unit_plural ] ,
276280 event_name: @ message_stop_event ,
277281 measurement: :duration ,
278282 description: "The time it takes Broadway to process a message." ,
279283 reporter_options: [
280- buckets: @ millisecond_duration_buckets
284+ buckets: buckets_for_unit ( duration_unit )
281285 ] ,
282286 tags: [ :processor_key , :name ] ,
283287 tag_values: & extract_message_tag_values / 1 ,
284- unit: { :native , :millisecond }
288+ unit: { :native , duration_unit }
285289 ) ,
286290 distribution (
287- metric_prefix ++ [ :process , :message , :exception , :duration , :milliseconds ] ,
291+ metric_prefix ++ [ :process , :message , :exception , :duration , duration_unit_plural ] ,
288292 event_name: @ message_exception_event ,
289293 measurement: :duration ,
290294 description: "The time it takes Broadway to process a message that results in an error." ,
291295 reporter_options: [
292- buckets: @ millisecond_duration_buckets
296+ buckets: buckets_for_unit ( duration_unit )
293297 ] ,
294298 tags: [ :processor_key , :name , :kind , :reason ] ,
295299 tag_values: & extract_exception_tag_values / 1 ,
296- unit: { :native , :millisecond }
300+ unit: { :native , duration_unit }
297301 )
298302 ]
299303 )
300304 end
301305
302- defp handle_batch_events ( metric_prefix ) do
306+ defp handle_batch_events ( metric_prefix , duration_unit ) do
307+ duration_unit_plural = Utils . make_plural_atom ( duration_unit )
308+
303309 Event . build (
304310 :broadway_batch_event_metrics ,
305311 [
306312 distribution (
307- metric_prefix ++ [ :process , :batch , :duration , :milliseconds ] ,
313+ metric_prefix ++ [ :process , :batch , :duration , duration_unit_plural ] ,
308314 event_name: @ batch_stop_event ,
309315 measurement: :duration ,
310316 description: "The time it takes Broadway to process a batch of messages." ,
311317 reporter_options: [
312- buckets: @ millisecond_duration_buckets
318+ buckets: buckets_for_unit ( duration_unit )
313319 ] ,
314320 tags: [ :batcher , :name ] ,
315321 tag_values: & extract_batcher_tag_values / 1 ,
316- unit: { :native , :millisecond }
322+ unit: { :native , duration_unit }
317323 ) ,
318324 distribution (
319325 metric_prefix ++ [ :process , :batch , :failure , :size ] ,
@@ -387,6 +393,15 @@ if Code.ensure_loaded?(Broadway) do
387393 name: Utils . normalize_module_name ( name )
388394 }
389395 end
396+
397+ defp buckets_for_unit ( unit ) do
398+ case unit do
399+ :nanosecond -> [ 1 , 100 , 1_000 , 2_000 , 10_000 , 50_000 , 1_000_000 ]
400+ :microsecond -> [ 10_000 , 100_000 , 500_000 , 1_000_000 , 10_000_000 , 30_000_000 , 60_000_000 ]
401+ :millisecond -> [ 10 , 100 , 500 , 1_000 , 10_000 , 30_000 , 60_000 ]
402+ :second -> [ 1 , 5 , 10 , 100 , 500 , 1_000 , 10_000 ]
403+ end
404+ end
390405 end
391406else
392407 defmodule PromEx.Plugins.Broadway do
0 commit comments