Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions internal/controller/gatewayproxy_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package controller

import (
"context"
"errors"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
v1 "sigs.k8s.io/gateway-api/apis/v1"

"github.com/apache/apisix-ingress-controller/api/v1alpha1"
"github.com/apache/apisix-ingress-controller/internal/controller/indexer"
"github.com/apache/apisix-ingress-controller/internal/provider"
"github.com/apache/apisix-ingress-controller/internal/utils"
)

// GatewayProxyController reconciles a GatewayProxy object.
type GatewayProxyController struct {
client.Client

Scheme *runtime.Scheme
Log logr.Logger
Provider provider.Provider
}

func (r *GatewayProxyController) SetupWithManager(mrg ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mrg).
For(&v1alpha1.GatewayProxy{}).
Watches(&corev1.Service{},
handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForProviderService),
).
Watches(&discoveryv1.EndpointSlice{},
handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForProviderEndpointSlice),
).
Watches(&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(r.listGatewayProxiesForSecret),
).
Complete(r)
}

func (r *GatewayProxyController) Reconcile(ctx context.Context, req ctrl.Request) (reconcile.Result, error) {
var gp v1alpha1.GatewayProxy
if err := r.Get(ctx, req.NamespacedName, &gp); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

var tctx = provider.NewDefaultTranslateContext(ctx)

// if there is no provider, update with empty translate context
if gp.Spec.Provider == nil || gp.Spec.Provider.ControlPlane == nil {
return reconcile.Result{}, r.Provider.Update(ctx, tctx, &gp)
}

// process endpoints for provider service
providerService := gp.Spec.Provider.ControlPlane.Service
if providerService == nil {
tctx.EndpointSlices[req.NamespacedName] = nil
} else {
if err := addProviderEndpointsToTranslateContext(tctx, r.Client, types.NamespacedName{
Namespace: gp.Namespace,
Name: providerService.Name,
}); err != nil {
return reconcile.Result{}, err
}
}

// process secret for provider auth
auth := gp.Spec.Provider.ControlPlane.Auth
if auth.AdminKey != nil && auth.AdminKey.ValueFrom != nil && auth.AdminKey.ValueFrom.SecretKeyRef != nil {
var (
secret corev1.Secret
secretNN = types.NamespacedName{
Namespace: gp.GetNamespace(),
Name: auth.AdminKey.ValueFrom.SecretKeyRef.Name,
}
)
if err := r.Get(ctx, secretNN, &secret); err != nil {
r.Log.Error(err, "failed to get secret", "secret", secretNN)
return reconcile.Result{}, err
}
tctx.Secrets[secretNN] = &secret
}

// list Gateways that reference the GatewayProxy
var (
gatewayList v1.GatewayList
ingressClassList networkingv1.IngressClassList
indexKey = indexer.GenIndexKey(gp.GetNamespace(), gp.GetName())
)
if err := r.List(ctx, &gatewayList, client.MatchingFields{indexer.ParametersRef: indexKey}); err != nil {
r.Log.Error(err, "failed to list GatewayList")
return ctrl.Result{}, nil
}

// list IngressClasses that reference the GatewayProxy
if err := r.List(ctx, &ingressClassList, client.MatchingFields{indexer.IngressClassParametersRef: indexKey}); err != nil {
r.Log.Error(err, "failed to list GatewayList")
return reconcile.Result{}, err
}

// append referrers to tanslate context
for _, item := range gatewayList.Items {
tctx.GatewayProxyReferrers[req.NamespacedName] = append(tctx.GatewayProxyReferrers[req.NamespacedName], utils.NamespacedNameKind(&item))
}
for _, item := range ingressClassList.Items {
tctx.GatewayProxyReferrers[req.NamespacedName] = append(tctx.GatewayProxyReferrers[req.NamespacedName], utils.NamespacedNameKind(&item))
}

if err := r.Provider.Update(ctx, tctx, &gp); err != nil {
return reconcile.Result{}, err
}

return reconcile.Result{}, nil
}

