Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions controller/api/destination/watcher/endpoints_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,10 +1145,7 @@ func (pp *portPublisher) newPodRefAddress(
if err != nil {
return Address{}, PodID{}, fmt.Errorf("unable to fetch pod %v: %w", id, err)
}
ownerKind, ownerName, err := pp.metadataAPI.GetOwnerKindAndName(context.Background(), pod, false)
if err != nil {
return Address{}, PodID{}, err
}
ownerKind, ownerName := pp.metadataAPI.GetOwnerKindAndName(context.Background(), pod, false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts about replacing this with the new function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable. I can put out a follow-up for this, since the current implementation doesn't support controlling retry logic.

addr := Address{
IP: endpointIP,
Port: endpointPort,
Expand Down
10 changes: 3 additions & 7 deletions controller/api/destination/watcher/workload_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,13 +695,9 @@ func (wp *workloadPublisher) updatePod(pod *corev1.Pod) {

// Fill in ownership.
if wp.addr.Pod != nil {
ownerKind, ownerName, err := wp.metadataAPI.GetOwnerKindAndName(context.Background(), wp.addr.Pod, true)
if err != nil {
wp.log.Errorf("Error getting pod owner for pod %s: %q", wp.addr.Pod.GetName(), err)
} else {
wp.addr.OwnerKind = ownerKind
wp.addr.OwnerName = ownerName
}
ownerKind, ownerName := wp.metadataAPI.GetOwnerKindAndName(context.Background(), wp.addr.Pod, true)
wp.addr.OwnerKind = ownerKind
wp.addr.OwnerName = ownerName
Comment on lines +698 to +700
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, saving for follow-up

}

// Compute opaque protocol.
Expand Down
74 changes: 47 additions & 27 deletions controller/k8s/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,12 +1057,16 @@ func TestGetOwnerKindAndName(t *testing.T) {
for i, tt := range []struct {
resources

expectedOwnerKind string
expectedOwnerName string
expectedOwnerKind string
expectedOwnerName string
expectedRootOwnerKind string
expectedRootOwnerName string
}{
{
expectedOwnerKind: "deployment",
expectedOwnerName: "t2",
expectedOwnerKind: "deployment",
expectedOwnerName: "t2",
expectedRootOwnerKind: "Deployment",
expectedRootOwnerName: "t2",
resources: resources{
results: []string{`
apiVersion: v1
Expand All @@ -1084,13 +1088,14 @@ metadata:
ownerReferences:
- apiVersion: apps/v1
kind: Deployment
name: t2`,
},
name: t2`},
},
},
{
expectedOwnerKind: "replicaset",
expectedOwnerName: "t1-b4f55d87f",
expectedOwnerKind: "replicaset",
expectedOwnerName: "t1-b4f55d87f",
expectedRootOwnerKind: "ReplicaSet",
expectedRootOwnerName: "t1-b4f55d87f",
resources: resources{
results: []string{`
apiVersion: v1
Expand All @@ -1106,8 +1111,10 @@ metadata:
},
},
{
expectedOwnerKind: "job",
expectedOwnerName: "slow-cooker",
expectedOwnerKind: "job",
expectedOwnerName: "slow-cooker",
expectedRootOwnerKind: "Job",
expectedRootOwnerName: "slow-cooker",
resources: resources{
results: []string{`
apiVersion: v1
Expand All @@ -1123,8 +1130,10 @@ metadata:
},
},
{
expectedOwnerKind: "replicationcontroller",
expectedOwnerName: "web",
expectedOwnerKind: "replicationcontroller",
expectedOwnerName: "web",
expectedRootOwnerKind: "ReplicationController",
expectedRootOwnerName: "web",
resources: resources{
results: []string{`
apiVersion: v1
Expand All @@ -1140,8 +1149,10 @@ metadata:
},
},
{
expectedOwnerKind: "pod",
expectedOwnerName: "vote-bot",
expectedOwnerKind: "pod",
expectedOwnerName: "vote-bot",
expectedRootOwnerKind: "Pod",
expectedRootOwnerName: "vote-bot",
resources: resources{
results: []string{`
apiVersion: v1
Expand All @@ -1153,8 +1164,10 @@ metadata:
},
},
{
expectedOwnerKind: "cronjob",
expectedOwnerName: "my-cronjob",
expectedOwnerKind: "cronjob",
expectedOwnerName: "my-cronjob",
expectedRootOwnerKind: "CronJob",
expectedRootOwnerName: "my-cronjob",
resources: resources{
results: []string{`
apiVersion: v1
Expand All @@ -1176,13 +1189,14 @@ metadata:
ownerReferences:
- apiVersion: batch/v1
kind: CronJob
name: my-cronjob`,
},
name: my-cronjob`},
},
},
{
expectedOwnerKind: "replicaset",
expectedOwnerName: "invalid-rs-parent-2abdffa",
expectedOwnerKind: "replicaset",
expectedOwnerName: "invalid-rs-parent-2abdffa",
expectedRootOwnerKind: "InvalidParentKind",
expectedRootOwnerName: "invalid-parent",
resources: resources{
results: []string{`
apiVersion: v1
Expand All @@ -1191,7 +1205,7 @@ metadata:
name: invalid-rs-parent-dcfq4
namespace: default
ownerReferences:
- apiVersion: v1
- apiVersion: apps/v1
kind: ReplicaSet
name: invalid-rs-parent-2abdffa`,
},
Expand All @@ -1204,8 +1218,7 @@ metadata:
ownerReferences:
- apiVersion: invalidParent/v1
kind: InvalidParentKind
name: invalid-parent`,
},
name: invalid-parent`},
},
},
} {
Expand All @@ -1232,10 +1245,7 @@ metadata:
t.Fatalf("Expected name to be [%s], got [%s]", tt.expectedOwnerName, ownerName)
}

ownerKind, ownerName, err = metadataAPI.GetOwnerKindAndName(context.Background(), pod, retry)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
ownerKind, ownerName = metadataAPI.GetOwnerKindAndName(context.Background(), pod, retry)

if ownerKind != tt.expectedOwnerKind {
t.Fatalf("Expected kind to be [%s], got [%s]", tt.expectedOwnerKind, ownerKind)
Expand All @@ -1244,6 +1254,16 @@ metadata:
if ownerName != tt.expectedOwnerName {
t.Fatalf("Expected name to be [%s], got [%s]", tt.expectedOwnerName, ownerName)
}

tm, om := metadataAPI.GetRootOwnerKindAndName(context.Background(), &pod.TypeMeta, &pod.ObjectMeta)

if tm.Kind != tt.expectedRootOwnerKind {
t.Fatalf("Expected root kind to be [%s], got [%s]", tt.expectedRootOwnerKind, tm.Kind)
}

if om.Name != tt.expectedRootOwnerName {
t.Fatalf("Expected root name to be [%s], got [%s]", tt.expectedRootOwnerName, om.Name)
}
})
}
}
Expand Down
66 changes: 59 additions & 7 deletions controller/k8s/metadata_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
Expand Down Expand Up @@ -213,14 +214,14 @@ func (api *MetadataAPI) GetByNamespaceFiltered(
// Kubernetes singular resource type (e.g. deployment, daemonset, job, etc.).
// If retry is true, when the shared informer cache doesn't return anything we
// try again with a direct Kubernetes API call.
func (api *MetadataAPI) GetOwnerKindAndName(ctx context.Context, pod *corev1.Pod, retry bool) (string, string, error) {
func (api *MetadataAPI) GetOwnerKindAndName(ctx context.Context, pod *corev1.Pod, retry bool) (string, string) {
ownerRefs := pod.GetOwnerReferences()
if len(ownerRefs) == 0 {
// pod without a parent
return "pod", pod.Name, nil
return "pod", pod.Name
} else if len(ownerRefs) > 1 {
log.Debugf("unexpected owner reference count (%d): %+v", len(ownerRefs), ownerRefs)
return "pod", pod.Name, nil
return "pod", pod.Name
}

parent := ownerRefs[0]
Expand Down Expand Up @@ -258,18 +259,69 @@ func (api *MetadataAPI) GetOwnerKindAndName(ctx context.Context, pod *corev1.Pod
}

if rsObj == nil || !isValidRSParent(rsObj) {
return strings.ToLower(parent.Kind), parent.Name, nil
return strings.ToLower(parent.Kind), parent.Name
}
parentObj = rsObj
default:
return strings.ToLower(parent.Kind), parent.Name, nil
return strings.ToLower(parent.Kind), parent.Name
}

if err == nil && len(parentObj.GetOwnerReferences()) == 1 {
grandParent := parentObj.GetOwnerReferences()[0]
return strings.ToLower(grandParent.Kind), grandParent.Name, nil
return strings.ToLower(grandParent.Kind), grandParent.Name
}
return strings.ToLower(parent.Kind), parent.Name, nil
return strings.ToLower(parent.Kind), parent.Name
}

// GetRootOwnerKindAndName returns the resource's owner's type and metadata, using owner
// references from the Kubernetes API. Parent refs are recursively traversed to find the
// root parent resource, at least as far as the controller has permissions to query.
// This will attempt to lookup resources through the shared informer cache where possible,
// but may fall back to direct Kubernetes API calls where necessary.
func (api *MetadataAPI) GetRootOwnerKindAndName(ctx context.Context, tm *metav1.TypeMeta, om *metav1.ObjectMeta) (*metav1.TypeMeta, *metav1.ObjectMeta) {
ownerRefs := om.OwnerReferences
if len(ownerRefs) == 0 {
// resource without a parent
log.Debugf("Found root owner ref (%s)", om)
return tm, om
} else if len(ownerRefs) > 1 {
log.Debugf("unexpected owner reference count (%d): %+v", len(ownerRefs), ownerRefs)
return tm, om
}

parentRef := ownerRefs[0]
// The set of resources that we look up in the indexer are fairly niche. They all must be able to:
// - be a parent of another resource, usually a Pod
// - have a parent resource themselves
// Currently, this is limited to Jobs (parented by CronJobs) and ReplicaSets (parented by
// Deployments, StatefulSets, argo Rollouts, etc.). This list may change in the future, but
// is sufficient for now.
switch parentRef.Kind {
case "Job":
parent, err := api.getByNamespace(Job, om.Namespace, parentRef.Name)
if err == nil {
return api.GetRootOwnerKindAndName(ctx, &parent.TypeMeta, &parent.ObjectMeta)
}
log.Warnf("failed to retrieve job from indexer %s/%s: %s", om.Namespace, parentRef.Name, err)
case "ReplicaSet":
parent, err := api.getByNamespace(RS, om.Namespace, parentRef.Name)
if err == nil {
return api.GetRootOwnerKindAndName(ctx, &parent.TypeMeta, &parent.ObjectMeta)
}
log.Warnf("failed to retrieve replicaset from indexer %s/%s: %s", om.Namespace, parentRef.Name, err)
}

resource := schema.FromAPIVersionAndKind(parentRef.APIVersion, parentRef.Kind).
GroupVersion().
WithResource(strings.ToLower(parentRef.Kind) + "s")
parent, err := api.client.Resource(resource).
Namespace(om.Namespace).
Get(ctx, parentRef.Name, metav1.GetOptions{})
if err != nil {
log.Warnf("failed to retrieve resource from direct API call %s/%s/%s: %s", schema.FromAPIVersionAndKind(parentRef.APIVersion, parentRef.Kind), om.Namespace, parentRef.Name, err)
return &metav1.TypeMeta{Kind: parentRef.Kind, APIVersion: parentRef.APIVersion}, &metav1.ObjectMeta{Name: parentRef.Name, Namespace: om.Namespace}
}
return api.GetRootOwnerKindAndName(ctx, &parent.TypeMeta, &parent.ObjectMeta)
}

func (api *MetadataAPI) addInformer(res APIResource, informerLabels prometheus.Labels) error {
Expand Down
5 changes: 4 additions & 1 deletion controller/k8s/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ func NewFakeClusterScopedAPI(clientSet kubernetes.Interface, l5dClientSet l5dcrd
// NewFakeMetadataAPI provides a mock Kubernetes API for testing.
func NewFakeMetadataAPI(configs []string) (*MetadataAPI, error) {
sch := runtime.NewScheme()
metav1.AddMetaToScheme(sch)
err := metav1.AddMetaToScheme(sch)
if err != nil {
return nil, err
}

var objs []runtime.Object
for _, config := range configs {
Expand Down
10 changes: 9 additions & 1 deletion controller/proxy-injector/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func Inject(linkerdNamespace string, overrider inject.ValueOverrider) webhook.Ha
}
resourceConfig := inject.NewResourceConfig(valuesConfig, inject.OriginWebhook, linkerdNamespace).
WithOwnerRetriever(ownerRetriever(ctx, api, request.Namespace)).
WithRootOwnerRetriever(rootOwnerRetriever(ctx, api, request.Namespace)).
WithNsAnnotations(ns.GetAnnotations()).
WithKind(request.Kind.Kind)

Expand Down Expand Up @@ -217,8 +218,15 @@ func Inject(linkerdNamespace string, overrider inject.ValueOverrider) webhook.Ha
}

func ownerRetriever(ctx context.Context, api *k8s.MetadataAPI, ns string) inject.OwnerRetrieverFunc {
return func(p *v1.Pod) (string, string, error) {
return func(p *v1.Pod) (string, string) {
p.SetNamespace(ns)
return api.GetOwnerKindAndName(ctx, p, true)
}
}

func rootOwnerRetriever(ctx context.Context, api *k8s.MetadataAPI, ns string) inject.RootOwnerRetrieverFunc {
return func(tm *metav1.TypeMeta, om *metav1.ObjectMeta) (*metav1.TypeMeta, *metav1.ObjectMeta) {
om.SetNamespace(ns)
return api.GetRootOwnerKindAndName(ctx, tm, om)
}
}
11 changes: 8 additions & 3 deletions controller/proxy-injector/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,8 @@ func TestGetAnnotationPatch(t *testing.T) {
fakeReq := getFakeServiceReq(service)
fullConf := testCase.conf.
WithKind(fakeReq.Kind.Kind).
WithOwnerRetriever(ownerRetrieverFake)
WithOwnerRetriever(ownerRetrieverFake).
WithRootOwnerRetriever(rootOwnerRetrieverFake)
_, err = fullConf.ParseMetaAndYAML(fakeReq.Object.Raw)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -408,8 +409,12 @@ func getFakeServiceReq(b []byte) *admissionv1beta1.AdmissionRequest {
}
}

func ownerRetrieverFake(p *corev1.Pod) (string, string, error) {
return pkgK8s.Deployment, "owner-deployment", nil
func ownerRetrieverFake(p *corev1.Pod) (string, string) {
return pkgK8s.Deployment, "owner-deployment"
}

func rootOwnerRetrieverFake(tm *metav1.TypeMeta, om *metav1.ObjectMeta) (*metav1.TypeMeta, *metav1.ObjectMeta) {
return &metav1.TypeMeta{Kind: "Deployment", APIVersion: "apps/v1"}, &metav1.ObjectMeta{Name: "owner-deployment"}
}

func loadPatch(factory *fake.Factory, t *testing.T, name string) ([]byte, unmarshalledPatch) {
Expand Down
Loading