Skip to content

Commit 605e4e6

Browse files
committed
reduce prometheus metrics overhead in write many path
1 parent 6c8d7b0 commit 605e4e6

File tree

2 files changed

+44
-9
lines changed

2 files changed

+44
-9
lines changed

client.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2668,6 +2668,18 @@ func (c *Client) startWriter(batchDelay time.Duration, maxMessagesInFrame int, q
26682668
},
26692669
WriteManyFn: func(items ...queue.Item) error {
26702670
messages := make([][]byte, 0, len(items))
2671+
2672+
// Batch metric updates - accumulate counts locally first
2673+
type metricKey struct {
2674+
frameType string
2675+
namespace string
2676+
}
2677+
metricCounts := make(map[metricKey]struct {
2678+
count int
2679+
size int
2680+
})
2681+
transportName := c.transport.Name()
2682+
26712683
for i := 0; i < len(items); i++ {
26722684
if c.node.clientEvents.transportWriteHandler != nil {
26732685
pass := c.node.clientEvents.transportWriteHandler(c, TransportWriteEvent(items[i]))
@@ -2676,8 +2688,26 @@ func (c *Client) startWriter(batchDelay time.Duration, maxMessagesInFrame int, q
26762688
}
26772689
}
26782690
messages = append(messages, items[i].Data)
2679-
c.node.metrics.incTransportMessagesSent(c.transport.Name(), items[i].FrameType, items[i].Channel, len(items[i].Data))
2691+
2692+
// Accumulate metrics locally
2693+
key := metricKey{
2694+
frameType: items[i].FrameType.String(),
2695+
namespace: c.node.metrics.getChannelNamespaceLabel(items[i].Channel),
2696+
}
2697+
stats := metricCounts[key]
2698+
stats.count++
2699+
stats.size += len(items[i].Data)
2700+
metricCounts[key] = stats
2701+
}
2702+
2703+
// Update metrics once per unique label combination
2704+
for key, stats := range metricCounts {
2705+
counters := c.node.metrics.getTransportMessagesSentCounters(transportName, key.frameType, key.namespace)
2706+
// Batch update - add count and size together
2707+
counters.counterSent.Add(float64(stats.count))
2708+
counters.counterSentSize.Add(float64(stats.size))
26802709
}
2710+
26812711
writeMu.Lock()
26822712
defer writeMu.Unlock()
26832713
if err := c.transport.WriteMany(messages...); err != nil {

metrics.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -651,25 +651,30 @@ type transportMessagesReceived struct {
651651
counterReceivedSize prometheus.Counter
652652
}
653653

654-
func (m *metrics) incTransportMessagesSent(transport string, frameType protocol.FrameType, channel string, size int) {
655-
channelNamespace := m.getChannelNamespaceLabel(channel)
654+
func (m *metrics) getTransportMessagesSentCounters(transport string, frameType string, namespace string) transportMessagesSent {
656655
labels := transportMessageLabels{
657656
Transport: transport,
658-
ChannelNamespace: channelNamespace,
659-
FrameType: frameType.String(),
657+
ChannelNamespace: namespace,
658+
FrameType: frameType,
660659
}
661660
counters, ok := m.transportMessagesSentCache.Load(labels)
662661
if !ok {
663-
counterSent := m.transportMessagesSent.WithLabelValues(transport, labels.FrameType, channelNamespace)
664-
counterSentSize := m.transportMessagesSentSize.WithLabelValues(transport, labels.FrameType, channelNamespace)
662+
counterSent := m.transportMessagesSent.WithLabelValues(transport, frameType, namespace)
663+
counterSentSize := m.transportMessagesSentSize.WithLabelValues(transport, frameType, namespace)
665664
counters = transportMessagesSent{
666665
counterSent: counterSent,
667666
counterSentSize: counterSentSize,
668667
}
669668
m.transportMessagesSentCache.Store(labels, counters)
670669
}
671-
counters.(transportMessagesSent).counterSent.Inc()
672-
counters.(transportMessagesSent).counterSentSize.Add(float64(size))
670+
return counters.(transportMessagesSent)
671+
}
672+
673+
func (m *metrics) incTransportMessagesSent(transport string, frameType protocol.FrameType, channel string, size int) {
674+
channelNamespace := m.getChannelNamespaceLabel(channel)
675+
counters := m.getTransportMessagesSentCounters(transport, frameType.String(), channelNamespace)
676+
counters.counterSent.Inc()
677+
counters.counterSentSize.Add(float64(size))
673678
}
674679

675680
func (m *metrics) incTransportMessagesReceived(transport string, frameType protocol.FrameType, channel string, size int) {

0 commit comments

Comments
 (0)