Skip to content

Commit e388a3b

Browse files
committed
feat(resourcemanager): add keyspace name label to metrics
Signed-off-by: JmPotato <github@ipotato.me>
1 parent fa84398 commit e388a3b

File tree

4 files changed

+265
-121
lines changed

4 files changed

+265
-121
lines changed

pkg/mcs/resourcemanager/server/manager.go

Lines changed: 141 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ package server
1717
import (
1818
"context"
1919
"encoding/json"
20-
"math"
20+
"fmt"
2121
"strings"
2222
"time"
2323

@@ -54,20 +54,44 @@ type Manager struct {
5454
srv bs.Server
5555
controllerConfig *ControllerConfig
5656
krgms map[uint32]*keyspaceResourceGroupManager
57-
storage endpoint.ResourceGroupStorage
57+
storage interface {
58+
// Used to store the resource group settings and states.
59+
endpoint.ResourceGroupStorage
60+
// Used to get the keyspace meta info.
61+
endpoint.KeyspaceStorage
62+
}
5863
// consumptionChan is used to send the consumption
5964
// info to the background metrics flusher.
6065
consumptionDispatcher chan *consumptionItem
6166
// record update time of each resource group
6267
consumptionRecord map[consumptionRecordKey]time.Time
68+
// max per sec trackers for each keyspace and resource group.
69+
maxPerSecTrackers map[trackerKey]*maxPerSecCostTracker
70+
// cached counter metrics for each keyspace, resource group and RU type.
71+
counterMetrics map[metricsKey]*counterMetrics
72+
// cached gauge metrics for each keyspace, resource group and RU type.
73+
gaugeMetrics map[metricsKey]*gaugeMetrics
74+
// cached keyspace name for each keyspace ID.
75+
keyspaceNameLookup map[uint32]string
6376
}
6477

6578
type consumptionRecordKey struct {
6679
keyspaceID uint32
67-
name string
80+
groupName string
81+
ruType string
82+
}
83+
84+
type metricsKey struct {
85+
keyspaceID uint32
86+
groupName string
6887
ruType string
6988
}
7089

90+
type trackerKey struct {
91+
keyspaceID uint32
92+
groupName string
93+
}
94+
7195
// ConfigProvider is used to get resource manager config from the given
7296
// `bs.server` without modifying its interface.
7397
type ConfigProvider interface {
@@ -82,6 +106,10 @@ func NewManager[T ConfigProvider](srv bs.Server) *Manager {
82106
krgms: make(map[uint32]*keyspaceResourceGroupManager),
83107
consumptionDispatcher: make(chan *consumptionItem, defaultConsumptionChanSize),
84108
consumptionRecord: make(map[consumptionRecordKey]time.Time),
109+
maxPerSecTrackers: make(map[trackerKey]*maxPerSecCostTracker),
110+
counterMetrics: make(map[metricsKey]*counterMetrics),
111+
gaugeMetrics: make(map[metricsKey]*gaugeMetrics),
112+
keyspaceNameLookup: make(map[uint32]string),
85113
}
86114
// The first initialization after the server is started.
87115
srv.AddStartCallback(func() {
@@ -346,6 +374,80 @@ func (m *Manager) dispatchConsumption(req *rmpb.TokenBucketRequest) error {
346374
return nil
347375
}
348376

377+
func (m *Manager) insertConsumptionRecord(keyspaceID uint32, groupName string, ruType string) {
378+
m.consumptionRecord[consumptionRecordKey{keyspaceID: keyspaceID, groupName: groupName, ruType: ruType}] = time.Now()
379+
}
380+
381+
func (m *Manager) deleteConsumptionRecord(record consumptionRecordKey) {
382+
delete(m.consumptionRecord, record)
383+
}
384+
385+
func (m *Manager) getMaxPerSecTracker(keyspaceID uint32, keyspaceName, groupName string) *maxPerSecCostTracker {
386+
tracker := m.maxPerSecTrackers[trackerKey{keyspaceID, groupName}]
387+
if tracker == nil {
388+
tracker = newMaxPerSecCostTracker(keyspaceName, groupName, defaultCollectIntervalSec)
389+
m.maxPerSecTrackers[trackerKey{keyspaceID, groupName}] = tracker
390+
}
391+
return tracker
392+
}
393+
394+
func (m *Manager) deleteMaxPerSecTracker(keyspaceID uint32, groupName string) {
395+
delete(m.maxPerSecTrackers, trackerKey{keyspaceID, groupName})
396+
}
397+
398+
func (m *Manager) getCounterMetrics(keyspaceID uint32, keyspaceName, groupName, ruType string) *counterMetrics {
399+
key := metricsKey{keyspaceID, groupName, ruType}
400+
if m.counterMetrics[key] == nil {
401+
m.counterMetrics[key] = newCounterMetrics(keyspaceName, groupName, ruType)
402+
}
403+
return m.counterMetrics[key]
404+
}
405+
406+
func (m *Manager) getGaugeMetrics(keyspaceID uint32, keyspaceName, groupName string) *gaugeMetrics {
407+
key := metricsKey{keyspaceID, groupName, ""}
408+
if m.gaugeMetrics[key] == nil {
409+
m.gaugeMetrics[key] = newGaugeMetrics(keyspaceName, groupName)
410+
}
411+
return m.gaugeMetrics[key]
412+
}
413+
414+
func (m *Manager) deleteMetrics(keyspaceID uint32, keyspaceName, groupName, ruType string) {
415+
delete(m.counterMetrics, metricsKey{keyspaceID, groupName, ruType})
416+
delete(m.gaugeMetrics, metricsKey{keyspaceID, groupName, ""})
417+
deleteLabelValues(keyspaceName, groupName, ruType)
418+
}
419+
420+
func (m *Manager) getKeyspaceNameByID(ctx context.Context, id uint32) (string, error) {
421+
if id == constant.NullKeyspaceID {
422+
return "", nil
423+
}
424+
// Try to get the keyspace name from the cache first.
425+
name, ok := m.keyspaceNameLookup[id]
426+
if ok {
427+
return name, nil
428+
}
429+
var loadedName string
430+
// If the keyspace name is not in the cache, try to get it from the storage.
431+
err := m.storage.RunInTxn(ctx, func(txn kv.Txn) error {
432+
meta, err := m.storage.LoadKeyspaceMeta(txn, id)
433+
if err != nil {
434+
return err
435+
}
436+
loadedName = meta.GetName()
437+
return nil
438+
})
439+
if err != nil {
440+
log.Error("failed to get the keyspace name", zap.Uint32("keyspace-id", id), zap.Error(err))
441+
return "", err
442+
}
443+
if len(loadedName) == 0 {
444+
return "", fmt.Errorf("got an empty keyspace name by id %d", id)
445+
}
446+
// Update the cache.
447+
m.keyspaceNameLookup[id] = loadedName
448+
return loadedName, nil
449+
}
450+
349451
func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
350452
defer logutil.LogPanic()
351453
cleanUpTicker := time.NewTicker(metricsCleanupInterval)
@@ -355,31 +457,6 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
355457
recordMaxTicker := time.NewTicker(tickPerSecond)
356458
defer recordMaxTicker.Stop()
357459

358-
maxPerSecTrackers := make(map[uint32]map[string]*maxPerSecCostTracker)
359-
// getMaxPerSecTracker returns the max per sec tracker for the given keyspace ID and name.
360-
// If the tracker doesn't exist, it will be created.
361-
getMaxPerSecTracker := func(keyspaceID uint32, name string) *maxPerSecCostTracker {
362-
trackers := maxPerSecTrackers[keyspaceID]
363-
if trackers == nil {
364-
trackers = make(map[string]*maxPerSecCostTracker)
365-
maxPerSecTrackers[keyspaceID] = trackers
366-
}
367-
tracker, ok := trackers[name]
368-
if !ok {
369-
tracker = newMaxPerSecCostTracker(name, defaultCollectIntervalSec)
370-
trackers[name] = tracker
371-
}
372-
return tracker
373-
}
374-
// deleteMaxPerSecTracker deletes the max per sec tracker for the given keyspace ID and name.
375-
// If the tracker doesn't exist, it will do nothing.
376-
deleteMaxPerSecTracker := func(keyspaceID uint32, name string) {
377-
trackers := maxPerSecTrackers[keyspaceID]
378-
if trackers == nil {
379-
return
380-
}
381-
delete(trackers, name)
382-
}
383460
for {
384461
select {
385462
case <-ctx.Done():
@@ -389,6 +466,12 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
389466
if consumption == nil {
390467
continue
391468
}
469+
keyspaceID := consumptionInfo.keyspaceID
470+
keyspaceName, err := m.getKeyspaceNameByID(ctx, keyspaceID)
471+
if err != nil {
472+
continue
473+
}
474+
392475
ruLabelType := defaultTypeLabel
393476
if consumptionInfo.isBackground {
394477
ruLabelType = backgroundTypeLabel
@@ -397,53 +480,10 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
397480
ruLabelType = tiflashTypeLabel
398481
}
399482

400-
var (
401-
// TODO: add keyspace name lable to the metrics.
402-
keyspaceID = consumptionInfo.keyspaceID
403-
name = consumptionInfo.resourceGroupName
404-
rruMetrics = readRequestUnitCost.WithLabelValues(name, name, ruLabelType)
405-
wruMetrics = writeRequestUnitCost.WithLabelValues(name, name, ruLabelType)
406-
sqlLayerRuMetrics = sqlLayerRequestUnitCost.WithLabelValues(name, name)
407-
readByteMetrics = readByteCost.WithLabelValues(name, name, ruLabelType)
408-
writeByteMetrics = writeByteCost.WithLabelValues(name, name, ruLabelType)
409-
kvCPUMetrics = kvCPUCost.WithLabelValues(name, name, ruLabelType)
410-
sqlCPUMetrics = sqlCPUCost.WithLabelValues(name, name, ruLabelType)
411-
readRequestCountMetrics = requestCount.WithLabelValues(name, name, readTypeLabel)
412-
writeRequestCountMetrics = requestCount.WithLabelValues(name, name, writeTypeLabel)
413-
)
414-
getMaxPerSecTracker(keyspaceID, name).CollectConsumption(consumption)
415-
416-
// RU info.
417-
if consumption.RRU > 0 {
418-
rruMetrics.Add(consumption.RRU)
419-
}
420-
if consumption.WRU > 0 {
421-
wruMetrics.Add(consumption.WRU)
422-
}
423-
// Byte info.
424-
if consumption.ReadBytes > 0 {
425-
readByteMetrics.Add(consumption.ReadBytes)
426-
}
427-
if consumption.WriteBytes > 0 {
428-
writeByteMetrics.Add(consumption.WriteBytes)
429-
}
430-
// CPU time info.
431-
if consumption.TotalCpuTimeMs > 0 {
432-
if consumption.SqlLayerCpuTimeMs > 0 {
433-
sqlLayerRuMetrics.Add(consumption.SqlLayerCpuTimeMs * m.controllerConfig.RequestUnit.CPUMsCost)
434-
sqlCPUMetrics.Add(consumption.SqlLayerCpuTimeMs)
435-
}
436-
kvCPUMetrics.Add(consumption.TotalCpuTimeMs - consumption.SqlLayerCpuTimeMs)
437-
}
438-
// RPC count info.
439-
if consumption.KvReadRpcCount > 0 {
440-
readRequestCountMetrics.Add(consumption.KvReadRpcCount)
441-
}
442-
if consumption.KvWriteRpcCount > 0 {
443-
writeRequestCountMetrics.Add(consumption.KvWriteRpcCount)
444-
}
445-
446-
m.consumptionRecord[consumptionRecordKey{keyspaceID: keyspaceID, name: name, ruType: ruLabelType}] = time.Now()
483+
name := consumptionInfo.resourceGroupName
484+
m.getMaxPerSecTracker(keyspaceID, keyspaceName, name).collect(consumption)
485+
m.getCounterMetrics(keyspaceID, keyspaceName, name, ruLabelType).add(consumption, m.controllerConfig)
486+
m.insertConsumptionRecord(keyspaceID, name, ruLabelType)
447487

448488
// TODO: maybe we need to distinguish background ru.
449489
if rg := m.GetMutableResourceGroup(keyspaceID, name); rg != nil {
@@ -455,47 +495,44 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
455495
if time.Since(lastTime) <= metricsCleanupTimeout {
456496
continue
457497
}
458-
readRequestUnitCost.DeleteLabelValues(r.name, r.name, r.ruType)
459-
writeRequestUnitCost.DeleteLabelValues(r.name, r.name, r.ruType)
460-
sqlLayerRequestUnitCost.DeleteLabelValues(r.name, r.name, r.ruType)
461-
readByteCost.DeleteLabelValues(r.name, r.name, r.ruType)
462-
writeByteCost.DeleteLabelValues(r.name, r.name, r.ruType)
463-
kvCPUCost.DeleteLabelValues(r.name, r.name, r.ruType)
464-
sqlCPUCost.DeleteLabelValues(r.name, r.name, r.ruType)
465-
requestCount.DeleteLabelValues(r.name, r.name, readTypeLabel)
466-
requestCount.DeleteLabelValues(r.name, r.name, writeTypeLabel)
467-
availableRUCounter.DeleteLabelValues(r.name, r.name)
468-
delete(m.consumptionRecord, r)
469-
deleteMaxPerSecTracker(r.keyspaceID, r.name)
470-
readRequestUnitMaxPerSecCost.DeleteLabelValues(r.name)
471-
writeRequestUnitMaxPerSecCost.DeleteLabelValues(r.name)
472-
resourceGroupConfigGauge.DeletePartialMatch(prometheus.Labels{newResourceGroupNameLabel: r.name})
498+
keyspaceName, err := m.getKeyspaceNameByID(ctx, r.keyspaceID)
499+
if err != nil {
500+
continue
501+
}
502+
m.deleteConsumptionRecord(r)
503+
m.deleteMetrics(r.keyspaceID, keyspaceName, r.groupName, r.ruType)
504+
m.deleteMaxPerSecTracker(r.keyspaceID, r.groupName)
473505
}
474506
case <-availableRUTicker.C:
475507
// Prevent from holding the lock too long when there're many keyspaces and resource groups.
476508
for _, krgm := range m.getKeyspaceResourceGroupManagers() {
509+
keyspaceName, err := m.getKeyspaceNameByID(ctx, krgm.keyspaceID)
510+
if err != nil {
511+
continue
512+
}
477513
for _, group := range krgm.getResourceGroupList(true, false) {
478-
ru := math.Max(group.getRUToken(), 0)
479-
availableRUCounter.WithLabelValues(group.Name, group.Name).Set(ru)
480-
resourceGroupConfigGauge.WithLabelValues(group.Name, priorityLabel).Set(group.getPriority())
481-
resourceGroupConfigGauge.WithLabelValues(group.Name, ruPerSecLabel).Set(group.getFillRate())
482-
resourceGroupConfigGauge.WithLabelValues(group.Name, ruCapacityLabel).Set(group.getBurstLimit())
514+
m.getGaugeMetrics(krgm.keyspaceID, keyspaceName, group.Name).set(group)
483515
}
484516
}
485517
case <-recordMaxTicker.C:
486518
// Record the sum of RRU and WRU every second.
487519
for _, krgm := range m.getKeyspaceResourceGroupManagers() {
520+
keyspaceName, err := m.getKeyspaceNameByID(ctx, krgm.keyspaceID)
521+
if err != nil {
522+
continue
523+
}
488524
names := krgm.getResourceGroupNames(true)
489525
for _, name := range names {
490-
getMaxPerSecTracker(krgm.keyspaceID, name).FlushMetrics()
526+
m.getMaxPerSecTracker(krgm.keyspaceID, keyspaceName, name).flushMetrics()
491527
}
492528
}
493529
}
494530
}
495531
}
496532

497533
type maxPerSecCostTracker struct {
498-
name string
534+
keyspaceName string
535+
groupName string
499536
maxPerSecRRU float64
500537
maxPerSecWRU float64
501538
rruSum float64
@@ -508,23 +545,22 @@ type maxPerSecCostTracker struct {
508545
wruMaxMetrics prometheus.Gauge
509546
}
510547

511-
func newMaxPerSecCostTracker(name string, flushPeriod int) *maxPerSecCostTracker {
548+
func newMaxPerSecCostTracker(keyspaceName, groupName string, flushPeriod int) *maxPerSecCostTracker {
512549
return &maxPerSecCostTracker{
513-
name: name,
550+
keyspaceName: keyspaceName,
551+
groupName: groupName,
514552
flushPeriod: flushPeriod,
515-
rruMaxMetrics: readRequestUnitMaxPerSecCost.WithLabelValues(name),
516-
wruMaxMetrics: writeRequestUnitMaxPerSecCost.WithLabelValues(name),
553+
rruMaxMetrics: readRequestUnitMaxPerSecCost.WithLabelValues(groupName, keyspaceName),
554+
wruMaxMetrics: writeRequestUnitMaxPerSecCost.WithLabelValues(groupName, keyspaceName),
517555
}
518556
}
519557

520-
// CollectConsumption collects the consumption info.
521-
func (t *maxPerSecCostTracker) CollectConsumption(consume *rmpb.Consumption) {
558+
func (t *maxPerSecCostTracker) collect(consume *rmpb.Consumption) {
522559
t.rruSum += consume.RRU
523560
t.wruSum += consume.WRU
524561
}
525562

526-
// FlushMetrics and set the maxPerSecRRU and maxPerSecWRU to the metrics.
527-
func (t *maxPerSecCostTracker) FlushMetrics() {
563+
func (t *maxPerSecCostTracker) flushMetrics() {
528564
if t.lastRRUSum == 0 && t.lastWRUSum == 0 {
529565
t.lastRRUSum = t.rruSum
530566
t.lastWRUSum = t.wruSum

pkg/mcs/resourcemanager/server/manager_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ func TestBackgroundMetricsFlush(t *testing.T) {
7777

7878
// Verify consumption was added to the resource group.
7979
testutil.Eventually(re, func() bool {
80+
// TODO: test with the keyspace name.
8081
updatedGroup := m.GetResourceGroup(constant.NullKeyspaceID, req.GetResourceGroupName(), true)
8182
re.NotNil(updatedGroup)
8283
return updatedGroup.RUConsumption.RRU == req.ConsumptionSinceLastRequest.RRU &&

0 commit comments

Comments
 (0)