Skip to content

Commit c4e0568

Browse files
committed
Fix race condition in ConntrackConnectionStore and FlowExporter
Conntrack connection store's polling go routine and flow exporter both access to conntrack connection store, and there's a race condition error. In the polling go routine, `deleteIfStaleOrResetConn` and `AddOrUpdateConn` both grab the lock, modify `conn.IsPresent` field, and release the lock. Between the execution of these two functions, it is likely that FlowExporter's timer is triggered and it reads the wrong `conn.IsPresent` value in an intermidiate state. We fix it by holding the lock until we finish the execution of both two functions. Fixes: #3650 Signed-off-by: heanlan <hanlan@vmware.com>
1 parent bd82ef6 commit c4e0568

4 files changed

Lines changed: 58 additions & 33 deletions

File tree

pkg/agent/flowexporter/connections/connections.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,20 @@ func (cs *connectionStore) ForAllConnectionsDo(callback flowexporter.ConnectionM
7070
for k, v := range cs.connections {
7171
err := callback(k, v)
7272
if err != nil {
73-
klog.Errorf("Callback execution failed for flow with key: %v, conn: %v, k, v: %v", k, v, err)
73+
klog.ErrorS(err, "Callback execution failed for flow", "key", k, "conn", v)
74+
return err
75+
}
76+
}
77+
return nil
78+
}
79+
80+
// ForAllConnectionsDoWithoutLock execute the callback for each connection in connection
81+
// map, without grabbing the lock. Caller is expected to grab lock.
82+
func (cs *connectionStore) ForAllConnectionsDoWithoutLock(callback flowexporter.ConnectionMapCallBack) error {
83+
for k, v := range cs.connections {
84+
err := callback(k, v)
85+
if err != nil {
86+
klog.ErrorS(err, "Callback execution failed for flow", "key", k, "conn", v)
7487
return err
7588
}
7689
}

pkg/agent/flowexporter/connections/conntrack_connections.go

Lines changed: 41 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -96,29 +96,6 @@ func (cs *ConntrackConnectionStore) Run(stopCh <-chan struct{}) {
9696
// TODO: As optimization, only poll invalid/closed connections during every poll, and poll the established connections right before the export.
9797
func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
9898
klog.V(2).Infof("Polling conntrack")
99-
// Reset IsPresent flag for all connections in connection map before dumping
100-
// flows in conntrack module. If the connection does not exist in conntrack
101-
// table and has been exported, then we will delete it from connection map.
102-
// In addition, if the connection was not exported for a specific time period,
103-
// then we consider it to be stale and delete it.
104-
deleteIfStaleOrResetConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
105-
if !conn.IsPresent {
106-
// Delete the connection if it is ready to delete or it was not exported
107-
// in the time period as specified by the stale connection timeout.
108-
if conn.ReadyToDelete || time.Since(conn.LastExportTime) >= cs.staleConnectionTimeout {
109-
if err := cs.deleteConnWithoutLock(key); err != nil {
110-
return err
111-
}
112-
}
113-
} else {
114-
conn.IsPresent = false
115-
}
116-
return nil
117-
}
118-
119-
if err := cs.ForAllConnectionsDo(deleteIfStaleOrResetConn); err != nil {
120-
return []int{}, err
121-
}
12299

123100
var zones []uint16
124101
var connsLens []int
@@ -137,18 +114,52 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
137114
}
138115
}
139116
var totalConns int
117+
var filteredConnsList []*flowexporter.Connection
140118
for _, zone := range zones {
141-
filteredConnsList, totalConnsPerZone, err := cs.connDumper.DumpFlows(zone)
119+
filteredConnsListPerZone, totalConnsPerZone, err := cs.connDumper.DumpFlows(zone)
142120
if err != nil {
143121
return []int{}, err
144122
}
145123
totalConns += totalConnsPerZone
146-
// Update only the Connection store. IPFIX records are generated based on Connection store.
147-
for _, conn := range filteredConnsList {
148-
cs.AddOrUpdateConn(conn)
149-
}
124+
filteredConnsList = append(filteredConnsList, filteredConnsListPerZone...)
150125
connsLens = append(connsLens, len(filteredConnsList))
151126
}
127+
128+
// Reset IsPresent flag for all connections in connection map before updating
129+
// the dumped flows information in connection map. If the connection does not
130+
// exist in conntrack table and has been exported, then we will delete it from
131+
// connection map. In addition, if the connection was not exported for a specific
132+
// time period, then we consider it to be stale and delete it.
133+
deleteIfStaleOrResetConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
134+
if !conn.IsPresent {
135+
// Delete the connection if it is ready to delete or it was not exported
136+
// in the time period as specified by the stale connection timeout.
137+
if conn.ReadyToDelete || time.Since(conn.LastExportTime) >= cs.staleConnectionTimeout {
138+
if err := cs.deleteConnWithoutLock(key); err != nil {
139+
return err
140+
}
141+
}
142+
} else {
143+
conn.IsPresent = false
144+
}
145+
return nil
146+
}
147+
148+
// Hold the lock until we verify whether the connection exist in conntrack table,
149+
// and finish updating the connection store.
150+
cs.AcquireConnStoreLock()
151+
152+
if err := cs.ForAllConnectionsDoWithoutLock(deleteIfStaleOrResetConn); err != nil {
153+
return []int{}, err
154+
}
155+
156+
// Update only the Connection store. IPFIX records are generated based on Connection store.
157+
for _, conn := range filteredConnsList {
158+
cs.AddOrUpdateConn(conn)
159+
}
160+
161+
cs.ReleaseConnStoreLock()
162+
152163
metrics.TotalConnectionsInConnTrackTable.Set(float64(totalConns))
153164
maxConns, err := cs.connDumper.GetMaxConnections()
154165
if err != nil {
@@ -203,13 +214,12 @@ func (cs *ConntrackConnectionStore) addNetworkPolicyMetadata(conn *flowexporter.
203214
// AddOrUpdateConn updates the connection if it is already present, i.e., update timestamp, counters etc.,
204215
// or adds a new connection with the resolved K8s metadata.
205216
func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection) {
217+
conn.IsPresent = true
206218
connKey := flowexporter.NewConnectionKey(conn)
207-
cs.mutex.Lock()
208-
defer cs.mutex.Unlock()
209219

210220
existingConn, exists := cs.connections[connKey]
211221
if exists {
212-
existingConn.IsPresent = true
222+
existingConn.IsPresent = conn.IsPresent
213223
if flowexporter.IsConnectionDying(existingConn) {
214224
return
215225
}

pkg/agent/flowexporter/connections/conntrack_connections_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
118118
FlowKey: tuple1,
119119
Labels: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0},
120120
Mark: openflow.ServiceCTMark.GetValue(),
121+
IsPresent: true,
121122
IsActive: true,
122123
DestinationPodName: "pod1",
123124
DestinationPodNamespace: "ns1",

test/e2e/framework.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ const (
126126

127127
nginxLBService = "nginx-loadbalancer"
128128

129+
connectionStorePollingInterval = 1 * time.Second
129130
exporterActiveFlowExportTimeout = 2 * time.Second
130131
exporterIdleFlowExportTimeout = 1 * time.Second
131132
aggregatorActiveFlowRecordTimeout = 3500 * time.Millisecond
@@ -694,7 +695,7 @@ func (data *TestData) enableAntreaFlowExporter(ipfixCollector string) error {
694695
// Enable flow exporter feature and add related config params to antrea agent configmap.
695696
ac := func(config *agentconfig.AgentConfig) {
696697
config.FeatureGates["FlowExporter"] = true
697-
config.FlowPollInterval = "1s"
698+
config.FlowPollInterval = connectionStorePollingInterval.String()
698699
config.ActiveFlowExportTimeout = exporterActiveFlowExportTimeout.String()
699700
config.IdleFlowExportTimeout = exporterIdleFlowExportTimeout.String()
700701
if ipfixCollector != "" {

0 commit comments

Comments
 (0)