Skip to content

Commit 8410d1c

Browse files
authored
fix(destination): Replace With with GetMetricWith (#14428)
There are a number of places in the destination controller where Prometheus metrics are constructed using the `.With` function to supply label names. The `.With` function panics if it encounters an invalid label name. To avoid potential panics, we replace all uses of `.With` with the safer `.GetMetricWith`, which instead returns an error if it is provided an invalid label name. We then handle these errors appropriately instead of panicking.

Signed-off-by: Alex Leong <[email protected]>
1 parent 5801546 commit 8410d1c

19 files changed

+281
-78
lines changed

controller/api/destination/destination_fuzzer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,10 @@ func FuzzProfileTranslatorUpdate(data []byte) int {
9494

9595
id := watcher.ServiceID{Namespace: "bar", Name: "foo"}
9696
server := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
97-
translator := newProfileTranslator(id, server, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil)
97+
translator, err := newProfileTranslator(id, server, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil)
98+
if err != nil {
99+
return 0
100+
}
98101
translator.Start()
99102
defer translator.Stop()
100103
translator.Update(profile)

controller/api/destination/endpoint_translator.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func newEndpointTranslator(
101101
stream pb.Destination_GetServer,
102102
endStream chan struct{},
103103
log *logging.Entry,
104-
) *endpointTranslator {
104+
) (*endpointTranslator, error) {
105105
log = log.WithFields(logging.Fields{
106106
"component": "endpoint-translator",
107107
"service": service,
@@ -115,6 +115,11 @@ func newEndpointTranslator(
115115

116116
filteredSnapshot := newEmptyAddressSet()
117117

118+
counter, err := updatesQueueOverflowCounter.GetMetricWith(prometheus.Labels{"service": service})
119+
if err != nil {
120+
return nil, fmt.Errorf("failed to create updates queue overflow counter: %w", err)
121+
}
122+
118123
return &endpointTranslator{
119124
controllerNS,
120125
identityTrustDomain,
@@ -133,10 +138,10 @@ func newEndpointTranslator(
133138
stream,
134139
endStream,
135140
log,
136-
updatesQueueOverflowCounter.With(prometheus.Labels{"service": service}),
141+
counter,
137142
make(chan interface{}, updateQueueCapacity),
138143
make(chan struct{}),
139-
}
144+
}, nil
140145
}
141146

142147
func (et *endpointTranslator) Add(set watcher.AddressSet) {

controller/api/destination/federated_service_watcher.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ func (fs *federatedService) remoteDiscoverySubscribe(
348348
return
349349
}
350350

351-
translator := newEndpointTranslator(
351+
translator, err := newEndpointTranslator(
352352
fs.config.ControllerNS,
353353
remoteConfig.TrustDomain,
354354
fs.config.ForceOpaqueTransport,
@@ -365,13 +365,18 @@ func (fs *federatedService) remoteDiscoverySubscribe(
365365
subscriber.endStream,
366366
fs.log,
367367
)
368+
if err != nil {
369+
fs.log.Errorf("Failed to create endpoint translator for remote discovery service %q in cluster %s: %s", id.service.Name, id.cluster, err)
370+
return
371+
}
372+
368373
translator.Start()
369374
subscriber.remoteTranslators[id] = translator
370375

371376
fs.log.Debugf("Subscribing to remote discovery service %s in cluster %s", id.service, id.cluster)
372-
err := remoteWatcher.Subscribe(watcher.ServiceID{Namespace: id.service.Namespace, Name: id.service.Name}, subscriber.port, subscriber.instanceID, translator)
377+
err = remoteWatcher.Subscribe(watcher.ServiceID{Namespace: id.service.Namespace, Name: id.service.Name}, subscriber.port, subscriber.instanceID, translator)
373378
if err != nil {
374-
fs.log.Errorf("Failed to subscribe to remote disocvery service %q in cluster %s: %s", id.service.Name, id.cluster, err)
379+
fs.log.Errorf("Failed to subscribe to remote discovery service %q in cluster %s: %s", id.service.Name, id.cluster, err)
375380
}
376381
}
377382

@@ -397,7 +402,7 @@ func (fs *federatedService) localDiscoverySubscribe(
397402
subscriber *federatedServiceSubscriber,
398403
localDiscovery string,
399404
) {
400-
translator := newEndpointTranslator(
405+
translator, err := newEndpointTranslator(
401406
fs.config.ControllerNS,
402407
fs.config.IdentityTrustDomain,
403408
fs.config.ForceOpaqueTransport,
@@ -414,11 +419,15 @@ func (fs *federatedService) localDiscoverySubscribe(
414419
subscriber.endStream,
415420
fs.log,
416421
)
422+
if err != nil {
423+
fs.log.Errorf("Failed to create endpoint translator for %s: %s", localDiscovery, err)
424+
return
425+
}
417426
translator.Start()
418427
subscriber.localTranslators[localDiscovery] = translator
419428

420429
fs.log.Debugf("Subscribing to local discovery service %s", localDiscovery)
421-
err := fs.localEndpoints.Subscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, subscriber.instanceID, translator)
430+
err = fs.localEndpoints.Subscribe(watcher.ServiceID{Namespace: fs.namespace, Name: localDiscovery}, subscriber.port, subscriber.instanceID, translator)
422431
if err != nil {
423432
fs.log.Errorf("Failed to subscribe to %s: %s", localDiscovery, err)
424433
}

controller/api/destination/profile_translator.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ var profileUpdatesQueueOverflowCounter = promauto.NewCounterVec(
4646
},
4747
)
4848

49-
func newProfileTranslator(serviceID watcher.ServiceID, stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}) *profileTranslator {
49+
func newProfileTranslator(serviceID watcher.ServiceID, stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}) (*profileTranslator, error) {
5050
parentRef := &meta.Metadata{
5151
Kind: &meta.Metadata_Resource{
5252
Resource: &meta.Resource{
@@ -59,6 +59,11 @@ func newProfileTranslator(serviceID watcher.ServiceID, stream pb.Destination_Get
5959
},
6060
}
6161

62+
overflowCounter, err := profileUpdatesQueueOverflowCounter.GetMetricWith(prometheus.Labels{"fqn": fqn, "port": fmt.Sprintf("%d", port)})
63+
if err != nil {
64+
return nil, fmt.Errorf("failed to create profile updates queue overflow counter: %w", err)
65+
}
66+
6267
return &profileTranslator{
6368
fullyQualifiedName: fqn,
6469
port: port,
@@ -67,10 +72,10 @@ func newProfileTranslator(serviceID watcher.ServiceID, stream pb.Destination_Get
6772
stream: stream,
6873
endStream: endStream,
6974
log: log.WithField("component", "profile-translator"),
70-
overflowCounter: profileUpdatesQueueOverflowCounter.With(prometheus.Labels{"fqn": fqn, "port": fmt.Sprintf("%d", port)}),
75+
overflowCounter: overflowCounter,
7176
updates: make(chan *sp.ServiceProfile, updateQueueCapacity),
7277
stop: make(chan struct{}),
73-
}
78+
}, nil
7479
}
7580

7681
// Update is called from a client-go informer callback and therefore must not

controller/api/destination/profile_translator_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,10 @@ func newMockTranslator(t *testing.T) (*profileTranslator, chan *pb.DestinationPr
481481
t.Helper()
482482
id := watcher.ServiceID{Namespace: "bar", Name: "foo"}
483483
server := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
484-
translator := newProfileTranslator(id, server, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil)
484+
translator, err := newProfileTranslator(id, server, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil)
485+
if err != nil {
486+
t.Fatalf("failed to create profile translator: %s", err)
487+
}
485488
return translator, server.profilesReceived
486489
}
487490

@@ -593,7 +596,10 @@ func TestProfileTranslator(t *testing.T) {
593596

594597
t.Run("Sends empty update", func(t *testing.T) {
595598
server := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
596-
translator := newProfileTranslator(watcher.ID{}, server, logging.WithField("test", t.Name()), "", 80, nil)
599+
translator, err := newProfileTranslator(watcher.ID{}, server, logging.WithField("test", t.Name()), "", 80, nil)
600+
if err != nil {
601+
t.Fatalf("failed to create profile translator: %s", err)
602+
}
597603

598604
translator.Start()
599605
defer translator.Stop()

controller/api/destination/server.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
203203
log.Errorf("Failed to get remote cluster %s", cluster)
204204
return status.Errorf(codes.NotFound, "Remote cluster not found: %s", cluster)
205205
}
206-
translator := newEndpointTranslator(
206+
translator, err := newEndpointTranslator(
207207
s.config.ControllerNS,
208208
remoteConfig.TrustDomain,
209209
s.config.ForceOpaqueTransport,
@@ -220,6 +220,9 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
220220
streamEnd,
221221
log,
222222
)
223+
if err != nil {
224+
return status.Errorf(codes.Internal, "Failed to create endpoint translator: %s", err)
225+
}
223226
translator.Start()
224227
defer translator.Stop()
225228

@@ -238,7 +241,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
238241
} else {
239242
log.Debug("Local discovery service detected")
240243
// Local discovery
241-
translator := newEndpointTranslator(
244+
translator, err := newEndpointTranslator(
242245
s.config.ControllerNS,
243246
s.config.IdentityTrustDomain,
244247
s.config.ForceOpaqueTransport,
@@ -255,6 +258,9 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
255258
streamEnd,
256259
log,
257260
)
261+
if err != nil {
262+
return status.Errorf(codes.Internal, "Failed to create endpoint translator: %s", err)
263+
}
258264
translator.Start()
259265
defer translator.Stop()
260266

@@ -397,7 +403,10 @@ func (s *server) subscribeToServiceProfile(
397403
// We build up the pipeline of profile updaters backwards, starting from
398404
// the translator which takes profile updates, translates them to protobuf
399405
// and pushes them onto the gRPC stream.
400-
translator := newProfileTranslator(service, stream, log, fqn, port, streamEnd)
406+
translator, err := newProfileTranslator(service, stream, log, fqn, port, streamEnd)
407+
if err != nil {
408+
return status.Errorf(codes.Internal, "Failed to create profile translator: %s", err)
409+
}
401410
translator.Start()
402411
defer translator.Stop()
403412

@@ -408,7 +417,7 @@ func (s *server) subscribeToServiceProfile(
408417

409418
// Create an adaptor that merges service-level opaque port configurations
410419
// onto profile updates.
411-
err := s.opaquePorts.Subscribe(service, opaquePortsAdaptor)
420+
err = s.opaquePorts.Subscribe(service, opaquePortsAdaptor)
412421
if err != nil {
413422
log.Warnf("Failed to subscribe to service updates for %s: %s", service, err)
414423
return err

controller/api/destination/test_util.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1119,7 +1119,7 @@ metadata:
11191119
metadataAPI.Sync(nil)
11201120

11211121
mockGetServer := &mockDestinationGetServer{updatesReceived: make(chan *pb.Update, 50)}
1122-
translator := newEndpointTranslator(
1122+
translator, err := newEndpointTranslator(
11231123
"linkerd",
11241124
"trust.domain",
11251125
forceOpaqueTransport,
@@ -1136,5 +1136,8 @@ metadata:
11361136
nil,
11371137
logging.WithField("test", t.Name()),
11381138
)
1139+
if err != nil {
1140+
t.Fatalf("failed to create endpoint translator: %s", err)
1141+
}
11391142
return mockGetServer, translator
11401143
}

controller/api/destination/watcher/endpoints_watcher.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,7 @@ func (ew *EndpointsWatcher) Subscribe(id ServiceID, port Port, hostname string,
261261

262262
sp := ew.getOrNewServicePublisher(id)
263263

264-
sp.subscribe(port, hostname, listener)
265-
return nil
264+
return sp.subscribe(port, hostname, listener)
266265
}
267266

268267
// Unsubscribe removes a listener from the subscribers list for this authority.
@@ -659,7 +658,7 @@ func (sp *servicePublisher) updateService(newService *corev1.Service) {
659658

660659
}
661660

662-
func (sp *servicePublisher) subscribe(srcPort Port, hostname string, listener EndpointUpdateListener) {
661+
func (sp *servicePublisher) subscribe(srcPort Port, hostname string, listener EndpointUpdateListener) error {
663662
sp.Lock()
664663
defer sp.Unlock()
665664

@@ -669,10 +668,15 @@ func (sp *servicePublisher) subscribe(srcPort Port, hostname string, listener En
669668
}
670669
port, ok := sp.ports[key]
671670
if !ok {
672-
port = sp.newPortPublisher(srcPort, hostname)
671+
var err error
672+
port, err = sp.newPortPublisher(srcPort, hostname)
673+
if err != nil {
674+
return err
675+
}
673676
sp.ports[key] = port
674677
}
675678
port.subscribe(listener)
679+
return nil
676680
}
677681

678682
func (sp *servicePublisher) unsubscribe(srcPort Port, hostname string, listener EndpointUpdateListener) {
@@ -693,7 +697,7 @@ func (sp *servicePublisher) unsubscribe(srcPort Port, hostname string, listener
693697
}
694698
}
695699

696-
func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) *portPublisher {
700+
func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) (*portPublisher, error) {
697701
targetPort := intstr.FromInt(int(srcPort))
698702
svc, err := sp.k8sAPI.Svc().Lister().Services(sp.id.Namespace).Get(sp.id.Name)
699703
if err != nil && !apierrors.IsNotFound(err) {
@@ -707,6 +711,10 @@ func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) *por
707711

708712
log := sp.log.WithField("port", srcPort)
709713

714+
metrics, err := endpointsVecs.newEndpointsMetrics(sp.metricsLabels(srcPort, hostname))
715+
if err != nil {
716+
return nil, err
717+
}
710718
port := &portPublisher{
711719
listeners: []EndpointUpdateListener{},
712720
targetPort: targetPort,
@@ -716,7 +724,7 @@ func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) *por
716724
k8sAPI: sp.k8sAPI,
717725
metadataAPI: sp.metadataAPI,
718726
log: log,
719-
metrics: endpointsVecs.newEndpointsMetrics(sp.metricsLabels(srcPort, hostname)),
727+
metrics: metrics,
720728
enableEndpointSlices: sp.enableEndpointSlices,
721729
localTrafficPolicy: sp.localTrafficPolicy,
722730
}
@@ -744,7 +752,7 @@ func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) *por
744752
}
745753
}
746754

747-
return port
755+
return port, nil
748756
}
749757

750758
func (sp *servicePublisher) metricsLabels(port Port, hostname string) prometheus.Labels {

controller/api/destination/watcher/opaque_ports_watcher.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,12 @@ func (opw *OpaquePortsWatcher) Subscribe(id ServiceID, listener OpaquePortsUpdat
9999
numListeners = float64(len(ss.listeners))
100100
}
101101

102-
opw.subscribersGauge.With(id.Labels()).Set(numListeners)
102+
gauge, err := opw.subscribersGauge.GetMetricWith(id.Labels())
103+
if err != nil {
104+
opw.log.Errorf("failed to get service_subscribers metric: %q", err)
105+
} else {
106+
gauge.Set(numListeners)
107+
}
103108

104109
return nil
105110
}
@@ -125,7 +130,12 @@ func (opw *OpaquePortsWatcher) Unsubscribe(id ServiceID, listener OpaquePortsUpd
125130

126131
labels := id.Labels()
127132
if len(ss.listeners) > 0 {
128-
opw.subscribersGauge.With(labels).Set(float64(len(ss.listeners)))
133+
gauge, err := opw.subscribersGauge.GetMetricWith(labels)
134+
if err != nil {
135+
opw.log.Errorf("failed to get service_subscribers metric: %q", err)
136+
} else {
137+
gauge.Set(float64(len(ss.listeners)))
138+
}
129139
} else {
130140
if !opw.subscribersGauge.Delete(labels) {
131141
opw.log.Warnf("unable to delete service_subscribers metric with labels %s", labels)

0 commit comments

Comments
 (0)