Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/.env/nightly-tests/max_versions.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ kind_version='v0.27.0'
helm_version='v3.17.1'
argocd_version='v2.14.2'
istio_version='1.23.0'
kgateway_api_version='v1.2.0'
kgateway_api_version='v1.3.0'
2 changes: 1 addition & 1 deletion .github/workflows/pr-kubernetes-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
# May 19, 2025: ~19 minutes
- cluster-name: 'cluster-one'
go-test-args: '-v -timeout=25m'
go-test-run-regex: '^TestKgateway$$/^BasicRouting$$|^TestKgateway$$/^HTTPRouteServices$$|^TestKgateway$$/^TLSRouteServices$$|^TestKgateway$$/^GRPCRouteServices$$'
go-test-run-regex: '^TestKgateway$$/^BasicRouting$$|^TestKgateway$$/^HTTPRouteServices$$|^TestKgateway$$/^TLSRouteServices$$|^TestKgateway$$/^GRPCRouteServices$$|^TestListenerSet$$'
localstack: 'false'
# May 19, 2025: ~25 minutes
- cluster-name: 'cluster-two'
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pr-unit-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
# TODO(tim): rename this job or consolidate with the other workflows.
name: projects/gateway2
runs-on: ubuntu-22.04
timeout-minutes: 15
timeout-minutes: 25
steps:
- uses: actions/checkout@v4
- name: Setup Go
Expand Down
2 changes: 2 additions & 0 deletions api/v1alpha1/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package v1alpha1

// Gateway API resources with status management
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gatewayclasses;gateways;httproutes;grpcroutes;tcproutes;tlsroutes;referencegrants;backendtlspolicies,verbs=get;list;watch
// +kubebuilder:rbac:groups=gateway.networking.x-k8s.io,resources=xlistenersets,verbs=get;list;watch
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gatewayclasses/status;gateways/status;httproutes/status;grpcroutes/status;tcproutes/status;tlsroutes/status;backendtlspolicies/status,verbs=patch;update
// +kubebuilder:rbac:groups=gateway.networking.x-k8s.io,resources=xlistenersets/status,verbs=patch;update
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gatewayclasses,verbs=create

// Controller resources
Expand Down
15 changes: 15 additions & 0 deletions install/helm/kgateway/templates/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,21 @@ rules:
- get
- list
- watch
- apiGroups:
- gateway.networking.x-k8s.io
resources:
- xlistenersets
verbs:
- get
- list
- watch
- apiGroups:
- gateway.networking.x-k8s.io
resources:
- xlistenersets/status
verbs:
- patch
- update
- apiGroups:
- networking.istio.io
resources:
Expand Down
49 changes: 43 additions & 6 deletions internal/kgateway/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"fmt"

"istio.io/istio/pkg/kube/krt"
"istio.io/istio/pkg/kube/kubetypes"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -19,12 +21,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
infextv1a2 "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
apiv1 "sigs.k8s.io/gateway-api/apis/v1"

"github.com/kgateway-dev/kgateway/v2/api/v1alpha1"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/deployer"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
common "github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/collections"
)

const (
Expand Down Expand Up @@ -68,6 +73,8 @@ type GatewayConfig struct {
ClassInfo map[string]*ClassInfo
// DiscoveryNamespaceFilter filters namespaced objects based on the discovery namespace filter.
DiscoveryNamespaceFilter kubetypes.DynamicObjectFilter
// CommonCollections used to fetch ir.Gateways for the deployer to generate the ports for the proxy service
CommonCollections *common.CommonCollections
}

func NewBaseGatewayController(ctx context.Context, cfg GatewayConfig) error {
Expand All @@ -77,8 +84,9 @@ func NewBaseGatewayController(ctx context.Context, cfg GatewayConfig) error {
controllerBuilder := &controllerBuilder{
cfg: cfg,
reconciler: &controllerReconciler{
cli: cfg.Mgr.GetClient(),
scheme: cfg.Mgr.GetScheme(),
cli: cfg.Mgr.GetClient(),
scheme: cfg.Mgr.GetScheme(),
customEvents: make(chan event.TypedGenericEvent[ir.Gateway], 1024),
},
}

Expand All @@ -105,8 +113,9 @@ func NewBaseInferencePoolController(ctx context.Context, poolCfg *InferencePoolC
cfg: *gwCfg,
poolCfg: poolCfg,
reconciler: &controllerReconciler{
cli: poolCfg.Mgr.GetClient(),
scheme: poolCfg.Mgr.GetScheme(),
cli: poolCfg.Mgr.GetClient(),
scheme: poolCfg.Mgr.GetScheme(),
customEvents: make(chan event.TypedGenericEvent[ir.Gateway], 1024),
},
}

Expand Down Expand Up @@ -173,6 +182,7 @@ func (c *controllerBuilder) watchGw(ctx context.Context) error {
IstioAutoMtlsEnabled: c.cfg.IstioAutoMtlsEnabled,
ControlPlane: c.cfg.ControlPlane,
ImageInfo: c.cfg.ImageInfo,
CommonCollections: c.cfg.CommonCollections,
})
if err != nil {
return err
Expand Down Expand Up @@ -224,6 +234,7 @@ func (c *controllerBuilder) watchGw(ctx context.Context) error {
}),
builder.WithPredicates(discoveryNamespaceFilterPredicate),
)

// watch for gatewayclasses managed by our controller and enqueue related gateways
buildr.Watches(
&apiv1.GatewayClass{},
Expand Down Expand Up @@ -260,6 +271,30 @@ func (c *controllerBuilder) watchGw(ctx context.Context) error {
),
)

// Trigger an event when the gateway changes. This can even be a change in listener sets attached to the gateway
go func() {
Copy link
Contributor Author

@davidjumani davidjumani May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be in a goroutine ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, will create a goroutine under the hood

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we need to call Register only after krt has started. @stevenctl do you know if that issue was fixed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, it's letting me respond here. For posterity: I believe the only issue is if we need the initial state, and luckily in this case we do not.

c.cfg.CommonCollections.GatewayIndex.Gateways.Register(func(o krt.Event[ir.Gateway]) {
gw := o.Latest()
c.reconciler.customEvents <- event.TypedGenericEvent[ir.Gateway]{
Object: gw,
}
})
}()
buildr.WatchesRawSource(
// Add channel source for custom events
source.Channel(
c.reconciler.customEvents,
handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, obj ir.Gateway) []reconcile.Request {
// Convert the generic event to a reconcile request
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{Namespace: obj.Namespace, Name: obj.Name},
},
}
}),
),
)

