Skip to content

Commit df5fb93

Browse files
committed
Enhance ACNP Service related feature
1. Only load Service GroupID into reg when AntreaPolicy is enabled. Service GroupID is only used by AntreaPolicy "toServices" and "AppliedTo NodePort Serivces" features for now. 2. In IngressSecurityClassifierTable, only forward packet to AntreaPolicyIngressRuleTable when AntreaPolicy is enabled and proxyAll is enabled. This forward flow is only used by AntreaPolicy "AppliedTo NodePort Services" feature to avoid packets skip AntreaPolicyIngressRuleTable, where policy will be enforced, when the endpoint of this Service is not on current NodePort Node. 3. In ACNP appliedTo NodePort Service e2e test, change to add another netNS to fake external network. 4. Change to use gwOFPort as inPort of reject response for some cases. Signed-off-by: graysonwu <wgrayson@vmware.com>
1 parent e0ae53c commit df5fb93

6 files changed

Lines changed: 160 additions & 70 deletions

File tree

pkg/agent/controller/networkpolicy/reject.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"encoding/binary"
1919
"fmt"
2020

21-
"antrea.io/libOpenflow/openflow15"
2221
"antrea.io/libOpenflow/protocol"
2322
"antrea.io/ofnet/ofctrl"
2423

@@ -165,14 +164,8 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error {
165164
srcMAC = sIface.MAC.String()
166165
dstMAC = dIface.MAC.String()
167166
}
168-
tunPort := c.tunPort
169-
if tunPort == 0 {
170-
// openflow15.P_CONTROLLER is used with noEncap mode when tunnel interface is not found.
171-
// It won't cause a loop with openflow15.P_CONTROLLER because it is used as the input port but not output port
172-
// in the packet out message.
173-
tunPort = uint32(openflow15.P_CONTROLLER)
174-
}
175-
inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, tunPort)
167+
168+
inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, c.tunPort)
176169
mutateFunc := getRejectPacketOutMutateFunc(packetOutType, c.nodeType)
177170

178171
if proto == protocol.Type_TCP {
@@ -289,9 +282,19 @@ func getRejectOFPorts(rejectType RejectType, sIface, dIface *interfacestore.Inte
289282
outPort = gwOFPort
290283
case RejectNoAPServiceRemoteToLocal:
291284
inPort = tunOFPort
285+
if inPort == 0 {
286+
// If tunnel interface is not found, which means we are in noEncap mode, then use
287+
// gateway port as inPort.
288+
inPort = gwOFPort
289+
}
292290
outPort = gwOFPort
293291
case RejectServiceRemoteToExternal:
294292
inPort = tunOFPort
293+
if inPort == 0 {
294+
// If tunnel interface is not found, which means we are in noEncap mode, then use
295+
// gateway port as inPort.
296+
inPort = gwOFPort
297+
}
295298
}
296299
return inPort, outPort
297300
}

pkg/agent/openflow/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -808,6 +808,7 @@ func (c *client) generatePipelines() {
808808
c.networkConfig,
809809
c.serviceConfig,
810810
c.bridge,
811+
c.enableAntreaPolicy,
811812
c.enableProxy,
812813
c.proxyAll,
813814
c.connectUplinkToBridge)

