Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 47 additions & 10 deletions internal/kgateway/extensions2/plugins/backendconfigpolicy/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/kgateway-dev/kgateway/v2/api/v1alpha1"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/krtcollections"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/utils"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
"github.com/kgateway-dev/kgateway/v2/pkg/logging"
sdk "github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/collections"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/ir"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/reporter"
pluginsdkutils "github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/utils"
"github.com/kgateway-dev/kgateway/v2/pkg/reports"
"github.com/kgateway-dev/kgateway/v2/pkg/utils/cmputils"
"github.com/kgateway-dev/kgateway/v2/pkg/validator"
)
Expand Down Expand Up @@ -113,16 +116,27 @@ func NewPlugin(ctx context.Context, commoncol *collections.CommonCollections, v
kclient.Filter{ObjectFilter: commoncol.Client.ObjectFilter()},
)
col := krt.WrapClient(cli, commoncol.KrtOpts.ToOptions("BackendConfigPolicy")...)
backendConfigPolicyCol := krt.NewCollection(col, func(krtctx krt.HandlerContext, b *v1alpha1.BackendConfigPolicy) *ir.PolicyWrapper {
gk := wellknown.BackendConfigPolicyGVK.GroupKind()

policyStatusMarker, backendConfigPolicyCol := krt.NewStatusCollection(col, func(krtctx krt.HandlerContext, b *v1alpha1.BackendConfigPolicy) (*krtcollections.StatusMarker, *ir.PolicyWrapper) {
policyIR, errs := translate(commoncol, krtctx, b)
if err := validateXDS(ctx, policyIR, v, commoncol.Settings.ValidationMode); err != nil {
errs = append(errs, err)
}

return &ir.PolicyWrapper{
// Create status marker if existing status has kgateway controller
var statusMarker *krtcollections.StatusMarker
for _, ancestor := range b.Status.Ancestors {
if string(ancestor.ControllerName) == commoncol.ControllerName {
statusMarker = &krtcollections.StatusMarker{}
break
}
}

pol := &ir.PolicyWrapper{
ObjectSource: ir.ObjectSource{
Group: wellknown.BackendConfigPolicyGVK.Group,
Kind: wellknown.BackendConfigPolicyGVK.Kind,
Group: gk.Group,
Kind: gk.Kind,
Namespace: b.Namespace,
Name: b.Name,
},
Expand All @@ -131,15 +145,38 @@ func NewPlugin(ctx context.Context, commoncol *collections.CommonCollections, v
TargetRefs: pluginsdkutils.TargetRefsToPolicyRefs(b.Spec.TargetRefs, b.Spec.TargetSelectors),
Errors: errs,
}
}, commoncol.KrtOpts.ToOptions("BackendConfigPolicyIRs")...)

return statusMarker, pol
})

// processMarkers for policies that have existing status but no current report
processMarkers := func(kctx krt.HandlerContext, reportMap *reports.ReportMap) {
objStatus := krt.Fetch(kctx, policyStatusMarker)
for _, status := range objStatus {
policyKey := reporter.PolicyKey{
Group: gk.Group,
Kind: gk.Kind,
Namespace: status.Obj.GetNamespace(),
Name: status.Obj.GetName(),
}

// Add empty status to clear stale status for policies with no valid targets
if reportMap.Policies[policyKey] == nil {
rp := reports.NewReporter(reportMap)
// create empty policy report entry with no ancestor refs
rp.Policy(policyKey, 0)
}
}
}
return sdk.Plugin{
ContributesPolicies: map[schema.GroupKind]sdk.PolicyPlugin{
wellknown.BackendConfigPolicyGVK.GroupKind(): {
Name: "BackendConfigPolicy",
Policies: backendConfigPolicyCol,
ProcessBackend: processBackend,
GetPolicyStatus: getPolicyStatusFn(cli),
PatchPolicyStatus: patchPolicyStatusFn(cli),
Name: "BackendConfigPolicy",
Policies: backendConfigPolicyCol,
ProcessPolicyStaleStatusMarkers: processMarkers,
ProcessBackend: processBackend,
GetPolicyStatus: getPolicyStatusFn(cli),
PatchPolicyStatus: patchPolicyStatusFn(cli),
},
},
}
Expand Down
51 changes: 43 additions & 8 deletions internal/kgateway/extensions2/plugins/backendtlspolicy/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ import (
gwv1 "sigs.k8s.io/gateway-api/apis/v1"

eiutils "github.com/kgateway-dev/kgateway/v2/internal/envoyinit/pkg/utils"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/krtcollections"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/utils"
kgwellknown "github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
sdk "github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/collections"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/ir"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/reporter"
pluginutils "github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/utils"
"github.com/kgateway-dev/kgateway/v2/pkg/reports"
)

var (
Expand Down Expand Up @@ -73,8 +76,19 @@ func NewPlugin(ctx context.Context, commoncol *collections.CommonCollections) sd
col := krt.WrapClient(cli, commoncol.KrtOpts.ToOptions("BackendTLSPolicy")...)

translate := buildTranslateFunc(commoncol.ConfigMaps)
tlsPolicyCol := krt.NewCollection(col, func(krtctx krt.HandlerContext, i *gwv1.BackendTLSPolicy) *ir.PolicyWrapper {

policyStatusMarker, tlsPolicyCol := krt.NewStatusCollection(col, func(krtctx krt.HandlerContext, i *gwv1.BackendTLSPolicy) (*krtcollections.StatusMarker, *ir.PolicyWrapper) {
tlsPolicyIR, err := translate(krtctx, i)

// Create status marker if existing status has kgateway controller
var statusMarker *krtcollections.StatusMarker
for _, ancestor := range i.Status.Ancestors {
if string(ancestor.ControllerName) == kgwellknown.DefaultGatewayControllerName {
statusMarker = &krtcollections.StatusMarker{}
break
}
}

pol := &ir.PolicyWrapper{
ObjectSource: ir.ObjectSource{
Group: backendTlsPolicyGroupKind.Group,
Expand All @@ -89,17 +103,38 @@ func NewPlugin(ctx context.Context, commoncol *collections.CommonCollections) sd
if err != nil {
pol.Errors = []error{err}
}
return pol
}, commoncol.KrtOpts.ToOptions("BackendTLSPolicyIRs")...)
return statusMarker, pol
})

// processMarkers for policies that have existing status but no current report
processMarkers := func(kctx krt.HandlerContext, reportMap *reports.ReportMap) {
objStatus := krt.Fetch(kctx, policyStatusMarker)
for _, status := range objStatus {
policyKey := reporter.PolicyKey{
Group: backendTlsPolicyGroupKind.Group,
Kind: backendTlsPolicyGroupKind.Kind,
Namespace: status.Obj.GetNamespace(),
Name: status.Obj.GetName(),
}

// Add empty status to clear stale status for policies with no valid targets
if reportMap.Policies[policyKey] == nil {
rp := reports.NewReporter(reportMap)
// create empty policy report entry with no ancestor refs
rp.Policy(policyKey, 0)
}
}
}

return sdk.Plugin{
ContributesPolicies: map[schema.GroupKind]sdk.PolicyPlugin{
backendTlsPolicyGroupKind.GroupKind(): {
Name: "BackendTLSPolicy",
Policies: tlsPolicyCol,
ProcessBackend: processBackend,
GetPolicyStatus: getPolicyStatusFn(cli),
PatchPolicyStatus: patchPolicyStatusFn(cli),
Name: "BackendTLSPolicy",
Policies: tlsPolicyCol,
ProcessPolicyStaleStatusMarkers: processMarkers,
ProcessBackend: processBackend,
GetPolicyStatus: getPolicyStatusFn(cli),
PatchPolicyStatus: patchPolicyStatusFn(cli),
},
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/utils/ptr"

"github.com/kgateway-dev/kgateway/v2/api/v1alpha1"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/krtcollections"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/utils"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
"github.com/kgateway-dev/kgateway/v2/pkg/logging"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/policy"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/reporter"
pluginsdkutils "github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/utils"
"github.com/kgateway-dev/kgateway/v2/pkg/reports"
"github.com/kgateway-dev/kgateway/v2/pkg/utils/cmputils"
)

Expand Down Expand Up @@ -175,7 +177,8 @@ func NewPlugin(ctx context.Context, commoncol *collections.CommonCollections) sd
)
col := krt.WrapClient(cli, commoncol.KrtOpts.ToOptions("HTTPListenerPolicy")...)
gk := wellknown.HTTPListenerPolicyGVK.GroupKind()
policyCol := krt.NewCollection(col, func(krtctx krt.HandlerContext, i *v1alpha1.HTTPListenerPolicy) *ir.PolicyWrapper {

policyStatusMarker, policyCol := krt.NewStatusCollection(col, func(krtctx krt.HandlerContext, i *v1alpha1.HTTPListenerPolicy) (*krtcollections.StatusMarker, *ir.PolicyWrapper) {
objSrc := ir.ObjectSource{
Group: gk.Group,
Kind: gk.Kind,
Expand Down Expand Up @@ -218,6 +221,15 @@ func NewPlugin(ctx context.Context, commoncol *collections.CommonCollections) sd
xffNumTrustedHops = ptr.To(uint32(*i.Spec.XffNumTrustedHops)) // nolint:gosec // G115: kubebuilder validation ensures safe for uint32
}

// Create status marker if existing status has kgateway controller
var statusMarker *krtcollections.StatusMarker
for _, ancestor := range i.Status.Ancestors {
if string(ancestor.ControllerName) == commoncol.ControllerName {
statusMarker = &krtcollections.StatusMarker{}
break
}
}

pol := &ir.PolicyWrapper{
ObjectSource: objSrc,
Policy: i,
Expand All @@ -242,16 +254,37 @@ func NewPlugin(ctx context.Context, commoncol *collections.CommonCollections) sd
Errors: errs,
}

return pol
return statusMarker, pol
})

// processMarkers for policies that have existing status but no current report
processMarkers := func(kctx krt.HandlerContext, reportMap *reports.ReportMap) {
objStatus := krt.Fetch(kctx, policyStatusMarker)
for _, status := range objStatus {
policyKey := reporter.PolicyKey{
Group: gk.Group,
Kind: gk.Kind,
Namespace: status.Obj.GetNamespace(),
Name: status.Obj.GetName(),
}

// Add empty status to clear stale status for policies with no valid targets
if reportMap.Policies[policyKey] == nil {
rp := reports.NewReporter(reportMap)
// create empty policy report entry with no ancestor refs
rp.Policy(policyKey, 0)
}
}
}

return sdk.Plugin{
ContributesPolicies: map[schema.GroupKind]sdk.PolicyPlugin{
wellknown.HTTPListenerPolicyGVK.GroupKind(): {
NewGatewayTranslationPass: NewGatewayTranslationPass,
Policies: policyCol,
GetPolicyStatus: getPolicyStatusFn(cli),
PatchPolicyStatus: patchPolicyStatusFn(cli),
NewGatewayTranslationPass: NewGatewayTranslationPass,
Policies: policyCol,
ProcessPolicyStaleStatusMarkers: processMarkers,
GetPolicyStatus: getPolicyStatusFn(cli),
PatchPolicyStatus: patchPolicyStatusFn(cli),
MergePolicies: func(pols []ir.PolicyAtt) ir.PolicyAtt {
return policy.MergePolicies(pols, mergePolicies, "" /*no merge settings*/)
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

apiannotations "github.com/kgateway-dev/kgateway/v2/api/annotations"
"github.com/kgateway-dev/kgateway/v2/api/v1alpha1"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/krtcollections"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/utils"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
"github.com/kgateway-dev/kgateway/v2/pkg/logging"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/policy"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/reporter"
pluginsdkutils "github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/utils"
"github.com/kgateway-dev/kgateway/v2/pkg/reports"
"github.com/kgateway-dev/kgateway/v2/pkg/validator"
)

Expand Down Expand Up @@ -216,7 +218,7 @@ func NewPlugin(ctx context.Context, commoncol *collections.CommonCollections, me
constructor := NewTrafficPolicyConstructor(ctx, commoncol)

// TrafficPolicy IR will have TypedConfig -> implement backendroute method to add prompt guard, etc.
policyCol := krt.NewCollection(col, func(krtctx krt.HandlerContext, policyCR *v1alpha1.TrafficPolicy) *ir.PolicyWrapper {
statusCol, policyCol := krt.NewStatusCollection(col, func(krtctx krt.HandlerContext, policyCR *v1alpha1.TrafficPolicy) (*krtcollections.StatusMarker, *ir.PolicyWrapper) {
objSrc := ir.ObjectSource{
Group: gk.Group,
Kind: gk.Kind,
Expand All @@ -234,6 +236,14 @@ func NewPlugin(ctx context.Context, commoncol *collections.CommonCollections, me
errors = append(errors, err)
}

var statusMarker *krtcollections.StatusMarker
for _, ancestor := range policyCR.Status.Ancestors {
if string(ancestor.ControllerName) == commoncol.ControllerName {
statusMarker = &krtcollections.StatusMarker{}
break
}
}

pol := &ir.PolicyWrapper{
ObjectSource: objSrc,
Policy: policyCR,
Expand All @@ -242,14 +252,35 @@ func NewPlugin(ctx context.Context, commoncol *collections.CommonCollections, me
Errors: errors,
PrecedenceWeight: precedenceWeight,
}
return pol
return statusMarker, pol
})

// processMarkers for policies that have existing status but no current report
processMarkers := func(kctx krt.HandlerContext, reportMap *reports.ReportMap) {
objStatus := krt.Fetch(kctx, statusCol)
for _, status := range objStatus {
policyKey := reporter.PolicyKey{
Group: gk.Group,
Kind: gk.Kind,
Namespace: status.Obj.GetNamespace(),
Name: status.Obj.GetName(),
}

// Add empty status to clear stale status for policies with no valid targets
if reportMap.Policies[policyKey] == nil {
rp := reports.NewReporter(reportMap)
// create empty policy report entry with no ancestor refs
rp.Policy(policyKey, 0)
}
}
}

return sdk.Plugin{
ContributesPolicies: map[schema.GroupKind]sdk.PolicyPlugin{
wellknown.TrafficPolicyGVK.GroupKind(): {
NewGatewayTranslationPass: NewGatewayTranslationPass,
Policies: policyCol,
NewGatewayTranslationPass: NewGatewayTranslationPass,
Policies: policyCol,
ProcessPolicyStaleStatusMarkers: processMarkers,
MergePolicies: func(pols []ir.PolicyAtt) ir.PolicyAtt {
return policy.MergePolicies(pols, mergeTrafficPolicies, mergeSettings)
},
Expand Down
13 changes: 13 additions & 0 deletions internal/kgateway/proxy_syncer/proxy_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,13 @@ func (s *ProxySyncer) Init(ctx context.Context, krtopts krtutil.KrtOptions) {
s.backendPolicyReport = krt.NewSingleton(func(kctx krt.HandlerContext) *report {
backends := krt.Fetch(kctx, finalBackendsWithPolicyStatus)
merged := GenerateBackendPolicyReport(backends)

for _, plugin := range s.plugins.ContributesPolicies {
if plugin.ProcessPolicyStaleStatusMarkers != nil && plugin.ProcessBackend != nil {
plugin.ProcessPolicyStaleStatusMarkers(kctx, &merged)
}
}

return &report{merged}
}, krtopts.ToOptions("BackendsPolicyReport")...)

Expand All @@ -271,6 +278,12 @@ func (s *ProxySyncer) Init(ctx context.Context, krtopts krtutil.KrtOptions) {
objStatus := krt.Fetch(kctx, s.commonCols.Routes.GetHTTPRouteStatusMarkers())
s.commonCols.Routes.ProcessHTTPRouteStatusMarkers(objStatus, merged)

for _, plugin := range s.plugins.ContributesPolicies {
if plugin.ProcessPolicyStaleStatusMarkers != nil && plugin.ProcessBackend == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this done to exclude backend policy plugins?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, backend policy and traffic policies uses different status reporting queue

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a small concern with this approach, if we have plugins that contribute both backends and policy will we potentially lose this functionality?

Wondering how we can make this more explicit.

One option would be to have an explicit processBackendPolicyMarkers vs. processPolicyMarkers.

What do you think? Is that overkill?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the functionality will be fine as long as we make sure to report / generate its status in either backend or general status report, and given its gonna contribute to backend most likely it will be reported as a part of backend status reporting, which is what I have here.

We might need to have explicit processBackendPolicyMarkers vs processPolicyMarkers and detailed if check in that case, but right now I didn't want to add more fields into the plugin unless unnecessary, so I went with a check here. Not entirely a fan of checking the ProcessBackend either so if you think have separate is better we can do that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with your assessment.

Given it's not likely we'll hit this specific scenario anytime soon, I think we can move forward with this as-is.

plugin.ProcessPolicyStaleStatusMarkers(kctx, &merged)
}
}

return &report{merged}
})

Expand Down
7 changes: 5 additions & 2 deletions pkg/pluginsdk/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/endpoints"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/ir"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/reporter"
"github.com/kgateway-dev/kgateway/v2/pkg/reports"
)

// ErrNotFound is returned when a requested resource is not found
Expand Down Expand Up @@ -64,8 +65,10 @@ type PolicyPlugin struct {
// Backend processing for agent gateway
ProcessAgentBackend func(pol ir.PolicyIR, in ir.BackendObjectIR) error

Policies krt.Collection[ir.PolicyWrapper]
GlobalPolicies func(krt.HandlerContext) ir.PolicyIR
Policies krt.Collection[ir.PolicyWrapper]
// ProcessPolicyStaleStatusMarkers add empty reports for policies to clear stale status
ProcessPolicyStaleStatusMarkers func(krt.HandlerContext, *reports.ReportMap)
GlobalPolicies func(krt.HandlerContext) ir.PolicyIR
// PoliciesFetch can optionally be set if the plugin needs a custom mechanism for fetching the policy IR,
// rather than the default behavior of fetching by name from the aggregated policy KRT collection
PoliciesFetch func(n, ns string) ir.PolicyIR
Expand Down
2 changes: 1 addition & 1 deletion pkg/reports/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (r *ReportMap) BuildPolicyStatus(
}

ancestorRefs := report.ancestorRefs()
status := gwv1.PolicyStatus{}
status := gwv1.PolicyStatus{Ancestors: make([]gwv1.PolicyAncestorStatus, 0, len(ancestorRefs))}

// Process the parent references to build the RouteParentStatus
for _, ancestorRef := range ancestorRefs {
Expand Down
Loading