Skip to content

Commit a6093c3

Browse files
committed
Fix race condition in ConntrackConnectionStore and FlowExporter
Signed-off-by: heanlan <hanlan@vmware.com>
1 parent bd82ef6 commit a6093c3

4 files changed

Lines changed: 25 additions & 5 deletions

File tree

pkg/agent/flowexporter/connections/connections.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,19 @@ func (cs *connectionStore) ForAllConnectionsDo(callback flowexporter.ConnectionM
7777
return nil
7878
}
7979

80+
// ForAllConnectionsDoWithoutLock execute the callback for each connection in connection
81+
// map, without grabbing the lock. Caller is expected to grab lock.
82+
func (cs *connectionStore) ForAllConnectionsDoWithoutLock(callback flowexporter.ConnectionMapCallBack) error {
83+
for k, v := range cs.connections {
84+
err := callback(k, v)
85+
if err != nil {
86+
klog.Errorf("Callback execution failed for flow with key: %v, conn: %v, k, v: %v", k, v, err)
87+
return err
88+
}
89+
}
90+
return nil
91+
}
92+
8093
// AddConnToMap adds the connection to connections map given connection key.
8194
// This is used only for unit tests.
8295
func (cs *connectionStore) AddConnToMap(connKey *flowexporter.ConnectionKey, conn *flowexporter.Connection) {

pkg/agent/flowexporter/connections/conntrack_connections.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,10 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
116116
return nil
117117
}
118118

119-
if err := cs.ForAllConnectionsDo(deleteIfStaleOrResetConn); err != nil {
119+
// Hold the lock until we have verified whether each connection exists in the conntrack table.
120+
cs.AcquireConnStoreLock()
121+
122+
if err := cs.ForAllConnectionsDoWithoutLock(deleteIfStaleOrResetConn); err != nil {
120123
return []int{}, err
121124
}
122125

@@ -149,6 +152,9 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
149152
}
150153
connsLens = append(connsLens, len(filteredConnsList))
151154
}
155+
156+
cs.ReleaseConnStoreLock()
157+
152158
metrics.TotalConnectionsInConnTrackTable.Set(float64(totalConns))
153159
maxConns, err := cs.connDumper.GetMaxConnections()
154160
if err != nil {
@@ -203,13 +209,12 @@ func (cs *ConntrackConnectionStore) addNetworkPolicyMetadata(conn *flowexporter.
203209
// AddOrUpdateConn updates the connection if it is already present, i.e., update timestamp, counters etc.,
204210
// or adds a new connection with the resolved K8s metadata.
205211
func (cs *ConntrackConnectionStore) AddOrUpdateConn(conn *flowexporter.Connection) {
212+
conn.IsPresent = true
206213
connKey := flowexporter.NewConnectionKey(conn)
207-
cs.mutex.Lock()
208-
defer cs.mutex.Unlock()
209214

210215
existingConn, exists := cs.connections[connKey]
211216
if exists {
212-
existingConn.IsPresent = true
217+
existingConn.IsPresent = conn.IsPresent
213218
if flowexporter.IsConnectionDying(existingConn) {
214219
return
215220
}

pkg/agent/flowexporter/connections/conntrack_connections_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
118118
FlowKey: tuple1,
119119
Labels: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0},
120120
Mark: openflow.ServiceCTMark.GetValue(),
121+
IsPresent: true,
121122
IsActive: true,
122123
DestinationPodName: "pod1",
123124
DestinationPodNamespace: "ns1",

test/e2e/framework.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ const (
126126

127127
nginxLBService = "nginx-loadbalancer"
128128

129+
connectionStorePollingInterval = 1 * time.Second
129130
exporterActiveFlowExportTimeout = 2 * time.Second
130131
exporterIdleFlowExportTimeout = 1 * time.Second
131132
aggregatorActiveFlowRecordTimeout = 3500 * time.Millisecond
@@ -694,7 +695,7 @@ func (data *TestData) enableAntreaFlowExporter(ipfixCollector string) error {
694695
// Enable flow exporter feature and add related config params to antrea agent configmap.
695696
ac := func(config *agentconfig.AgentConfig) {
696697
config.FeatureGates["FlowExporter"] = true
697-
config.FlowPollInterval = "1s"
698+
config.FlowPollInterval = connectionStorePollingInterval.String()
698699
config.ActiveFlowExportTimeout = exporterActiveFlowExportTimeout.String()
699700
config.IdleFlowExportTimeout = exporterIdleFlowExportTimeout.String()
700701
if ipfixCollector != "" {

0 commit comments

Comments
 (0)