pkg/agent/openflow/pipeline.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2112,7 +2112,7 @@ func (f *featureNetworkPolicy) ingressClassifierFlows() []binding.Flow {
21122112
Action().GotoTable(IngressMetricTable.GetID()).
21132113
Done(),
21142114
}
2115-
if f.proxyAll {
2115+
if f.enableAntreaPolicy && f.proxyAll {
21162116
// This generates the flow to match the NodePort Service packets and forward them to AntreaPolicyIngressRuleTable.
21172117
// Policies applied on NodePort Service will be enforced in AntreaPolicyIngressRuleTable.
21182118
flows = append(flows, IngressSecurityClassifierTable.ofTable.BuildFlow(priorityNormal+1).
@@ -2345,9 +2345,10 @@ func (f *featureService) serviceLBFlow(groupID binding.GroupIDType,
23452345
MatchRegMark(EpToSelectRegMark).
23462346
Action().LoadRegMark(regMarksToLoad...)
23472347
}
2348-
return flowBuilder.
2349-
Action().LoadToRegField(ServiceGroupIDField, uint32(groupID)).
2350-
Action().Group(groupID).Done()
2348+
if f.enableAntreaPolicy {
2349+
flowBuilder = flowBuilder.Action().LoadToRegField(ServiceGroupIDField, uint32(groupID))
2350+
}
2351+
return flowBuilder.Action().Group(groupID).Done()
23512352
}
23522353

23532354
// endpointDNATFlow generates the flow which transforms the Service Cluster IP to the Endpoint IP according to the Endpoint

pkg/agent/openflow/service.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type featureService struct {
4444
networkConfig *config.NetworkConfig
4545
gatewayPort uint32
4646

47+
enableAntreaPolicy bool
4748
enableProxy bool
4849
proxyAll bool
4950
connectUplinkToBridge bool
@@ -63,6 +64,7 @@ func newFeatureService(
6364
networkConfig *config.NetworkConfig,
6465
serviceConfig *config.ServiceConfig,
6566
bridge binding.Bridge,
67+
enableAntreaPolicy,
6668
enableProxy,
6769
proxyAll,
6870
connectUplinkToBridge bool) *featureService {
@@ -113,6 +115,7 @@ func newFeatureService(
113115
gatewayMAC: nodeConfig.GatewayConfig.MAC,
114116
gatewayPort: nodeConfig.GatewayConfig.OFPort,
115117
networkConfig: networkConfig,
118+
enableAntreaPolicy: enableAntreaPolicy,
116119
enableProxy: enableProxy,
117120
proxyAll: proxyAll,
118121
connectUplinkToBridge: connectUplinkToBridge,

test/e2e/antreapolicy_test.go

Lines changed: 57 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3679,80 +3679,85 @@ func testACNPICMPSupport(t *testing.T, data *TestData) {
36793679
func testACNPNodePortServiceSupport(t *testing.T, data *TestData) {
36803680
skipIfProxyAllDisabled(t, data)
36813681

3682-
// Create a client on Node 0, one NodePort Service whose Endpoint is on Node 0 and
3683-
// another NodePort Service whose Endpoint is on Node 1. Initiate traffic from this
3684-
// client to these two Services Node 1 NodePort to simulate the traffic from
3685-
// external client to NodePort.
3686-
clientName := "agnhost-client"
3687-
failOnError(data.createAgnhostPodOnNode(clientName, data.testNamespace, nodeName(0), true), t)
3688-
defer data.deletePodAndWait(defaultTimeout, clientName, data.testNamespace)
3689-
ips, err := data.podWaitForIPs(defaultTimeout, clientName, data.testNamespace)
3682+
// Create a NodePort Service.
3683+
ipProtocol := v1.IPv4Protocol
3684+
var nodePort int32
3685+
nodePortSvc, err := data.createNginxNodePortService("test-nodeport-svc", false, false, &ipProtocol)
36903686
failOnError(err, t)
3691-
3692-
var cidr string
3693-
if clusterInfo.podV4NetworkCIDR != "" {
3694-
cidr = ips.ipv4.String()
3695-
} else {
3696-
cidr = ips.ipv6.String()
3687+
for _, port := range nodePortSvc.Spec.Ports {
3688+
if port.NodePort != 0 {
3689+
nodePort = port.NodePort
3690+
break
3691+
}
36973692
}
3698-
cidr += "/32"
3699-
3700-
svc1, cleanup1 := data.createAgnhostServiceAndBackendPods(t, "svc1", data.testNamespace, nodeName(0), v1.ServiceTypeNodePort)
3701-
defer cleanup1()
37023693

3703-
svc2, cleanup2 := data.createAgnhostServiceAndBackendPods(t, "svc2", data.testNamespace, nodeName(1), v1.ServiceTypeNodePort)
3704-
defer cleanup2()
3694+
backendPodName := "test-nodeport-backend-pod"
3695+
require.NoError(t, data.createNginxPodOnNode(backendPodName, data.testNamespace, nodeName(0), false))
3696+
if err := data.podWaitForRunning(defaultTimeout, backendPodName, data.testNamespace); err != nil {
3697+
t.Fatalf("Error when waiting for Pod '%s' to be in the Running state", backendPodName)
3698+
}
3699+
defer deletePodWrapper(t, data, data.testNamespace, backendPodName)
3700+
3701+
// Create another netns to fake an external network on the host network Pod.
3702+
testNetns := "test-ns"
3703+
cmd := fmt.Sprintf(`ip netns add %[1]s && \
3704+
ip link add dev %[1]s-a type veth peer name %[1]s-b && \
3705+
ip link set dev %[1]s-a netns %[1]s && \
3706+
ip addr add %[3]s/%[4]d dev %[1]s-b && \
3707+
ip link set dev %[1]s-b up && \
3708+
ip netns exec %[1]s ip addr add %[2]s/%[4]d dev %[1]s-a && \
3709+
ip netns exec %[1]s ip link set dev %[1]s-a up && \
3710+
ip netns exec %[1]s ip route replace default via %[3]s && \
3711+
sleep 3600
3712+
`, testNetns, "1.1.1.1", "1.1.1.254", 24)
3713+
clientNames := []string{"client0", "client1"}
3714+
for idx, clientName := range clientNames {
3715+
if err := NewPodBuilder(clientName, data.testNamespace, agnhostImage).OnNode(nodeName(idx)).WithCommand([]string{"sh", "-c", cmd}).InHostNetwork().Privileged().Create(data); err != nil {
3716+
t.Fatalf("Failed to create client Pod: %v", err)
3717+
}
3718+
defer data.deletePodAndWait(defaultTimeout, clientName, data.testNamespace)
3719+
err = data.podWaitForRunning(defaultTimeout, clientName, data.testNamespace)
3720+
failOnError(err, t)
3721+
}
37053722

3723+
cidr := "1.1.1.1/24"
37063724
builder := &ClusterNetworkPolicySpecBuilder{}
37073725
builder = builder.SetName("test-acnp-nodeport-svc").
37083726
SetPriority(1.0).
37093727
SetAppliedToGroup([]ACNPAppliedToSpec{
37103728
{
37113729
Service: &crdv1alpha1.NamespacedName{
3712-
Name: svc1.Name,
3713-
Namespace: svc1.Namespace,
3714-
},
3715-
},
3716-
{
3717-
Service: &crdv1alpha1.NamespacedName{
3718-
Name: svc2.Name,
3719-
Namespace: svc2.Namespace,
3730+
Name: nodePortSvc.Name,
3731+
Namespace: nodePortSvc.Namespace,
37203732
},
37213733
},
37223734
})
37233735
builder.AddIngress(ProtocolTCP, nil, nil, nil, nil, nil, nil, nil, &cidr, nil, nil,
37243736
nil, nil, false, nil, crdv1alpha1.RuleActionReject, "", "", nil)
37253737

3726-
testcases := []podToAddrTestStep{
3727-
{
3728-
Pod(fmt.Sprintf("%s/%s", data.testNamespace, clientName)),
3729-
nodeIP(1),
3730-
svc1.Spec.Ports[0].NodePort,
3731-
Rejected,
3732-
},
3733-
{
3734-
Pod(fmt.Sprintf("%s/%s", data.testNamespace, clientName)),
3735-
nodeIP(1),
3736-
svc2.Spec.Ports[0].NodePort,
3737-
Rejected,
3738-
},
3739-
}
3740-
37413738
acnp, err := k8sUtils.CreateOrUpdateACNP(builder.Get())
37423739
failOnError(err, t)
37433740
failOnError(waitForResourceReady(t, timeout, acnp), t)
3744-
for _, tc := range testcases {
3745-
log.Tracef("Probing: %s -> %s:%d", cidr, tc.destAddr, tc.destPort)
3746-
connectivity, err := k8sUtils.ProbeAddr(tc.clientPod.Namespace(), "antrea-e2e", tc.clientPod.PodName(), tc.destAddr, tc.destPort, ProtocolTCP)
3747-
if err != nil {
3748-
t.Errorf("failure -- could not complete probe: %v", err)
3741+
for idx, clientName := range clientNames {
3742+
log.Tracef("Probing: 1.1.1.1 -> %s:%d", nodeIP(idx), nodePort)
3743+
// Connect to NodePort in the fake external network.
3744+
cmd = fmt.Sprintf("for i in $(seq 1 3); do ip netns exec %s /agnhost connect %s:%d --timeout=1s --protocol=tcp; done;", testNetns, nodeIP(idx), nodePort)
3745+
stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, clientName, agnhostContainerName, []string{"sh", "-c", cmd})
3746+
connectivity := Connected
3747+
if err != nil || stderr != "" {
3748+
// log this error as trace since may be an expected failure
3749+
log.Tracef("1.1.1.1 -> %s:%d: error when running command: err - %v /// stdout - %s /// stderr - %s", nodeIP(idx), nodePort, err, stdout, stderr)
3750+
// If err != nil and stderr == "", then it means this probe failed because of
3751+
// the command instead of connectivity. For example, container name doesn't exist.
3752+
if stderr == "" {
3753+
connectivity = Error
3754+
}
3755+
connectivity = DecideProbeResult(stderr, 3)
37493756
}
3750-
if connectivity != tc.expectedConnectivity {
3751-
t.Errorf("failure -- wrong results for probe: Source %s --> Dest %s:%d connectivity: %v, expected: %v",
3752-
cidr, tc.destAddr, tc.destPort, connectivity, tc.expectedConnectivity)
3757+
if connectivity != Rejected {
3758+
t.Errorf("failure -- wrong results for probe: Source 1.1.1.1 --> Dest %s:%d connectivity: %v, expected: Rej", nodeIP(idx), nodePort, connectivity)
37533759
}
37543760
}
3755-
// cleanup test resources
37563761
failOnError(k8sUtils.DeleteACNP(builder.Name), t)
37573762
}
37583763

test/integration/agent/openflow_test.go

Lines changed: 82 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ type svcConfig struct {
604604
withSessionAffinity bool
605605
}
606606

607-
func TestProxyServiceFlows(t *testing.T) {
607+
func TestProxyServiceFlowsAntreaPolicyDisabled(t *testing.T) {
608608
// Reset OVS metrics (Prometheus) and reinitialize them to test.
609609
legacyregistry.Reset()
610610
metrics.InitializeOVSMetrics()
@@ -679,7 +679,77 @@ func TestProxyServiceFlows(t *testing.T) {
679679

680680
for _, tc := range tcs {
681681
groupID := ofconfig.GroupIDType(tc.gid)
682-
expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge)
682+
expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge, false)
683+
installServiceFlows(t, tc.gid, tc.svc, tc.endpoints, tc.stickyAge)
684+
for _, tableFlow := range expTableFlows {
685+
ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, true, tableFlow.flows)
686+
}
687+
ofTestUtils.CheckGroupExists(t, ovsCtlClient, groupID, "select", expGroupBuckets, true)
688+
689+
uninstallServiceFlowsFunc(t, tc.gid, tc.svc, tc.endpoints)
690+
for _, tableFlow := range expTableFlows {
691+
ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, false, tableFlow.flows)
692+
}
693+
ofTestUtils.CheckGroupExists(t, ovsCtlClient, groupID, "select", expGroupBuckets, false)
694+
}
695+
}
696+
697+
func TestProxyServiceFlowsAntreaPoilcyEnabled(t *testing.T) {
698+
// Reset OVS metrics (Prometheus) and reinitialize them to test.
699+
legacyregistry.Reset()
700+
metrics.InitializeOVSMetrics()
701+
702+
c = ofClient.NewClient(br, bridgeMgmtAddr, true, true, false, false, false, false, false, false, false)
703+
err := ofTestUtils.PrepareOVSBridge(br)
704+
require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br))
705+
706+
config := prepareConfiguration(true, false)
707+
_, err = c.Initialize(roundInfo, config.nodeConfig, &agentconfig.NetworkConfig{TrafficEncapMode: agentconfig.TrafficEncapModeEncap, IPv4Enabled: true}, &agentconfig.EgressConfig{}, &agentconfig.ServiceConfig{})
708+
require.Nil(t, err, "Failed to initialize OFClient")
709+
710+
defer func() {
711+
err = c.Disconnect()
712+
assert.Nil(t, err, fmt.Sprintf("Error while disconnecting from OVS bridge: %v", err))
713+
err = ofTestUtils.DeleteOVSBridge(br)
714+
assert.Nil(t, err, fmt.Sprintf("Error while deleting OVS bridge: %v", err))
715+
ofClient.CleanOFTableCache()
716+
ofClient.ResetOFTable()
717+
}()
718+
719+
endpoints := []k8sproxy.Endpoint{
720+
k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
721+
Endpoint: net.JoinHostPort("192.168.1.2", "8081"),
722+
IsLocal: true,
723+
}),
724+
k8stypes.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
725+
Endpoint: net.JoinHostPort("10.20.1.11", "8081"),
726+
IsLocal: false,
727+
}),
728+
}
729+
730+
stickyMaxAgeSeconds := uint16(30)
731+
732+
tcs := []struct {
733+
svc svcConfig
734+
gid uint32
735+
endpoints []k8sproxy.Endpoint
736+
stickyAge uint16
737+
}{
738+
{
739+
svc: svcConfig{
740+
protocol: ofconfig.ProtocolTCP,
741+
ip: net.ParseIP("10.20.30.41"),
742+
port: uint16(8000),
743+
},
744+
gid: 2,
745+
endpoints: endpoints,
746+
stickyAge: stickyMaxAgeSeconds,
747+
},
748+
}
749+
750+
for _, tc := range tcs {
751+
groupID := ofconfig.GroupIDType(tc.gid)
752+
expTableFlows, expGroupBuckets := expectedProxyServiceGroupAndFlows(tc.gid, tc.svc, tc.endpoints, tc.stickyAge, true)
683753
installServiceFlows(t, tc.gid, tc.svc, tc.endpoints, tc.stickyAge)
684754
for _, tableFlow := range expTableFlows {
685755
ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, true, tableFlow.flows)
@@ -716,7 +786,7 @@ func uninstallServiceFlowsFunc(t *testing.T, gid uint32, svc svcConfig, endpoint
716786
}
717787
}
718788

719-
func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList []k8sproxy.Endpoint, stickyAge uint16) (tableFlows []expectTableFlows, groupBuckets []string) {
789+
func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList []k8sproxy.Endpoint, stickyAge uint16, antreaPolicyEnabled bool) (tableFlows []expectTableFlows, groupBuckets []string) {
720790
nw_proto := 6
721791
learnProtoField := "NXM_OF_TCP_DST[]"
722792
if svc.protocol == ofconfig.ProtocolUDP {
@@ -732,10 +802,17 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
732802
serviceLearnReg = 3
733803
}
734804
cookieAllocator := cookie.NewAllocator(roundInfo.RoundNum)
805+
806+
loadGourpID := ""
807+
ctTable := "EgressRule"
808+
if antreaPolicyEnabled {
809+
loadGourpID = fmt.Sprintf("set_field:0x%x->reg7,", gid)
810+
ctTable = "AntreaPolicyEgressRule"
811+
}
735812
svcFlows := expectTableFlows{tableName: "ServiceLB", flows: []*ofTestUtils.ExpectFlow{
736813
{
737814
MatchStr: fmt.Sprintf("priority=200,%s,reg4=0x10000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
738-
ActStr: fmt.Sprintf("set_field:0x%x/0x70000->reg4,set_field:0x200/0x200->reg0,set_field:0x%x->reg7,group:%d", serviceLearnReg<<16, gid, gid),
815+
ActStr: fmt.Sprintf("set_field:0x%x/0x70000->reg4,set_field:0x200/0x200->reg0,%sgroup:%d", serviceLearnReg<<16, loadGourpID, gid),
739816
},
740817
{
741818
MatchStr: fmt.Sprintf("priority=190,%s,reg4=0x30000/0x70000,nw_dst=%s,tp_dst=%d", string(svc.protocol), svc.ip.String(), svc.port),
@@ -754,7 +831,7 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
754831
unionVal := (0b010 << 16) + uint32(epPort)
755832
epDNATFlows.flows = append(epDNATFlows.flows, &ofTestUtils.ExpectFlow{
756833
MatchStr: fmt.Sprintf("priority=200,%s,reg3=%s,reg4=0x%x/0x7ffff", string(svc.protocol), epIP, unionVal),
757-
ActStr: fmt.Sprintf("ct(commit,table=EgressRule,zone=65520,nat(dst=%s:%d),exec(set_field:0x10/0x10->ct_mark,move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3])", ep.IP(), epPort),
834+
ActStr: fmt.Sprintf("ct(commit,table=%s,zone=65520,nat(dst=%s:%d),exec(set_field:0x10/0x10->ct_mark,move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3])", ctTable, ep.IP(), epPort),
758835
})
759836

760837
if ep.GetIsLocal() {

0 commit comments

Comments
 (0)