Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/.env/nightly-tests/max_versions.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ kind_version='v0.27.0'
helm_version='v3.17.1'
argocd_version='v2.14.2'
istio_version='1.23.0'
kgateway_api_version='v1.2.0'
kgateway_api_version='v1.3.0'
2 changes: 1 addition & 1 deletion .github/workflows/pr-kubernetes-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
# May 19, 2025: ~19 minutes
- cluster-name: 'cluster-one'
go-test-args: '-v -timeout=25m'
go-test-run-regex: '^TestKgateway$$/^BasicRouting$$|^TestKgateway$$/^HTTPRouteServices$$|^TestKgateway$$/^TLSRouteServices$$|^TestKgateway$$/^GRPCRouteServices$$'
go-test-run-regex: '^TestKgateway$$/^BasicRouting$$|^TestKgateway$$/^HTTPRouteServices$$|^TestKgateway$$/^TLSRouteServices$$|^TestKgateway$$/^GRPCRouteServices$$|^TestListenerSet$$'
localstack: 'false'
# May 19, 2025: ~25 minutes
- cluster-name: 'cluster-two'
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pr-unit-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
# TODO(tim): rename this job or consolidate with the other workflows.
name: projects/gateway2
runs-on: ubuntu-22.04
timeout-minutes: 15
timeout-minutes: 25
steps:
- uses: actions/checkout@v4
- name: Setup Go
Expand Down
2 changes: 2 additions & 0 deletions api/v1alpha1/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package v1alpha1

// Gateway API resources with status management
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gatewayclasses;gateways;httproutes;grpcroutes;tcproutes;tlsroutes;referencegrants;backendtlspolicies,verbs=get;list;watch
// +kubebuilder:rbac:groups=gateway.networking.x-k8s.io,resources=xlistenersets,verbs=get;list;watch
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gatewayclasses/status;gateways/status;httproutes/status;grpcroutes/status;tcproutes/status;tlsroutes/status;backendtlspolicies/status,verbs=patch;update
// +kubebuilder:rbac:groups=gateway.networking.x-k8s.io,resources=xlistenersets/status,verbs=patch;update
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gatewayclasses,verbs=create

// Controller resources
Expand Down
15 changes: 15 additions & 0 deletions install/helm/kgateway/templates/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,21 @@ rules:
- get
- list
- watch
- apiGroups:
- gateway.networking.x-k8s.io
resources:
- xlistenersets
verbs:
- get
- list
- watch
- apiGroups:
- gateway.networking.x-k8s.io
resources:
- xlistenersets/status
verbs:
- patch
- update
- apiGroups:
- networking.istio.io
resources:
Expand Down
47 changes: 41 additions & 6 deletions internal/kgateway/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"fmt"

"istio.io/istio/pkg/kube/krt"
"istio.io/istio/pkg/kube/kubetypes"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -19,12 +21,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
infextv1a2 "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
apiv1 "sigs.k8s.io/gateway-api/apis/v1"

"github.com/kgateway-dev/kgateway/v2/api/v1alpha1"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/deployer"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
common "github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/collections"
)

