Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ func run(o *Options) error {
k8sClient,
informerFactory,
ofClient,
ovsctl.NewClient(o.config.OVSBridge),
ovsBridgeClient,
routeClient,
ifaceStore,
Expand Down
59 changes: 45 additions & 14 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ func (i *Initializer) initInterfaceStore() error {
return intf
}
ifaceList := make([]*interfacestore.InterfaceConfig, 0, len(ovsPorts))
ovsCtlClient := ovsctl.NewClient(i.ovsBridge)
for index := range ovsPorts {
port := &ovsPorts[index]
ovsPort := &interfacestore.OVSPortConfig{
Expand All @@ -297,6 +296,8 @@ func (i *Initializer) initInterfaceStore() error {
case interfacestore.AntreaUplink:
intf = parseUplinkInterfaceFunc(port, ovsPort)
case interfacestore.AntreaTunnel:
fallthrough
case interfacestore.AntreaIPsecTunnel:
intf = parseTunnelInterfaceFunc(port, ovsPort)
case interfacestore.AntreaHost:
if port.Name == i.ovsBridge {
Expand All @@ -314,9 +315,6 @@ func (i *Initializer) initInterfaceStore() error {
intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort, true)
case interfacestore.AntreaTrafficControl:
intf = trafficcontrol.ParseTrafficControlInterfaceConfig(port, ovsPort)
if err := ovsCtlClient.SetPortNoFlood(int(ovsPort.OFPort)); err != nil {
klog.ErrorS(err, "Failed to set port with no-flood config", "PortName", port.Name)
}
default:
klog.InfoS("Unknown Antrea interface type", "type", interfaceType)
}
Expand All @@ -340,7 +338,11 @@ func (i *Initializer) initInterfaceStore() error {
fallthrough
case port.IFType == ovsconfig.STTTunnel:
intf = parseTunnelInterfaceFunc(port, ovsPort)
antreaIFType = interfacestore.AntreaTunnel
if intf.Type == interfacestore.IPSecTunnelInterface {
antreaIFType = interfacestore.AntreaIPsecTunnel
} else {
antreaIFType = interfacestore.AntreaTunnel
}
case port.Name == i.ovsBridge:
intf = nil
antreaIFType = interfacestore.AntreaHost
Expand Down Expand Up @@ -368,6 +370,23 @@ func (i *Initializer) initInterfaceStore() error {
return nil
}

func (i *Initializer) restorePortConfigs() error {
ovsCtlClient := ovsctl.NewClient(i.ovsBridge)
interfaces := i.ifaceStore.ListInterfaces()
for _, intf := range interfaces {
switch intf.Type {
case interfacestore.IPSecTunnelInterface:
fallthrough
case interfacestore.TrafficControlInterface:
if err := ovsCtlClient.SetPortNoFlood(int(intf.OFPort)); err != nil {
return fmt.Errorf("failed to set port %s with no-flood: %w", intf.InterfaceName, err)
}
klog.InfoS("Set port no-flood successfully", "PortName", intf.InterfaceName)
}
}
return nil
}

// Initialize sets up agent initial configurations.
func (i *Initializer) Initialize() error {
klog.Info("Setting up node network")
Expand All @@ -386,6 +405,10 @@ func (i *Initializer) Initialize() error {
return err
}

if err := i.restorePortConfigs(); err != nil {
return err
}

// initializeWireGuard must be executed after setupOVSBridge as it requires gateway addresses on the OVS bridge.
if i.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeWireGuard {
if err := i.initializeWireGuard(); err != nil {
Expand Down Expand Up @@ -487,14 +510,15 @@ func persistRoundNum(num uint64, bridgeClient ovsconfig.OVSBridgeClient, interva

// initOpenFlowPipeline sets up necessary Openflow entries, including pipeline, classifiers, conn_track, and gateway flows
// Every time the agent is (re)started, we go through the following sequence:
// 1. agent determines the new round number (this is done by incrementing the round number
// persisted in OVSDB, or if it's not available by picking round 1).
// 2. any existing flow for which the round number matches the round number obtained from step 1
// is deleted.
// 3. all required flows are installed, using the round number obtained from step 1.
// 4. after convergence, all existing flows for which the round number matches the previous round
// number (i.e. the round number which was persisted in OVSDB, if any) are deleted.
// 5. the new round number obtained from step 1 is persisted to OVSDB.
// 1. agent determines the new round number (this is done by incrementing the round number
// persisted in OVSDB, or if it's not available by picking round 1).
// 2. any existing flow for which the round number matches the round number obtained from step 1
// is deleted.
// 3. all required flows are installed, using the round number obtained from step 1.
// 4. after convergence, all existing flows for which the round number matches the previous round
// number (i.e. the round number which was persisted in OVSDB, if any) are deleted.
// 5. the new round number obtained from step 1 is persisted to OVSDB.
//
// The rationale for not persisting the new round number until after all previous flows have been
// deleted is to avoid a situation in which some stale flows are never deleted because of successive
// agent restarts (with the agent crashing before step 4 can be completed). With the sequence
Expand Down Expand Up @@ -552,6 +576,13 @@ func (i *Initializer) initOpenFlowPipeline() error {
i.ofClient.ReplayFlows()
klog.Info("Flow replay completed")

klog.InfoS("Restoring OF port configs to OVS bridge")
if err := i.restorePortConfigs(); err != nil {
klog.ErrorS(err, "Failed to restore OF port configs")
} else {
klog.InfoS("Port configs restoration completed")
}

if i.ovsBridgeClient.GetOVSDatapathType() == ovsconfig.OVSDatapathNetdev {
// we don't set flow-restore-wait when using the OVS netdev datapath
return
Expand All @@ -561,7 +592,7 @@ func (i *Initializer) initOpenFlowPipeline() error {
// happen that ovsBridgeClient's connection is not ready when ofClient completes flow replay. We retry it
// with a timeout that is longer time than ovsBridgeClient's maximum connecting retry interval (8 seconds)
// to ensure the flag can be removed successfully.
err := wait.PollImmediate(200*time.Millisecond, 10*time.Second, func() (done bool, err error) {
err = wait.PollImmediate(200*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if err := i.FlowRestoreComplete(); err != nil {
return false, nil
}
Expand Down
46 changes: 17 additions & 29 deletions pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ import (
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/agent/wireguard"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/ovs/ovsctl"
utilip "antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/util/runtime"
)

const (
Expand All @@ -65,14 +65,14 @@ type Controller struct {
kubeClient clientset.Interface
ovsBridgeClient ovsconfig.OVSBridgeClient
ofClient openflow.Client
ovsCtlClient ovsctl.OVSCtlClient
routeClient route.Interface
interfaceStore interfacestore.InterfaceStore
networkConfig *config.NetworkConfig
nodeConfig *config.NodeConfig
nodeInformer coreinformers.NodeInformer
nodeLister corelisters.NodeLister
nodeListerSynced cache.InformerSynced
svcLister corelisters.ServiceLister
queue workqueue.RateLimitingInterface
// installedNodes records routes and flows installation states of Nodes.
// The key is the host name of the Node, the value is the nodeRouteInfo of the Node.
Expand All @@ -92,6 +92,7 @@ func NewNodeRouteController(
kubeClient clientset.Interface,
informerFactory informers.SharedInformerFactory,
client openflow.Client,
ovsCtlClient ovsctl.OVSCtlClient,
ovsBridgeClient ovsconfig.OVSBridgeClient,
routeClient route.Interface,
interfaceStore interfacestore.InterfaceStore,
Expand All @@ -102,19 +103,18 @@ func NewNodeRouteController(
ipsecCertificateManager ipseccertificate.Manager,
) *Controller {
nodeInformer := informerFactory.Core().V1().Nodes()
svcLister := informerFactory.Core().V1().Services()
controller := &Controller{
kubeClient: kubeClient,
ovsBridgeClient: ovsBridgeClient,
ofClient: client,
ovsCtlClient: ovsCtlClient,
routeClient: routeClient,
interfaceStore: interfaceStore,
networkConfig: networkConfig,
nodeConfig: nodeConfig,
nodeInformer: nodeInformer,
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
svcLister: svcLister.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "noderoute"),
installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc}),
wireGuardClient: wireguardClient,
Expand Down Expand Up @@ -203,27 +203,10 @@ func (c *Controller) removeStaleGatewayRoutes() error {
desiredPodCIDRs = append(desiredPodCIDRs, podCIDRs...)
}

// TODO: This is not the best place to keep the ClusterIP Service routes.
desiredClusterIPSvcIPs := map[string]bool{}
if c.proxyAll && runtime.IsWindowsPlatform() {
// The route for virtual IP -> antrea-gw0 should be always kept.
desiredClusterIPSvcIPs[config.VirtualServiceIPv4.String()] = true

svcs, err := c.svcLister.List(labels.Everything())
for _, svc := range svcs {
for _, ip := range svc.Spec.ClusterIPs {
desiredClusterIPSvcIPs[ip] = true
}
}
if err != nil {
return fmt.Errorf("error when listing ClusterIP Service IPs: %v", err)
}
}

// routeClient will remove orphaned routes whose destinations are not in desiredPodCIDRs.
// If proxyAll enabled, it will also remove routes that are for Windows ClusterIP Services
// which no longer exist.
if err := c.routeClient.Reconcile(desiredPodCIDRs, desiredClusterIPSvcIPs); err != nil {
if err := c.routeClient.Reconcile(desiredPodCIDRs); err != nil {
return err
}
return nil
Expand All @@ -244,7 +227,7 @@ func (c *Controller) removeStaleTunnelPorts() error {
// will not include it in the set.
desiredInterfaces := make(map[string]bool)
// knownInterfaces is the list of interfaces currently in the local cache.
knownInterfaces := c.interfaceStore.GetInterfaceKeysByType(interfacestore.TunnelInterface)
knownInterfaces := c.interfaceStore.GetInterfaceKeysByType(interfacestore.IPSecTunnelInterface)

if c.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec {
for _, node := range nodes {
Expand Down Expand Up @@ -669,15 +652,14 @@ func (c *Controller) createIPSecTunnelPort(nodeName string, nodeIP net.IP) (int3
}
c.interfaceStore.DeleteInterface(interfaceConfig)
exists = false
} else {
if interfaceConfig.OFPort != 0 {
klog.V(2).InfoS("Found cached IPsec tunnel interface", "node", nodeName, "interface", interfaceConfig.InterfaceName, "port", interfaceConfig.OFPort)
return interfaceConfig.OFPort, nil
}
}
}

if !exists {
ovsExternalIDs := map[string]interface{}{ovsExternalIDNodeName: nodeName}
ovsExternalIDs := map[string]interface{}{
ovsExternalIDNodeName: nodeName,
interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaIPsecTunnel,
}
portUUID, err := c.ovsBridgeClient.CreateTunnelPortExt(
portName,
c.networkConfig.TunnelType,
Expand Down Expand Up @@ -713,6 +695,12 @@ func (c *Controller) createIPSecTunnelPort(nodeName string, nodeIP net.IP) (int3
// Let NodeRouteController retry at errors.
return 0, fmt.Errorf("failed to get of_port of IPsec tunnel port for Node %s", nodeName)
}

// Set the port with no-flood to reject ARP flood packets.
if err := c.ovsCtlClient.SetPortNoFlood(int(ofPort)); err != nil {
return 0, fmt.Errorf("failed to set port %s with no-flood config: %w", portName, err)
}

interfaceConfig.OFPort = ofPort
return ofPort, nil
}
Expand Down
31 changes: 24 additions & 7 deletions pkg/agent/controller/noderoute/node_route_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
ovsctltest "antrea.io/antrea/pkg/ovs/ovsctl/testing"
utilip "antrea.io/antrea/pkg/util/ip"
)

Expand All @@ -58,6 +59,7 @@ type fakeController struct {
ovsClient *ovsconfigtest.MockOVSBridgeClient
routeClient *routetest.MockInterface
interfaceStore interfacestore.InterfaceStore
ovsCtlClient *ovsctltest.MockOVSCtlClient
}

type fakeIPsecCertificateManager struct{}
Expand All @@ -75,7 +77,9 @@ func newController(t *testing.T, networkConfig *config.NetworkConfig) (*fakeCont
routeClient := routetest.NewMockInterface(ctrl)
interfaceStore := interfacestore.NewInterfaceStore()
ipsecCertificateManager := &fakeIPsecCertificateManager{}
c := NewNodeRouteController(clientset, informerFactory, ofClient, ovsClient, routeClient, interfaceStore, networkConfig, &config.NodeConfig{GatewayConfig: &config.GatewayConfig{
ovsCtlClient := ovsctltest.NewMockOVSCtlClient(ctrl)

c := NewNodeRouteController(clientset, informerFactory, ofClient, ovsCtlClient, ovsClient, routeClient, interfaceStore, networkConfig, &config.NodeConfig{GatewayConfig: &config.GatewayConfig{
IPv4: nil,
MAC: gatewayMAC,
}}, nil, false, ipsecCertificateManager)
Expand All @@ -86,6 +90,7 @@ func newController(t *testing.T, networkConfig *config.NetworkConfig) (*fakeCont
ofClient: ofClient,
ovsClient: ovsClient,
routeClient: routeClient,
ovsCtlClient: ovsCtlClient,
interfaceStore: interfaceStore,
}, ctrl.Finish
}
Expand Down Expand Up @@ -254,7 +259,7 @@ func setup(t *testing.T, ifaces []*interfacestore.InterfaceConfig, authenticatio
func TestRemoveStaleTunnelPorts(t *testing.T) {
c, closeFn := setup(t, []*interfacestore.InterfaceConfig{
{
Type: interfacestore.TunnelInterface,
Type: interfacestore.IPSecTunnelInterface,
InterfaceName: util.GenerateNodeTunnelInterfaceName("xyz-k8s-0-1"),
TunnelInterfaceConfig: &interfacestore.TunnelInterfaceConfig{
NodeName: "xyz-k8s-0-1",
Expand Down Expand Up @@ -302,7 +307,7 @@ func TestRemoveStaleTunnelPorts(t *testing.T) {
func TestCreateIPSecTunnelPortPSK(t *testing.T) {
c, closeFn := setup(t, []*interfacestore.InterfaceConfig{
{
Type: interfacestore.TunnelInterface,
Type: interfacestore.IPSecTunnelInterface,
InterfaceName: "mismatchedname",
TunnelInterfaceConfig: &interfacestore.TunnelInterfaceConfig{
NodeName: "xyz-k8s-0-2",
Expand All @@ -315,7 +320,7 @@ func TestCreateIPSecTunnelPortPSK(t *testing.T) {
},
},
{
Type: interfacestore.TunnelInterface,
Type: interfacestore.IPSecTunnelInterface,
InterfaceName: util.GenerateNodeTunnelInterfaceName("xyz-k8s-0-3"),
TunnelInterfaceConfig: &interfacestore.TunnelInterfaceConfig{
NodeName: "xyz-k8s-0-3",
Expand All @@ -339,16 +344,25 @@ func TestCreateIPSecTunnelPortPSK(t *testing.T) {

node1PortName := util.GenerateNodeTunnelInterfaceName("xyz-k8s-0-1")
node2PortName := util.GenerateNodeTunnelInterfaceName("xyz-k8s-0-2")
node3PortName := util.GenerateNodeTunnelInterfaceName("xyz-k8s-0-3")
c.ovsClient.EXPECT().CreateTunnelPortExt(
node1PortName, ovsconfig.TunnelType("vxlan"), int32(0),
false, "", nodeIP1.String(), "", "changeme", nil,
map[string]interface{}{ovsExternalIDNodeName: "xyz-k8s-0-1"}).Times(1)
map[string]interface{}{ovsExternalIDNodeName: "xyz-k8s-0-1",
interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaIPsecTunnel,
}).Times(1)
c.ovsClient.EXPECT().CreateTunnelPortExt(
node2PortName, ovsconfig.TunnelType("vxlan"), int32(0),
false, "", nodeIP2.String(), "", "changeme", nil,
map[string]interface{}{ovsExternalIDNodeName: "xyz-k8s-0-2"}).Times(1)
map[string]interface{}{ovsExternalIDNodeName: "xyz-k8s-0-2",
interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaIPsecTunnel,
}).Times(1)
c.ovsClient.EXPECT().GetOFPort(node1PortName, false).Return(int32(1), nil)
c.ovsCtlClient.EXPECT().SetPortNoFlood(1)
c.ovsClient.EXPECT().GetOFPort(node2PortName, false).Return(int32(2), nil)
c.ovsCtlClient.EXPECT().SetPortNoFlood(2)
c.ovsClient.EXPECT().GetOFPort(node3PortName, false).Return(int32(5), nil)
c.ovsCtlClient.EXPECT().SetPortNoFlood(5)
c.ovsClient.EXPECT().DeletePort("123").Times(1)

tests := []struct {
Expand Down Expand Up @@ -405,8 +419,11 @@ func TestCreateIPSecTunnelPortCert(t *testing.T) {
c.ovsClient.EXPECT().CreateTunnelPortExt(
node1PortName, ovsconfig.TunnelType("vxlan"), int32(0),
false, "", nodeIP1.String(), "xyz-k8s-0-1", "", nil,
map[string]interface{}{ovsExternalIDNodeName: "xyz-k8s-0-1"}).Times(1)
map[string]interface{}{ovsExternalIDNodeName: "xyz-k8s-0-1",
interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaIPsecTunnel,
}).Times(1)
c.ovsClient.EXPECT().GetOFPort(node1PortName, false).Return(int32(1), nil)
c.ovsCtlClient.EXPECT().SetPortNoFlood(1)

tests := []struct {
name string
Expand Down
13 changes: 11 additions & 2 deletions pkg/agent/interfacestore/interface_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func getInterfaceKey(obj interface{}) (string, error) {
var key string
if interfaceConfig.Type == ContainerInterface {
key = util.GenerateContainerInterfaceKey(interfaceConfig.ContainerID)
} else if interfaceConfig.Type == TunnelInterface && interfaceConfig.NodeName != "" {
// Tunnel interface for a Node.
} else if interfaceConfig.Type == IPSecTunnelInterface {
// IPsec tunnel interface for a Node.
key = util.GenerateNodeTunnelInterfaceKey(interfaceConfig.NodeName)
} else {
// Use the interface name as the key by default.
Expand Down Expand Up @@ -123,6 +123,15 @@ func (c *interfaceCache) GetInterface(interfaceKey string) (*InterfaceConfig, bo
return iface.(*InterfaceConfig), found
}

// ListInterfacesByType lists all interfaces from local cache.
func (c *interfaceCache) ListInterfaces() []*InterfaceConfig {
interfaceConfigs := make([]*InterfaceConfig, 0)
for _, iface := range c.cache.List() {
interfaceConfigs = append(interfaceConfigs, iface.(*InterfaceConfig))
}
return interfaceConfigs
}

// GetInterfaceByName retrieves interface from local cache given the interface
// name.
func (c *interfaceCache) GetInterfaceByName(interfaceName string) (*InterfaceConfig, bool) {
Expand Down
Loading