func (r *GatewayProxyController) listGatewayProxiesForProviderService(ctx context.Context, obj client.Object) (requests []reconcile.Request) {
service, ok := obj.(*corev1.Service)
if !ok {
r.Log.Error(errors.New("unexpected object type"), "failed to convert object to Service")
return nil
}

return ListRequests(ctx, r.Client, r.Log, &v1alpha1.GatewayProxyList{}, client.MatchingFields{
indexer.ServiceIndexRef: indexer.GenIndexKey(service.GetNamespace(), service.GetName()),
})
}

func (r *GatewayProxyController) listGatewayProxiesForProviderEndpointSlice(ctx context.Context, obj client.Object) (requests []reconcile.Request) {
endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
r.Log.Error(errors.New("unexpected object type"), "failed to convert object to EndpointSlice")
return nil
}

return ListRequests(ctx, r.Client, r.Log, &v1alpha1.GatewayProxyList{}, client.MatchingFields{
indexer.ServiceIndexRef: indexer.GenIndexKey(endpointSlice.GetNamespace(), endpointSlice.Labels[discoveryv1.LabelServiceName]),
})
}

func (r *GatewayProxyController) listGatewayProxiesForSecret(ctx context.Context, object client.Object) []reconcile.Request {
secret, ok := object.(*corev1.Secret)
if !ok {
r.Log.Error(errors.New("unexpected object type"), "failed to convert object to Secret")
return nil
}
return ListRequests(ctx, r.Client, r.Log, &v1alpha1.GatewayProxyList{}, client.MatchingFields{
indexer.SecretIndexRef: indexer.GenIndexKey(secret.GetNamespace(), secret.GetName()),
})
}
20 changes: 20 additions & 0 deletions internal/controller/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,15 @@ func setupIngressClassIndexer(mgr ctrl.Manager) error {
}