const (
Expand Down Expand Up @@ -68,6 +73,8 @@ type GatewayConfig struct {
ClassInfo map[string]*ClassInfo
// DiscoveryNamespaceFilter filters namespaced objects based on the discovery namespace filter.
DiscoveryNamespaceFilter kubetypes.DynamicObjectFilter
// CommonCollections is used to fetch ir.Gateways so that the deployer can generate the ports for the proxy service.
CommonCollections *common.CommonCollections
}

func NewBaseGatewayController(ctx context.Context, cfg GatewayConfig) error {
Expand All @@ -77,8 +84,9 @@ func NewBaseGatewayController(ctx context.Context, cfg GatewayConfig) error {
controllerBuilder := &controllerBuilder{
cfg: cfg,
reconciler: &controllerReconciler{
cli: cfg.Mgr.GetClient(),
scheme: cfg.Mgr.GetScheme(),
cli: cfg.Mgr.GetClient(),
scheme: cfg.Mgr.GetScheme(),
customEvents: make(chan event.TypedGenericEvent[ir.Gateway], 1024),
},
}

Expand All @@ -105,8 +113,9 @@ func NewBaseInferencePoolController(ctx context.Context, poolCfg *InferencePoolC
cfg: *gwCfg,
poolCfg: poolCfg,
reconciler: &controllerReconciler{
cli: poolCfg.Mgr.GetClient(),
scheme: poolCfg.Mgr.GetScheme(),
cli: poolCfg.Mgr.GetClient(),
scheme: poolCfg.Mgr.GetScheme(),
customEvents: make(chan event.TypedGenericEvent[ir.Gateway], 1024),
},
}

Expand Down Expand Up @@ -173,6 +182,7 @@ func (c *controllerBuilder) watchGw(ctx context.Context) error {
IstioAutoMtlsEnabled: c.cfg.IstioAutoMtlsEnabled,
ControlPlane: c.cfg.ControlPlane,
ImageInfo: c.cfg.ImageInfo,
CommonCollections: c.cfg.CommonCollections,
})
if err != nil {
return err
Expand Down Expand Up @@ -224,6 +234,7 @@ func (c *controllerBuilder) watchGw(ctx context.Context) error {
}),
builder.WithPredicates(discoveryNamespaceFilterPredicate),
)

// watch for gatewayclasses managed by our controller and enqueue related gateways
buildr.Watches(
&apiv1.GatewayClass{},
Expand Down Expand Up @@ -260,6 +271,28 @@ func (c *controllerBuilder) watchGw(ctx context.Context) error {
),
)

// Trigger an event when the gateway changes. This can even be a change in listener sets attached to the gateway
c.cfg.CommonCollections.GatewayIndex.Gateways.Register(func(o krt.Event[ir.Gateway]) {
gw := o.Latest()
c.reconciler.customEvents <- event.TypedGenericEvent[ir.Gateway]{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using a select with a default clause, so that the send won't block if the channel is full.
I think the channel would then only need a capacity of 1.

Copy link
Contributor Author

@davidjumani davidjumani May 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would that lead to missed events when a gateway is modified?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KRT handles a slow/blocking callback internally; it won't cause other Register callbacks or dependent collections to get stuck.
While we could do something like go func() { events <- ev }() here, I think the current code is fine.

Object: gw,
}
})
buildr.WatchesRawSource(
// Add channel source for custom events
source.Channel(
c.reconciler.customEvents,
handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, obj ir.Gateway) []reconcile.Request {
// Convert the generic event to a reconcile request
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{Namespace: obj.Namespace, Name: obj.Name},
},
}
}),
),
)