for _, gvk := range gvks {
obj, err := c.cfg.Mgr.GetScheme().New(gvk)
if err != nil {
Expand Down Expand Up @@ -379,6 +414,7 @@ func (c *controllerBuilder) watchInferencePool(ctx context.Context) error {
ControllerName: c.cfg.ControllerName,
ImageInfo: c.cfg.ImageInfo,
InferenceExtension: c.poolCfg.InferenceExt,
CommonCollections: c.cfg.CommonCollections,
})
if err != nil {
return err
Expand Down Expand Up @@ -440,8 +476,9 @@ func (c *controllerBuilder) watchGwClass(_ context.Context) error {
}

type controllerReconciler struct {
cli client.Client
scheme *runtime.Scheme
cli client.Client
scheme *runtime.Scheme
customEvents chan event.TypedGenericEvent[ir.Gateway]
}

func (r *controllerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down
47 changes: 45 additions & 2 deletions internal/kgateway/controller/controller_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"strings"
"testing"

"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"istio.io/istio/pkg/kube"
istiosets "istio.io/istio/pkg/util/sets"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -31,6 +33,14 @@ import (

"github.com/kgateway-dev/kgateway/v2/internal/kgateway/controller"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/deployer"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/extensions2/registry"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/extensions2/settings"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/krtcollections"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/setup"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/utils/krtutil"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
"github.com/kgateway-dev/kgateway/v2/pkg/client/clientset/versioned"
"github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/collections"
"github.com/kgateway-dev/kgateway/v2/pkg/schemes"
)

Expand Down Expand Up @@ -187,6 +197,8 @@ func createManager(
return nil, err
}

ctx, cancel := context.WithCancel(parentCtx)
kubeClient, _ := setup.CreateKubeClient(cfg)
gwCfg := controller.GatewayConfig{
Mgr: mgr,
ControllerName: gatewayControllerName,
Expand All @@ -196,8 +208,10 @@ func createManager(
Tag: "latest",
},
DiscoveryNamespaceFilter: fakeDiscoveryNamespaceFilter{},
CommonCollections: newCommonCols(ctx, kubeClient),
}
if err := controller.NewBaseGatewayController(parentCtx, gwCfg); err != nil {
cancel()
return nil, err
}

Expand All @@ -213,6 +227,7 @@ func createManager(
}

if err := controller.NewGatewayClassProvisioner(mgr, gatewayControllerName, classConfigs); err != nil {
cancel()
return nil, err
}

Expand All @@ -222,16 +237,44 @@ func createManager(
InferenceExt: inferenceExt,
}
if err := controller.NewBaseInferencePoolController(parentCtx, poolCfg, &gwCfg); err != nil {
cancel()
return nil, err
}

ctx, cancel := context.WithCancel(parentCtx)
go func() {
defer GinkgoRecover()
kubeconfig = generateKubeConfiguration(cfg)
mgr.GetLogger().Info("starting manager", "kubeconfig", kubeconfig)
Expect(mgr.Start(ctx)).ToNot(HaveOccurred())
}()

return cancel, nil
return func() {
cancel()
kubeClient.Shutdown()
}, nil
}

func newCommonCols(ctx context.Context, kubeClient kube.Client) *collections.CommonCollections {
krtopts := krtutil.NewKrtOptions(ctx.Done(), nil)
cli, err := versioned.NewForConfig(cfg)
if err != nil {
Expect(err).ToNot(HaveOccurred())
}

settings, err := settings.BuildSettings()
if err != nil {
Expect(err).ToNot(HaveOccurred())
}
commoncol, err := collections.NewCommonCollections(ctx, krtopts, kubeClient, cli, nil, wellknown.GatewayControllerName, logr.Discard(), *settings)
if err != nil {
Expect(err).ToNot(HaveOccurred())
}

plugins := registry.Plugins(ctx, commoncol)
plugins = append(plugins, krtcollections.NewBuiltinPlugin(ctx))
extensions := registry.MergePlugins(plugins...)

commoncol.InitPlugins(ctx, extensions)
kubeClient.RunAndWait(ctx.Done())
return commoncol
}
5 changes: 5 additions & 0 deletions internal/kgateway/controller/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
gwv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwv1a3 "sigs.k8s.io/gateway-api/apis/v1alpha3"
gwv1b1 "sigs.k8s.io/gateway-api/apis/v1beta1"
gwxv1a1 "sigs.k8s.io/gateway-api/apisx/v1alpha1"

kgwv1a1 "github.com/kgateway-dev/kgateway/v2/api/v1alpha1"
)
Expand All @@ -24,6 +25,7 @@ var SchemeBuilder = runtime.SchemeBuilder{
gwv1a2.Install,
gwv1a3.Install,
gwv1b1.Install,
gwxv1a1.Install,

// Kubernetes Core resources
corev1.AddToScheme,
Expand Down Expand Up @@ -60,5 +62,8 @@ func GatewayScheme() *runtime.Scheme {
if err := gwv1b1.Install(s); err != nil {
panic(fmt.Sprintf("Failed to install gateway v1beta1 scheme: %v", err))
}
if err := gwxv1a1.Install(s); err != nil {
panic(fmt.Sprintf("Failed to install gateway experimental v1alpha1 scheme: %v", err))
}
return s
}
3 changes: 3 additions & 0 deletions internal/kgateway/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type ControllerBuilder struct {
proxySyncer *proxy_syncer.ProxySyncer
cfg StartConfig
mgr ctrl.Manager
commoncol *common.CommonCollections

ready atomic.Bool
}
Expand Down Expand Up @@ -219,6 +220,7 @@ func NewControllerBuilder(ctx context.Context, cfg StartConfig) (*ControllerBuil
proxySyncer: proxySyncer,
cfg: cfg,
mgr: mgr,
commoncol: commoncol,
}

// wait for the ControllerBuilder to Start
Expand Down Expand Up @@ -279,6 +281,7 @@ func (c *ControllerBuilder) Start(ctx context.Context) error {
PullPolicy: globalSettings.DefaultImagePullPolicy,
},
DiscoveryNamespaceFilter: c.cfg.Client.ObjectFilter(),
CommonCollections: c.commoncol,
}

setupLog.Info("creating gateway class provisioner")
Expand Down
15 changes: 14 additions & 1 deletion internal/kgateway/deployer/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ import (

"github.com/kgateway-dev/kgateway/v2/api/v1alpha1"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/helm"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/ir"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/wellknown"
"github.com/kgateway-dev/kgateway/v2/internal/version"
common "github.com/kgateway-dev/kgateway/v2/pkg/pluginsdk/collections"
)

var (
Expand Down Expand Up @@ -71,6 +73,7 @@ type Inputs struct {
ControlPlane ControlPlaneInfo
InferenceExtension *InferenceExtInfo
ImageInfo *ImageInfo
CommonCollections *common.CommonCollections
}

type ImageInfo struct {
Expand Down Expand Up @@ -302,13 +305,23 @@ func (d *Deployer) getGatewayClassFromGateway(ctx context.Context, gw *api.Gatew
}

func (d *Deployer) getValues(gw *api.Gateway, gwParam *v1alpha1.GatewayParameters) (*helmConfig, error) {
d.inputs.CommonCollections.GatewayIndex.Gateways.WaitUntilSynced(make(<-chan struct{}))

gwKey := ir.ObjectSource{
Group: wellknown.GatewayGVK.GroupKind().Group,
Kind: wellknown.GatewayGVK.GroupKind().Kind,
Name: gw.GetName(),
Namespace: gw.GetNamespace(),
}
irGW := d.inputs.CommonCollections.GatewayIndex.Gateways.GetKey(gwKey.ResourceName())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be possible that we miss the krt lookup and only have the api.Gateway here if we hit this path before translation finishes; we're guaranteed to eventually get it though

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this should work because the controller should start only after krt is warm, IIRC.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the WaitUntilSynced above in an attempt to fix that


// construct the default values
vals := &helmConfig{
Gateway: &helmGateway{
Name: &gw.Name,
GatewayName: &gw.Name,
GatewayNamespace: &gw.Namespace,
Ports: getPortsValues(gw, gwParam),
Ports: getPortsValues(irGW, gwParam),
Xds: &helmXds{
// The xds host/port MUST map to the Service definition for the Control Plane
// This is the socket address that the Proxy will connect to on startup, to receive xds updates
Expand Down
Loading