Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions lib/prom_ex/plugins/broadway.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ if Code.ensure_loaded?(Broadway) do
end
```

## GenStage Producer

To correctly capture per-message metrics and error rate, add the following transform to your pipeline:
```
defmodule WebApp.MyPipeline do
Expand All @@ -54,15 +56,25 @@ if Code.ensure_loaded?(Broadway) do
acknowledger: {__MODULE__, :ack_id, :ack_data}
}
end

def ack(:ack_id, _successful_messages, _failed_messages) do
:ok
end
end
```

## BroadwayRabbitMQ.Producer

There's no need to configure an acknowledger on messages when using BroadwayRabbitMQ.

`BroadwayRabbitMQ.Producer` handles acking messages internally, which involves a unique `:delivery_tag` and `AMQP`.
"""

use PromEx.Plugin

require Logger

alias Broadway.{BatchInfo, Message, Options}
alias Broadway.{BatchInfo, Options}
alias PromEx.Utils

@millisecond_duration_buckets [10, 100, 500, 1_000, 10_000, 30_000, 60_000]
Expand Down Expand Up @@ -268,12 +280,7 @@ if Code.ensure_loaded?(Broadway) do
buckets: @millisecond_duration_buckets
],
tags: [:processor_key, :name],
tag_values: fn %{processor_key: processor_key, message: %Message{acknowledger: {acknowledger, _, _}}} ->
%{
processor_key: processor_key,
name: Utils.normalize_module_name(acknowledger)
}
end,
tag_values: &extract_message_tag_values/1,
unit: {:native, :millisecond}
),
distribution(
Expand Down Expand Up @@ -338,6 +345,13 @@ if Code.ensure_loaded?(Broadway) do
)
end

defp extract_message_tag_values(%{processor_key: processor_key, topology_name: name}) do
%{
processor_key: processor_key,
name: Utils.normalize_module_name(name)
}
end

defp extract_batcher_tag_values(%{batch_info: batch_info = %BatchInfo{}, topology_name: name}) do
%{
name: Utils.normalize_module_name(name),
Expand All @@ -362,15 +376,15 @@ if Code.ensure_loaded?(Broadway) do
kind: kind,
reason: reason,
stacktrace: stacktrace,
message: %Message{acknowledger: {acknowledger, _, _}}
topology_name: name
}) do
reason = Utils.normalize_exception(kind, reason, stacktrace)

%{
processor_key: processor_key,
kind: kind,
reason: reason,
name: Utils.normalize_module_name(acknowledger)
name: Utils.normalize_module_name(name)
}
end
end
Expand Down
32 changes: 32 additions & 0 deletions test/prom_ex/plugins/broadway_test.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
defmodule PromEx.Plugins.BroadwayTest do
use ExUnit.Case, async: true

alias PromEx.MetricTypes.Event
alias PromEx.Plugins.Broadway
alias PromEx.Test.Support.{Events, Metrics}
alias Telemetry.Metrics.Distribution

@default_metadata %{
processor_key: :default,
topology_name: Elixir.SomeModule,
message: %Elixir.Broadway.Message{acknowledger: {Elixir.SomeAcker}, data: %{}}
}

defmodule WebApp.PromEx do
use PromEx, otp_app: :web_app
Expand All @@ -24,4 +32,28 @@ defmodule PromEx.Plugins.BroadwayTest do

assert metrics == Metrics.read_expected(:broadway)
end

describe "event_metrics/1" do
test "returns topology_name for message tags" do
metric = assert_event_metric(:broadway_message_event_metrics, [:broadway, :processor, :message, :stop])
assert %{name: "SomeModule", processor_key: :default} = metric.tag_values.(@default_metadata)
end

test "returns topology_name for message exception tags" do
metric = assert_event_metric(:broadway_message_event_metrics, [:broadway, :processor, :message, :exception])
exception_metadata = Map.merge(@default_metadata, %{kind: Error, reason: "notsure", stacktrace: []})
assert %{name: "SomeModule", processor_key: :default} = metric.tag_values.(exception_metadata)
end

defp assert_event_metric(metric_group, event) do
assert event_metrics = Broadway.event_metrics(otp_app: :web_app)

assert %Event{metrics: message_metrics} =
Enum.find(event_metrics, fn metrics -> metrics.group_name == metric_group end)

assert %Distribution{} = metric = Enum.find(message_metrics, fn dist -> dist.event_name == event end)

metric
end
end
end