func setupGatewayProxyIndexer(mgr ctrl.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(
context.Background(),
&v1alpha1.GatewayProxy{},
ServiceIndexRef,
GatewayProxyserviceIndexFunc,
); err != nil {
return err
}

if err := mgr.GetFieldIndexer().IndexField(
context.Background(),
&v1alpha1.GatewayProxy{},
Expand Down Expand Up @@ -272,6 +281,17 @@ func setupGatewayClassIndexer(mgr ctrl.Manager) error {
)
}

func GatewayProxyserviceIndexFunc(rawOjb client.Object) []string {
gatewayProxy := rawOjb.(*v1alpha1.GatewayProxy)
if gatewayProxy.Spec.Provider != nil &&
gatewayProxy.Spec.Provider.ControlPlane != nil &&
gatewayProxy.Spec.Provider.ControlPlane.Service != nil {
service := gatewayProxy.Spec.Provider.ControlPlane.Service
return []string{GenIndexKey(gatewayProxy.GetNamespace(), service.Name)}
}
return nil
}

func GatewayProxySecretIndexFunc(rawObj client.Object) []string {
gatewayProxy := rawObj.(*v1alpha1.GatewayProxy)
secretKeys := make([]string, 0)
Expand Down
6 changes: 6 additions & 0 deletions internal/manager/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,11 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro
Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixUpstream"),
Updater: updater,
},
&controller.GatewayProxyController{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("GatewayProxy"),
Provider: pro,
},
}, nil
}
2 changes: 2 additions & 0 deletions internal/provider/adc/adc.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func (d *adcClient) Update(ctx context.Context, tctx *provider.TranslateContext,
case *apiv2.ApisixConsumer:
result, err = d.translator.TranslateApisixConsumer(tctx, t.DeepCopy())
resourceTypes = append(resourceTypes, "consumer")
case *v1alpha1.GatewayProxy:
return d.updateConfigForGatewayProxy(tctx, t)
}
if err != nil {
return err
Expand Down
44 changes: 36 additions & 8 deletions internal/provider/adc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,22 @@
package adc

import (
"errors"
"fmt"
"net"
"slices"
"strconv"

"github.com/api7/gopkg/pkg/log"
"github.com/pkg/errors"
"go.uber.org/zap"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
v1 "sigs.k8s.io/gateway-api/apis/v1"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"

"github.com/apache/apisix-ingress-controller/api/v1alpha1"
"github.com/apache/apisix-ingress-controller/internal/provider"
"github.com/apache/apisix-ingress-controller/internal/types"
"github.com/apache/apisix-ingress-controller/internal/utils"
)

func (d *adcClient) getConfigsForGatewayProxy(tctx *provider.TranslateContext, gatewayProxy *v1alpha1.GatewayProxy) (*adcConfig, error) {
Expand Down Expand Up @@ -87,17 +89,17 @@ func (d *adcClient) getConfigsForGatewayProxy(tctx *provider.TranslateContext, g
}
_, ok := tctx.Services[namespacedName]
if !ok {
return nil, errors.Errorf("no service found for service reference: %s", namespacedName)
return nil, fmt.Errorf("no service found for service reference: %s", namespacedName)
}
endpoint := tctx.EndpointSlices[namespacedName]
if endpoint == nil {
return nil, nil
}
upstreamNodes, err := d.translator.TranslateBackendRef(tctx, v1.BackendRef{
BackendObjectReference: v1.BackendObjectReference{
Name: v1.ObjectName(provider.ControlPlane.Service.Name),
Namespace: (*v1.Namespace)(&gatewayProxy.Namespace),
Port: ptr.To(v1.PortNumber(provider.ControlPlane.Service.Port)),
upstreamNodes, err := d.translator.TranslateBackendRef(tctx, gatewayv1.BackendRef{
BackendObjectReference: gatewayv1.BackendObjectReference{
Name: gatewayv1.ObjectName(provider.ControlPlane.Service.Name),
Namespace: (*gatewayv1.Namespace)(&gatewayProxy.Namespace),
Port: ptr.To(gatewayv1.PortNumber(provider.ControlPlane.Service.Port)),
},
})
if err != nil {
Expand Down Expand Up @@ -167,6 +169,32 @@ func (d *adcClient) updateConfigs(rk types.NamespacedNameKind, tctx *provider.Tr
return nil
}

// updateConfigForGatewayProxy update config for all referrers of the GatewayProxy
func (d *adcClient) updateConfigForGatewayProxy(tctx *provider.TranslateContext, gp *v1alpha1.GatewayProxy) error {
d.Lock()
defer d.Unlock()

config, err := d.getConfigsForGatewayProxy(tctx, gp)
if err != nil {
return err
}

referrers := tctx.GatewayProxyReferrers[utils.NamespacedName(gp)]

if config == nil {
for _, ref := range referrers {
delete(d.configs, ref)
}
return nil
}

for _, ref := range referrers {
d.configs[ref] = *config
}

return nil
}

func (d *adcClient) findConfigsToDelete(oldParentRefs, newParentRefs []types.NamespacedNameKind) []adcConfig {
var deleteConfigs []adcConfig
for _, parentRef := range oldParentRefs {
Expand Down
5 changes: 4 additions & 1 deletion internal/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ type TranslateContext struct {
Upstreams map[k8stypes.NamespacedName]*apiv2.ApisixUpstream
GatewayProxies map[types.NamespacedNameKind]v1alpha1.GatewayProxy
ResourceParentRefs map[types.NamespacedNameKind][]types.NamespacedNameKind
HTTPRoutePolicies []v1alpha1.HTTPRoutePolicy
// GatewayProxyReferrers key is GatewayProxy, value is a list of resources that reference this GatewayProxy
GatewayProxyReferrers map[k8stypes.NamespacedName][]types.NamespacedNameKind
HTTPRoutePolicies []v1alpha1.HTTPRoutePolicy

StatusUpdaters []status.Update
}
Expand All @@ -72,5 +74,6 @@ func NewDefaultTranslateContext(ctx context.Context) *TranslateContext {
Upstreams: make(map[k8stypes.NamespacedName]*apiv2.ApisixUpstream),
GatewayProxies: make(map[types.NamespacedNameKind]v1alpha1.GatewayProxy),
ResourceParentRefs: make(map[types.NamespacedNameKind][]types.NamespacedNameKind),
GatewayProxyReferrers: make(map[k8stypes.NamespacedName][]types.NamespacedNameKind),
}
}
Loading
Loading