Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 29 additions & 31 deletions internal/controller/apisixroute_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"fmt"
"slices"

"github.com/api7/gopkg/pkg/log"
"github.com/go-logr/logr"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networkingv1 "k8s.io/api/networking/v1"
Expand Down Expand Up @@ -284,13 +286,12 @@ func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc *provid
var backends = make(map[types.NamespacedName]struct{})
for _, backend := range http.Backends {
var (
service = corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: backend.ServiceName,
Namespace: in.Namespace,
},
au apiv2.ApisixUpstream
service corev1.Service
serviceNN = types.NamespacedName{
Namespace: in.GetNamespace(),
Name: backend.ServiceName,
}
serviceNN = utils.NamespacedName(&service)
)
if _, ok := backends[serviceNN]; ok {
return ReasonError{
Expand All @@ -301,12 +302,24 @@ func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc *provid
backends[serviceNN] = struct{}{}

if err := r.Get(ctx, serviceNN, &service); err != nil {
if err := client.IgnoreNotFound(err); err == nil {
if err = client.IgnoreNotFound(err); err == nil {
r.Log.Error(errors.New("service not found"), "Service", serviceNN)
continue
}
return err
}

// try to get apisixupstream with the same name as the backend service
log.Debugw("try to get apisixupstream with the same name as the backend service", zap.Stringer("Service", serviceNN))
if err := r.Get(ctx, serviceNN, &au); err != nil {
log.Debugw("no ApisixUpstream with the same name as the backend service found", zap.Stringer("Service", serviceNN), zap.Error(err))
if err = client.IgnoreNotFound(err); err != nil {
return err
}
} else {
tc.Upstreams[serviceNN] = &au
}

if service.Spec.Type == corev1.ServiceTypeExternalName {
tc.Services[serviceNN] = &service
continue
Expand Down Expand Up @@ -340,11 +353,7 @@ func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc *provid

// backend.subset specifies a subset of upstream nodes.
// It specifies that the target pod's label should be a superset of the subset labels of the ApisixUpstream of the serviceName
subsetLabels, err := r.getSubsetLabels(ctx, in, backend)
if err != nil {
return err
}

subsetLabels := r.getSubsetLabels(tc, serviceNN, backend)
tc.EndpointSlices[serviceNN] = r.filterEndpointSlicesBySubsetLabels(ctx, endpoints.Items, subsetLabels)
}

Expand Down Expand Up @@ -513,7 +522,7 @@ func (r *ApisixRouteReconciler) listApisixRouteForApisixUpstream(ctx context.Con

var arList apiv2.ApisixRouteList
if err := r.List(ctx, &arList, client.MatchingFields{indexer.ApisixUpstreamRef: indexer.GenIndexKey(au.GetNamespace(), au.GetName())}); err != nil {
r.Log.Error(err, "failed to list ApisixUpstreams")
r.Log.Error(err, "failed to list ApisixRoutes")
return nil
}

Expand Down Expand Up @@ -570,35 +579,24 @@ func (r *ApisixRouteReconciler) listApisixRoutesForPluginConfig(ctx context.Cont
return pkgutils.DedupComparable(requests)
}

func (r *ApisixRouteReconciler) getSubsetLabels(ctx context.Context, ar *apiv2.ApisixRoute, backend apiv2.ApisixRouteHTTPBackend) (map[string]string, error) {
empty := make(map[string]string)
func (r *ApisixRouteReconciler) getSubsetLabels(tctx *provider.TranslateContext, auNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) map[string]string {
if backend.Subset == "" {
return empty, nil
return nil
}

// Try to Get the ApisixUpstream with the same name as backend.ServiceName
var (
auNN = types.NamespacedName{
Namespace: ar.GetNamespace(),
Name: backend.ServiceName,
}
au apiv2.ApisixUpstream
)
if err := r.Get(ctx, auNN, &au); err != nil {
if client.IgnoreNotFound(err) == nil {
return empty, nil
}
return nil, err
au, ok := tctx.Upstreams[auNN]
if !ok {
return nil
}

// try to get the subset labels from the ApisixUpstream subsets
for _, subset := range au.Spec.Subsets {
if backend.Subset == subset.Name {
return subset.Labels, nil
return subset.Labels
}
}

return empty, nil
return nil
}

func (r *ApisixRouteReconciler) filterEndpointSlicesBySubsetLabels(ctx context.Context, in []discoveryv1.EndpointSlice, labels map[string]string) []discoveryv1.EndpointSlice {
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ func ApisixRouteApisixUpstreamIndexFunc(obj client.Object) (keys []string) {
ar := obj.(*apiv2.ApisixRoute)
for _, rule := range ar.Spec.HTTP {
for _, backend := range rule.Backends {
if backend.Subset != "" && backend.ServiceName != "" {
if backend.ServiceName != "" {
keys = append(keys, GenIndexKey(ar.GetNamespace(), backend.ServiceName))
}
}
Expand Down
64 changes: 30 additions & 34 deletions internal/provider/adc/translator/apisixroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,28 +195,35 @@ func (t *Translator) buildRoute(ar *apiv2.ApisixRoute, service *adc.Service, rul

func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc.Service, ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteHTTP) {
var (
upstream = adc.NewDefaultUpstream()
upstreams = make([]*adc.Upstream, 0)
weightedUpstreams = make([]adc.TrafficSplitConfigRuleWeightedUpstream, 0)
backendErr error
)

for _, backend := range rule.Backends {
var upNodes adc.UpstreamNodes
upstream := adc.NewDefaultUpstream()
// try to get the apisixupstream with the same name as the backend service to be upstream config.
// err is ignored because it does not care about the externalNodes of the apisixupstream.
auNN := types.NamespacedName{Namespace: ar.GetNamespace(), Name: backend.ServiceName}
if au, ok := tctx.Upstreams[auNN]; ok {
upstream, _ = t.translateApisixUpstream(tctx, au)
}

if backend.ResolveGranularity == "service" {
upNodes, backendErr = t.translateApisixRouteBackendResolveGranularityService(tctx, utils.NamespacedName(ar), backend)
upstream.Nodes, backendErr = t.translateApisixRouteBackendResolveGranularityService(tctx, utils.NamespacedName(ar), backend)
if backendErr != nil {
t.Log.Error(backendErr, "failed to translate ApisixRoute backend with ResolveGranularity Service")
continue
}
} else {
upNodes, backendErr = t.translateApisixRouteBackendResolveGranularityEndpoint(tctx, utils.NamespacedName(ar), backend)
upstream.Nodes, backendErr = t.translateApisixRouteBackendResolveGranularityEndpoint(tctx, utils.NamespacedName(ar), backend)
if backendErr != nil {
t.Log.Error(backendErr, "failed to translate ApisixRoute backend with ResolveGranularity Endpoint")
continue
}
}
upstream.Nodes = append(upstream.Nodes, upNodes...)

upstreams = append(upstreams, upstream)
}

for _, upstreamRef := range rule.Upstreams {
Expand All @@ -241,21 +248,26 @@ func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc
upstreams = append(upstreams, upstream)
}

// If no .http[].backends is used and only .http[].upstreams is used, the first valid upstream is used as service.upstream;
// Other upstreams are configured in the traffic-split plugin
if len(rule.Backends) == 0 && len(upstreams) > 0 {
upstream = upstreams[0]
upstreams = upstreams[1:]
// no valid upstream
if backendErr != nil || len(upstreams) == 0 || len(upstreams[0].Nodes) == 0 {
return
}

// set the default upstream's weight in traffic-split
weight, err := strconv.Atoi(upstream.Labels["meta_weight"])
if err != nil {
weight = apiv2.DefaultWeight
// the first valid upstream is used as service.upstream;
// the others are configured in the traffic-split plugin
service.Upstream = upstreams[0]
upstreams = upstreams[1:]

// set weight in traffic-split for the default upstream
if len(upstreams) > 0 {
weight, err := strconv.Atoi(service.Upstream.Labels["meta_weight"])
if err != nil {
weight = apiv2.DefaultWeight
}
weightedUpstreams = append(weightedUpstreams, adc.TrafficSplitConfigRuleWeightedUpstream{
Weight: weight,
})
}
weightedUpstreams = append(weightedUpstreams, adc.TrafficSplitConfigRuleWeightedUpstream{
Weight: weight,
})

// set others upstreams in traffic-split
for _, item := range upstreams {
Expand All @@ -269,11 +281,6 @@ func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc
})
}

// set service
service.Upstream = upstream
if backendErr != nil && len(upstream.Nodes) == 0 {
t.addFaultInjectionPlugin(service)
}
if len(weightedUpstreams) > 0 {
service.Plugins["traffic-split"] = &adc.TrafficSplitConfig{
Rules: []adc.TrafficSplitConfigRule{
Expand All @@ -291,21 +298,10 @@ func (t *Translator) buildService(ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteH
service.ID = id.GenID(service.Name)
service.Labels = label.GenLabel(ar)
service.Hosts = rule.Match.Hosts
service.Upstream = adc.NewDefaultUpstream()
return service
}

func (t *Translator) addFaultInjectionPlugin(service *adc.Service) {
if service.Plugins == nil {
service.Plugins = make(map[string]any)
}
service.Plugins["fault-injection"] = map[string]any{
"abort": map[string]any{
"http_status": 500,
"body": "No existing backendRef provided",
},
}
}

func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) {
serviceNN := types.NamespacedName{
Namespace: arNN.Namespace,
Expand Down
75 changes: 70 additions & 5 deletions test/e2e/apisix/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package apisix

import (
"context"
"fmt"
"net"
"net/http"
Expand All @@ -26,6 +27,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"

apiv2 "github.com/apache/apisix-ingress-controller/api/v2"
"github.com/apache/apisix-ingress-controller/test/e2e/framework"
Expand Down Expand Up @@ -247,11 +249,7 @@ spec:
var apisixRoute apiv2.ApisixRoute
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apisixRoute, fmt.Sprintf(apisixRouteSpec, "/get"))

By("when there is no replica got 500 by fault-injection")
err := s.ScaleHTTPBIN(0)
Expect(err).ShouldNot(HaveOccurred(), "scale httpbin to 0")
Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusInternalServerError))
s.NewAPISIXClient().GET("/get").WithHost("httpbin").Expect().Body().IsEqual("No existing backendRef provided")
Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusServiceUnavailable))
})

It("Test ApisixRoute resolveGranularity", func() {
Expand Down Expand Up @@ -501,5 +499,72 @@ spec:
Eventually(upstreamAddrs).Should(HaveKey(endpoint))
Eventually(upstreamAddrs).Should(HaveKey(clusterIP))
})

It("Test backend implicit reference to apisixupstream", func() {
var err error

const apisixRouteSpec = `
apiVersion: apisix.apache.org/v2
kind: ApisixRoute
metadata:
name: default
spec:
ingressClassName: apisix
http:
- name: rule0
match:
hosts:
- httpbin
paths:
- /*
backends:
- serviceName: httpbin-service-e2e-test
servicePort: 80
plugins:
- name: response-rewrite
enable: true
config:
headers:
set:
"X-Upstream-Host": "$upstream_host"

`
const apisixUpstreamSpec = `
apiVersion: apisix.apache.org/v2
kind: ApisixUpstream
metadata:
name: httpbin-service-e2e-test
spec:
ingressClassName: apisix
passHost: rewrite
upstreamHost: hello.httpbin.org
loadbalancer:
type: "chash"
hashOn: "vars"
key: "server_name"
`
expectUpstreamHostIs := func(expectedUpstreamHost string) func(ctx context.Context) (bool, error) {
return func(ctx context.Context) (done bool, err error) {
resp := s.NewAPISIXClient().GET("/get").WithHost("httpbin").Expect().Raw()
return resp.StatusCode == http.StatusOK && resp.Header.Get("X-Upstream-Host") == expectedUpstreamHost, nil
}
}

By("apply apisixroute")
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, new(apiv2.ApisixRoute), apisixRouteSpec)

By("verify ApisixRoute works")
// expect upstream host is "httpbin"
err = wait.PollUntilContextTimeout(context.Background(), time.Second, 10*time.Second, true, expectUpstreamHostIs("httpbin"))
Expect(err).ShouldNot(HaveOccurred(), "verify ApisixRoute works")

By("apply apisixupstream")
applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "httpbin-service-e2e-test"}, new(apiv2.ApisixUpstream), apisixUpstreamSpec)

By("verify backend implicit reference to apisixupstream works")
// expect upstream host is "hello.httpbin.org" which is rewritten by the apisixupstream
err = wait.PollUntilContextTimeout(context.Background(), time.Second, 10*time.Second, true, expectUpstreamHostIs("hello.httpbin.org"))
Expect(err).ShouldNot(HaveOccurred(), "check apisixupstream is referenced")
})
})
})
18 changes: 0 additions & 18 deletions test/e2e/apiv2/apisixconsumer.go

This file was deleted.

18 changes: 0 additions & 18 deletions test/e2e/apiv2/apisixglobalrule.go

This file was deleted.

Loading
Loading