Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ rules:
- gateways
- grpcroutes
- httproutes
- tcproutes
- referencegrants
- tcproutes
verbs:
- get
- list
Expand Down
76 changes: 53 additions & 23 deletions internal/adc/translator/apisixroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,30 +211,12 @@ func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc
)

for _, backend := range rule.Backends {
var backendErr error
upstream := adc.NewDefaultUpstream()
// try to get the apisixupstream with the same name as the backend service to be upstream config.
// err is ignored because it does not care about the externalNodes of the apisixupstream.
auNN := types.NamespacedName{Namespace: ar.GetNamespace(), Name: backend.ServiceName}
if au, ok := tctx.Upstreams[auNN]; ok {
upstream, _ = t.translateApisixUpstream(tctx, au)
}

if backend.ResolveGranularity == apiv2.ResolveGranularityService {
upstream.Nodes, backendErr = t.translateApisixRouteBackendResolveGranularityService(tctx, utils.NamespacedName(ar), backend)
if backendErr != nil {
t.Log.Error(backendErr, "failed to translate ApisixRoute backend with ResolveGranularity Service")
continue
}
} else {
upstream.Nodes, backendErr = t.translateApisixRouteBackendResolveGranularityEndpoint(tctx, utils.NamespacedName(ar), backend)
if backendErr != nil {
t.Log.Error(backendErr, "failed to translate ApisixRoute backend with ResolveGranularity Endpoint")
continue
}
}
if backend.Weight != nil {
upstream.Labels["meta_weight"] = strconv.FormatInt(int64(*backend.Weight), 10)
upstream, err := t.translateApisixRouteHTTPBackend(tctx, ar, backend)
if err != nil {
t.Log.Error(err, "failed to translate ApisixRoute backend", "backend", backend)
continue
}

upstreamName := adc.ComposeUpstreamName(ar.Namespace, backend.ServiceName, backend.Subset, backend.ServicePort, backend.ResolveGranularity)
Expand Down Expand Up @@ -350,6 +332,45 @@ func getPortFromService(svc *v1.Service, backendSvcPort intstr.IntOrString) (int
return port, nil
}

func (t *Translator) translateApisixRouteHTTPBackend(tctx *provider.TranslateContext, ar *apiv2.ApisixRoute, backend apiv2.ApisixRouteHTTPBackend) (*adc.Upstream, error) {
auNN := types.NamespacedName{
Namespace: ar.Namespace,
Name: backend.ServiceName,
}
upstream := adc.NewDefaultUpstream()
if au, ok := tctx.Upstreams[auNN]; ok {
svc := tctx.Services[auNN]
if svc == nil {
return upstream, nil
}
port, err := getPortFromService(svc, backend.ServicePort)
if err != nil {
return nil, err
}
upstream, err = t.translateApisixUpstreamForPort(tctx, au, ptr.To(port))
if err != nil {
return nil, err
}
}
var err error
if backend.ResolveGranularity == apiv2.ResolveGranularityService {
upstream.Nodes, err = t.translateApisixRouteBackendResolveGranularityService(tctx, auNN, backend)
if err != nil {
return nil, err
}
} else {
upstream.Nodes, err = t.translateApisixRouteBackendResolveGranularityEndpoint(tctx, auNN, backend)
if err != nil {
return nil, err
}
}

if backend.Weight != nil {
upstream.Labels["meta_weight"] = strconv.FormatInt(int64(*backend.Weight), 10)
}
return upstream, nil
}

func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) {
serviceNN := types.NamespacedName{
Namespace: arNN.Namespace,
Expand Down Expand Up @@ -434,9 +455,18 @@ func (t *Translator) translateStreamRule(tctx *provider.TranslateContext, ar *ap
svc.StreamRoutes = append(svc.StreamRoutes, sr)

auNN := types.NamespacedName{Namespace: ar.GetNamespace(), Name: part.Backend.ServiceName}

upstream := adc.NewDefaultUpstream()
if au, ok := tctx.Upstreams[auNN]; ok {
upstream, _ = t.translateApisixUpstream(tctx, au)
service := tctx.Services[auNN]
if service == nil {
return nil, errors.Errorf("service not found, ApisixRoute: %s, Service: %s", utils.NamespacedName(ar), auNN)
}
port, err := getPortFromService(service, part.Backend.ServicePort)
if err != nil {
return nil, err
}
upstream, _ = t.translateApisixUpstreamForPort(tctx, au, ptr.To(port))
}
nodes, err := t.translateApisixRouteStreamBackendResolveGranularity(tctx, utils.NamespacedName(ar), part.Backend)
if err != nil {
Expand Down
99 changes: 60 additions & 39 deletions internal/adc/translator/apisixupstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package translator
import (
"cmp"
"fmt"
"maps"

"github.com/api7/gopkg/pkg/log"
"github.com/pkg/errors"
Expand All @@ -33,50 +34,73 @@ import (
"github.com/apache/apisix-ingress-controller/internal/utils"
)

func (t *Translator) translateApisixUpstream(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream) (ups *adc.Upstream, err error) {
ups = adc.NewDefaultUpstream()
for _, f := range []func(*apiv2.ApisixUpstream, *adc.Upstream) error{
patchApisixUpstreamBasics,
func (t *Translator) translateApisixUpstream(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream) (*adc.Upstream, error) {
return t.translateApisixUpstreamForPort(tctx, au, nil)
}

func (t *Translator) translateApisixUpstreamForPort(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream, port *int32) (*adc.Upstream, error) {
log.Debugw("translating ApisixUpstream", zap.Any("apisixupstream", au), zap.Int32p("port", port))

ups := adc.NewDefaultUpstream()
ups.Name = composeExternalUpstreamName(au)
maps.Copy(ups.Labels, au.Labels)

if err := translateApisixUpstreamConfig(tctx, &au.Spec.ApisixUpstreamConfig, ups); err != nil {
return nil, err
}
if err := translateApisixUpstreamExternalNodes(tctx, au, ups); err != nil {
return nil, err
}

// If PortLevelSettings is configured and a specific port is provided,
// apply the ApisixUpstreamConfig for the matching port to the upstream.
if len(au.Spec.PortLevelSettings) > 0 && port != nil {
for _, pls := range au.Spec.PortLevelSettings {
if pls.Port != *port {
continue
}
if err := translateApisixUpstreamConfig(tctx, &pls.ApisixUpstreamConfig, ups); err != nil {
return nil, err
}
}
}

log.Debugw("translated ApisixUpstream", zap.Any("upstream", ups))

return ups, nil
}

func translateApisixUpstreamConfig(tctx *provider.TranslateContext, config *apiv2.ApisixUpstreamConfig, ups *adc.Upstream) (err error) {
for _, f := range []func(*apiv2.ApisixUpstreamConfig, *adc.Upstream) error{
translateApisixUpstreamScheme,
translateApisixUpstreamLoadBalancer,
translateApisixUpstreamRetriesAndTimeout,
translateApisixUpstreamPassHost,
translateUpstreamHealthCheck,
translateUpstreamDiscovery,
} {
if err = f(au, ups); err != nil {
if err = f(config, ups); err != nil {
return
}
}
for _, f := range []func(*provider.TranslateContext, *apiv2.ApisixUpstream, *adc.Upstream) error{
for _, f := range []func(*provider.TranslateContext, *apiv2.ApisixUpstreamConfig, *adc.Upstream) error{
translateApisixUpstreamClientTLS,
translateApisixUpstreamExternalNodes,
} {
if err = f(tctx, au, ups); err != nil {
if err = f(tctx, config, ups); err != nil {
return
}
}

log.Debugw("translated ApisixUpstream", zap.Any("upstream", ups),
zap.String("namespace", au.Namespace), zap.String("name", au.Name))
return
}

func patchApisixUpstreamBasics(au *apiv2.ApisixUpstream, ups *adc.Upstream) error {
ups.Name = composeExternalUpstreamName(au)
for k, v := range au.Labels {
ups.Labels[k] = v
}
func translateApisixUpstreamScheme(config *apiv2.ApisixUpstreamConfig, ups *adc.Upstream) error {
ups.Scheme = cmp.Or(config.Scheme, apiv2.SchemeHTTP)
return nil
}

func translateApisixUpstreamScheme(au *apiv2.ApisixUpstream, ups *adc.Upstream) error {
ups.Scheme = cmp.Or(au.Spec.Scheme, apiv2.SchemeHTTP)
return nil
}

func translateApisixUpstreamLoadBalancer(au *apiv2.ApisixUpstream, ups *adc.Upstream) error {
lb := au.Spec.LoadBalancer
func translateApisixUpstreamLoadBalancer(config *apiv2.ApisixUpstreamConfig, ups *adc.Upstream) error {
lb := config.LoadBalancer
if lb == nil || lb.Type == "" {
ups.Type = apiv2.LbRoundRobin
return nil
Expand Down Expand Up @@ -107,9 +131,9 @@ func translateApisixUpstreamLoadBalancer(au *apiv2.ApisixUpstream, ups *adc.Upst
return nil
}

func translateApisixUpstreamRetriesAndTimeout(au *apiv2.ApisixUpstream, ups *adc.Upstream) error {
retries := au.Spec.Retries
timeout := au.Spec.Timeout
func translateApisixUpstreamRetriesAndTimeout(config *apiv2.ApisixUpstreamConfig, ups *adc.Upstream) error {
retries := config.Retries
timeout := config.Timeout

if retries != nil && *retries < 0 {
return errors.New("invalid value retries")
Expand Down Expand Up @@ -144,15 +168,15 @@ func translateApisixUpstreamRetriesAndTimeout(au *apiv2.ApisixUpstream, ups *adc
return nil
}

func translateApisixUpstreamClientTLS(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream, ups *adc.Upstream) error {
if au.Spec.TLSSecret == nil {
func translateApisixUpstreamClientTLS(tctx *provider.TranslateContext, config *apiv2.ApisixUpstreamConfig, ups *adc.Upstream) error {
if config.TLSSecret == nil {
return nil
}

var (
secretNN = types.NamespacedName{
Namespace: au.Spec.TLSSecret.Namespace,
Name: au.Spec.TLSSecret.Name,
Namespace: config.TLSSecret.Namespace,
Name: config.TLSSecret.Name,
}
)
secret, ok := tctx.Secrets[secretNN]
Expand All @@ -173,9 +197,9 @@ func translateApisixUpstreamClientTLS(tctx *provider.TranslateContext, au *apiv2
return nil
}

func translateApisixUpstreamPassHost(au *apiv2.ApisixUpstream, ups *adc.Upstream) error {
ups.PassHost = au.Spec.PassHost
ups.UpstreamHost = au.Spec.UpstreamHost
func translateApisixUpstreamPassHost(config *apiv2.ApisixUpstreamConfig, ups *adc.Upstream) error {
ups.PassHost = config.PassHost
ups.UpstreamHost = config.UpstreamHost

return nil
}
Expand Down Expand Up @@ -259,11 +283,8 @@ func translateApisixUpstreamExternalNodesService(tctx *provider.TranslateContext
return nil
}

func translateUpstreamHealthCheck(au *apiv2.ApisixUpstream, ups *adc.Upstream) error {
if au == nil {
return nil
}
healcheck := au.Spec.HealthCheck
func translateUpstreamHealthCheck(config *apiv2.ApisixUpstreamConfig, ups *adc.Upstream) error {
healcheck := config.HealthCheck
if healcheck == nil || (healcheck.Passive == nil && healcheck.Active == nil) {
return nil
}
Expand Down Expand Up @@ -346,8 +367,8 @@ func translateUpstreamPassiveHealthCheck(config *apiv2.PassiveHealthCheck) *adc.
return &passive
}

func translateUpstreamDiscovery(au *apiv2.ApisixUpstream, ups *adc.Upstream) error {
discovery := au.Spec.Discovery
func translateUpstreamDiscovery(config *apiv2.ApisixUpstreamConfig, ups *adc.Upstream) error {
discovery := config.Discovery
if discovery == nil {
return nil
}
Expand Down
Loading
Loading