for _, gvk := range gvks {
obj, err := c.cfg.Mgr.GetScheme().New(gvk)
if err != nil {
Expand Down Expand Up @@ -379,6 +412,7 @@ func (c *controllerBuilder) watchInferencePool(ctx context.Context) error {
ControllerName: c.cfg.ControllerName,
ImageInfo: c.cfg.ImageInfo,
InferenceExtension: c.poolCfg.InferenceExt,
CommonCollections: c.cfg.CommonCollections,
})
if err != nil {
return err
Expand Down Expand Up @@ -440,8 +474,9 @@ func (c *controllerBuilder) watchGwClass(_ context.Context) error {
}

// controllerReconciler bundles the client, the scheme, and the custom event
// channel used by the controllers constructed in this file
// (NewBaseGatewayController and NewBaseInferencePoolController both build one).
type controllerReconciler struct {
cli client.Client
scheme *runtime.Scheme
cli client.Client
scheme *runtime.Scheme
// customEvents receives a generic event for each ir.Gateway event reported by
// the Gateways collection callback registered in watchGw. It is consumed via
// source.Channel, which maps each event to a reconcile request keyed by the
// gateway's namespace and name.
customEvents chan event.TypedGenericEvent[ir.Gateway]
}

func (r *controllerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down
47 changes: 45 additions & 2 deletions internal/kgateway/controller/controller_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"strings"
"testing"

"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"istio.io/istio/pkg/kube"
istiosets "istio.io/istio/pkg/util/sets"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -31,6 +33,14 @@ import (

"github.com/kgateway-dev/kgateway/v2/internal/kgateway/controller"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/deployer"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/extensions2/registry"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/extensions2/settings"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/krtcollections"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/setup"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/utils/krtutil"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
"github.com/kgateway-dev/kgateway/v2/pkg/client/clientset/versioned"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/collections"
"github.com/kgateway-dev/kgateway/v2/pkg/schemes"
)

Expand Down Expand Up @@ -187,6 +197,8 @@ func createManager(
return nil, err
}

ctx, cancel := context.WithCancel(parentCtx)
kubeClient, _ := setup.CreateKubeClient(cfg)
gwCfg := controller.GatewayConfig{
Mgr: mgr,
ControllerName: gatewayControllerName,
Expand All @@ -196,8 +208,10 @@ func createManager(
Tag: "latest",
},
DiscoveryNamespaceFilter: fakeDiscoveryNamespaceFilter{},
CommonCollections: newCommonCols(ctx, kubeClient),
}
if err := controller.NewBaseGatewayController(parentCtx, gwCfg); err != nil {
cancel()
return nil, err
}

Expand All @@ -213,6 +227,7 @@ func createManager(
}

if err := controller.NewGatewayClassProvisioner(mgr, gatewayControllerName, classConfigs); err != nil {
cancel()
return nil, err
}

Expand All @@ -222,16 +237,44 @@ func createManager(
InferenceExt: inferenceExt,
}
if err := controller.NewBaseInferencePoolController(parentCtx, poolCfg, &gwCfg); err != nil {
cancel()
return nil, err
}

ctx, cancel := context.WithCancel(parentCtx)
go func() {
defer GinkgoRecover()
kubeconfig = generateKubeConfiguration(cfg)
mgr.GetLogger().Info("starting manager", "kubeconfig", kubeconfig)
Expect(mgr.Start(ctx)).ToNot(HaveOccurred())
}()

return cancel, nil
return func() {
cancel()
kubeClient.Shutdown()
}, nil
}

// newCommonCols builds the CommonCollections passed to the gateway controller
// in the test manager.
//
// It creates the versioned clientset from cfg, loads settings via
// settings.BuildSettings, constructs the common collections, initializes them
// with the merged registry plugins plus the builtin plugin, and finally calls
// kubeClient.RunAndWait with ctx.Done() before returning.
//
// Any error fails the current spec through Gomega's Expect.
//
// NOTE(review): cfg is not declared in this function; presumably it is the
// suite's package-level REST config — confirm against the suite setup.
func newCommonCols(ctx context.Context, kubeClient kube.Client) *collections.CommonCollections {
	krtopts := krtutil.NewKrtOptions(ctx.Done(), nil)

	// Expect already passes on a nil error, so no surrounding nil check is needed.
	cli, err := versioned.NewForConfig(cfg)
	Expect(err).ToNot(HaveOccurred())

	// Named globalSettings so the local does not shadow the imported settings package.
	globalSettings, err := settings.BuildSettings()
	Expect(err).ToNot(HaveOccurred())

	commoncol, err := collections.NewCommonCollections(ctx, krtopts, kubeClient, cli, nil, wellknown.GatewayControllerName, logr.Discard(), *globalSettings)
	Expect(err).ToNot(HaveOccurred())

	// Merge the registry-provided plugins with the builtin plugin before init.
	plugins := registry.Plugins(ctx, commoncol)
	plugins = append(plugins, krtcollections.NewBuiltinPlugin(ctx))
	extensions := registry.MergePlugins(plugins...)

	commoncol.InitPlugins(ctx, extensions)
	kubeClient.RunAndWait(ctx.Done())
	return commoncol
}
5 changes: 5 additions & 0 deletions internal/kgateway/controller/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
gwv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwv1a3 "sigs.k8s.io/gateway-api/apis/v1alpha3"
gwv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1"
gwxv1a1 "sigs.k8s.io/gateway-api/apisx/v1alpha1"

kgwv1a1 "github.com/kgateway-dev/kgateway/v2/api/v1alpha1"
)
Expand All @@ -24,6 +25,7 @@ var SchemeBuilder = runtime.SchemeBuilder{
gwv1a2.Install,
gwv1a3.Install,
gwv1b1.Install,
gwxv1a1.Install,

// Kubernetes Core resources
corev1.AddToScheme,
Expand Down Expand Up @@ -60,5 +62,8 @@ func GatewayScheme() *runtime.Scheme {
if err := gwv1b1.Install(s); err != nil {
panic(fmt.Sprintf("Failed to install gateway v1beta1 scheme: %v", err))
}
if err := gwxv1a1.Install(s); err != nil {
panic(fmt.Sprintf("Failed to install gateway experimental v1alpha1 scheme: %v", err))
}
return s
}
3 changes: 3 additions & 0 deletions internal/kgateway/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type ControllerBuilder struct {
proxySyncer *proxy_syncer.ProxySyncer
cfg StartConfig
mgr ctrl.Manager
commoncol *common.CommonCollections

ready atomic.Bool
}
Expand Down Expand Up @@ -219,6 +220,7 @@ func NewControllerBuilder(ctx context.Context, cfg StartConfig) (*ControllerBuil
proxySyncer: proxySyncer,
cfg: cfg,
mgr: mgr,
commoncol: commoncol,
}

// wait for the ControllerBuilder to Start
Expand Down Expand Up @@ -279,6 +281,7 @@ func (c *ControllerBuilder) Start(ctx context.Context) error {
PullPolicy: globalSettings.DefaultImagePullPolicy,
},
DiscoveryNamespaceFilter: c.cfg.Client.ObjectFilter(),
CommonCollections: c.commoncol,
}

setupLog.Info("creating gateway class provisioner")
Expand Down
13 changes: 12 additions & 1 deletion internal/kgateway/deployer/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ import (

"github.com/kgateway-dev/kgateway/v2/api/v1alpha1"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/helm"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
"github.com/kgateway-dev/kgateway/v2/internal/version"
common "github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/collections"
)

var (
Expand Down Expand Up @@ -71,6 +73,7 @@ type Inputs struct {
ControlPlane ControlPlaneInfo
InferenceExtension *InferenceExtInfo
ImageInfo *ImageInfo
CommonCollections *common.CommonCollections
}

type ImageInfo struct {
Expand Down Expand Up @@ -302,13 +305,21 @@ func (d *Deployer) getGatewayClassFromGateway(ctx context.Context, gw *api.Gatew
}

func (d *Deployer) getValues(gw *api.Gateway, gwParam *v1alpha1.GatewayParameters) (*helmConfig, error) {
gwKey := ir.ObjectSource{
Group: wellknown.GatewayGVK.GroupKind().Group,
Kind: wellknown.GatewayGVK.GroupKind().Kind,
Name: gw.GetName(),
Namespace: gw.GetNamespace(),
}
irGW := d.inputs.CommonCollections.GatewayIndex.Gateways.GetKey(gwKey.ResourceName())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be possible for the krt lookup to miss, leaving us with only the api.Gateway here, if we hit this path before translation finishes; we're guaranteed to eventually get it, though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should work because the controller should start only after krt is warm, IIRC.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the WaitUntilSynced above in an attempt to fix that


// construct the default values
vals := &helmConfig{
Gateway: &helmGateway{
Name: &gw.Name,
GatewayName: &gw.Name,
GatewayNamespace: &gw.Namespace,
Ports: getPortsValues(gw, gwParam),
Ports: getPortsValues(irGW, gwParam),
Xds: &helmXds{
// The xds host/port MUST map to the Service definition for the Control Plane
// This is the socket address that the Proxy will connect to on startup, to receive xds updates
Expand Down
Loading
Loading