Skip to content

Commit 0f5df48

Browse files
authored
Add throughput calculation into flow-aggregator aggregation process. (#2692)
Signed-off-by: heanlan <hanlan@vmware.com>
1 parent 4d45cdf commit 0f5df48

10 files changed

Lines changed: 271 additions & 189 deletions

File tree

build/yamls/elk-flow-collector/logstash/ipfix.yml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,3 +183,27 @@
183183
144:
184184
- :string
185185
- :destinationPodLabels
186+
145:
187+
- :uint64
188+
- :throughput
189+
146:
190+
- :uint64
191+
- :reverseThroughput
192+
147:
193+
- :uint64
194+
- :throughputFromSourceNode
195+
148:
196+
- :uint64
197+
- :throughputFromDestinationNode
198+
149:
199+
- :uint64
200+
- :reverseThroughputFromSourceNode
201+
150:
202+
- :uint64
203+
- :reverseThroughputFromDestinationNode
204+
151:
205+
- :uint32
206+
- :flowEndSecondsFromSourceNode
207+
152:
208+
- :uint32
209+
- :flowEndSecondsFromDestinationNode

ci/kind/test-e2e-kind.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ COMMON_IMAGES_LIST=("k8s.gcr.io/e2e-test-images/agnhost:2.29" \
129129
"projects.registry.vmware.com/library/busybox" \
130130
"projects.registry.vmware.com/antrea/nginx" \
131131
"projects.registry.vmware.com/antrea/perftool" \
132-
"projects.registry.vmware.com/antrea/ipfix-collector:v0.5.10" \
132+
"projects.registry.vmware.com/antrea/ipfix-collector:v0.5.11" \
133133
"projects.registry.vmware.com/antrea/wireguard-go:0.0.20210424")
134134
for image in "${COMMON_IMAGES_LIST[@]}"; do
135135
for i in `seq 3`; do

docs/network-flow-visibility.md

Lines changed: 81 additions & 69 deletions
Large diffs are not rendered by default.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ require (
4343
github.com/stretchr/testify v1.7.0
4444
github.com/ti-mo/conntrack v0.4.0
4545
github.com/vishvananda/netlink v1.1.1-0.20210510164352-d17758a128bf
46-
github.com/vmware/go-ipfix v0.5.10
46+
github.com/vmware/go-ipfix v0.5.11
4747
golang.org/x/crypto v0.0.0-20210503195802-e9a32991a82e
4848
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6
4949
golang.org/x/mod v0.4.2

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -659,8 +659,8 @@ github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmF
659659
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
660660
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae h1:4hwBBUfQCFe3Cym0ZtKyq7L16eZUtYKs+BaHDN6mAns=
661661
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
662-
github.com/vmware/go-ipfix v0.5.10 h1:32ITLwn/IZcagB+bG0g9f8KsSk2atWG9uHHy8InIuUc=
663-
github.com/vmware/go-ipfix v0.5.10/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY=
662+
github.com/vmware/go-ipfix v0.5.11 h1:1731EiUCTkhrK0YTxVbpT3YVyfyIj3ACua+QjL+9eq0=
663+
github.com/vmware/go-ipfix v0.5.11/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY=
664664
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
665665
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
666666
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=

pkg/flowaggregator/flowaggregator.go

Lines changed: 68 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -121,11 +121,31 @@ var (
121121
"sourcePodLabels",
122122
"destinationPodLabels",
123123
}
124+
antreaFlowEndSecondsElementList = []string{
125+
"flowEndSecondsFromSourceNode",
126+
"flowEndSecondsFromDestinationNode",
127+
}
128+
antreaThroughputElementList = []string{
129+
"throughput",
130+
"reverseThroughput",
131+
}
132+
antreaSourceThroughputElementList = []string{
133+
"throughputFromSourceNode",
134+
"reverseThroughputFromSourceNode",
135+
}
136+
antreaDestinationThroughputElementList = []string{
137+
"throughputFromDestinationNode",
138+
"reverseThroughputFromDestinationNode",
139+
}
124140
aggregationElements = &ipfixintermediate.AggregationElements{
125141
NonStatsElements: nonStatsElementList,
126142
StatsElements: statsElementList,
127143
AggregatedSourceStatsElements: antreaSourceStatsElementList,
128144
AggregatedDestinationStatsElements: antreaDestinationStatsElementList,
145+
AntreaFlowEndSecondsElements: antreaFlowEndSecondsElementList,
146+
ThroughputElements: antreaThroughputElementList,
147+
SourceThroughputElements: antreaSourceThroughputElementList,
148+
DestinationThroughputElements: antreaDestinationThroughputElementList,
129149
}
130150

131151
correlateFields = []string{
@@ -289,7 +309,8 @@ func (fa *flowAggregator) InitCollectingProcess() error {
289309
IsEncrypted: false,
290310
}
291311
}
292-
cpInput.NumExtraElements = len(antreaSourceStatsElementList) + len(antreaDestinationStatsElementList) + len(antreaLabelsElementList)
312+
cpInput.NumExtraElements = len(antreaSourceStatsElementList) + len(antreaDestinationStatsElementList) + len(antreaLabelsElementList) +
313+
len(antreaFlowEndSecondsElementList) + len(antreaThroughputElementList) + len(antreaSourceThroughputElementList) + len(antreaDestinationThroughputElementList)
293314
var err error
294315
fa.collectingProcess, err = ipfix.NewIPFIXCollectingProcess(cpInput)
295316
return err
@@ -457,7 +478,6 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
457478
if err = fa.aggregationProcess.ResetStatElementsInRecord(record.Record); err != nil {
458479
return err
459480
}
460-
461481
klog.V(4).Infof("Data set sent successfully: %d Bytes sent", sentBytes)
462482
fa.numRecordsExported = fa.numRecordsExported + 1
463483
return nil
@@ -474,33 +494,21 @@ func (fa *flowAggregator) sendTemplateSet(isIPv6 bool) (int, error) {
474494
templateID = fa.templateIDv6
475495
}
476496
for _, ie := range ianaInfoElements {
477-
element, err := fa.registry.GetInfoElement(ie, ipfixregistry.IANAEnterpriseID)
478-
if err != nil {
479-
return 0, fmt.Errorf("%s not present. returned error: %v", ie, err)
480-
}
481-
ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
497+
ie, err := fa.createInfoElementForTemplateSet(ie, ipfixregistry.IANAEnterpriseID)
482498
if err != nil {
483499
return 0, err
484500
}
485501
elements = append(elements, ie)
486502
}
487503
for _, ie := range ianaReverseInfoElements {
488-
element, err := fa.registry.GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID)
489-
if err != nil {
490-
return 0, fmt.Errorf("%s not present. returned error: %v", ie, err)
491-
}
492-
ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
504+
ie, err := fa.createInfoElementForTemplateSet(ie, ipfixregistry.IANAReversedEnterpriseID)
493505
if err != nil {
494506
return 0, err
495507
}
496508
elements = append(elements, ie)
497509
}
498510
for _, ie := range antreaInfoElements {
499-
element, err := fa.registry.GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID)
500-
if err != nil {
501-
return 0, fmt.Errorf("%s not present. returned error: %v", ie, err)
502-
}
503-
ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
511+
ie, err := fa.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID)
504512
if err != nil {
505513
return 0, err
506514
}
@@ -511,34 +519,52 @@ func (fa *flowAggregator) sendTemplateSet(isIPv6 bool) (int, error) {
511519
for i := range statsElementList {
512520
// Add Antrea source stats fields
513521
ieName := antreaSourceStatsElementList[i]
514-
element, err := fa.registry.GetInfoElement(ieName, ipfixregistry.AntreaEnterpriseID)
515-
if err != nil {
516-
return 0, fmt.Errorf("%s not present. returned error: %v", ieName, err)
517-
}
518-
ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
522+
ie, err := fa.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
519523
if err != nil {
520524
return 0, err
521525
}
522526
elements = append(elements, ie)
523527
// Add Antrea destination stats fields
524528
ieName = antreaDestinationStatsElementList[i]
525-
element, err = fa.registry.GetInfoElement(ieName, ipfixregistry.AntreaEnterpriseID)
529+
ie, err = fa.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
526530
if err != nil {
527-
return 0, fmt.Errorf("%s not present. returned error: %v", ieName, err)
531+
return 0, err
528532
}
529-
ie, err = ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
533+
elements = append(elements, ie)
534+
}
535+
for _, ie := range antreaFlowEndSecondsElementList {
536+
ie, err := fa.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID)
537+
if err != nil {
538+
return 0, err
539+
}
540+
elements = append(elements, ie)
541+
}
542+
for i := range antreaThroughputElementList {
543+
// Add common throughput fields
544+
ieName := antreaThroughputElementList[i]
545+
ie, err := fa.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
546+
if err != nil {
547+
return 0, err
548+
}
549+
elements = append(elements, ie)
550+
// Add source node specific throughput fields
551+
ieName = antreaSourceThroughputElementList[i]
552+
ie, err = fa.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
553+
if err != nil {
554+
return 0, err
555+
}
556+
elements = append(elements, ie)
557+
// Add destination node specific throughput fields
558+
ieName = antreaDestinationThroughputElementList[i]
559+
ie, err = fa.createInfoElementForTemplateSet(ieName, ipfixregistry.AntreaEnterpriseID)
530560
if err != nil {
531561
return 0, err
532562
}
533563
elements = append(elements, ie)
534564
}
535565
if fa.includePodLabels {
536566
for _, ie := range antreaLabelsElementList {
537-
element, err := fa.registry.GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID)
538-
if err != nil {
539-
return 0, fmt.Errorf("error when getting InformationElement %s from registry: %v", ie, err)
540-
}
541-
ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
567+
ie, err := fa.createInfoElementForTemplateSet(ie, ipfixregistry.AntreaEnterpriseID)
542568
if err != nil {
543569
return 0, err
544570
}
@@ -668,3 +694,15 @@ func (fa *flowAggregator) GetRecordMetrics() querier.Metrics {
668694
NumConnToCollector: fa.collectingProcess.GetNumConnToCollector(),
669695
}
670696
}
697+
698+
func (fa *flowAggregator) createInfoElementForTemplateSet(ieName string, enterpriseID uint32) (ipfixentities.InfoElementWithValue, error) {
699+
element, err := fa.registry.GetInfoElement(ieName, enterpriseID)
700+
if err != nil {
701+
return nil, fmt.Errorf("%s not present. returned error: %v", ieName, err)
702+
}
703+
ie, err := ipfixentities.DecodeAndCreateInfoElementWithValue(element, nil)
704+
if err != nil {
705+
return nil, err
706+
}
707+
return ie, nil
708+
}

pkg/flowaggregator/flowaggregator_test.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -225,28 +225,40 @@ func TestFlowAggregator_sendTemplateSet(t *testing.T) {
225225
// Following consists of all elements that are in ianaInfoElements and antreaInfoElements (globals)
226226
// Only the element name is needed, other arguments have dummy values.
227227
elemList := make([]ipfixentities.InfoElementWithValue, 0)
228-
for i, ie := range ianaInfoElements {
228+
for _, ie := range ianaInfoElements {
229229
elemList = append(elemList, createElement(ie, ipfixregistry.IANAEnterpriseID))
230-
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[i].GetInfoElement(), nil)
230+
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
231231
}
232-
for i, ie := range ianaReverseInfoElements {
232+
for _, ie := range ianaReverseInfoElements {
233233
elemList = append(elemList, createElement(ie, ipfixregistry.IANAReversedEnterpriseID))
234-
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[i+len(ianaInfoElements)].GetInfoElement(), nil)
234+
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.IANAReversedEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
235235
}
236-
for i, ie := range antreaInfoElements {
236+
for _, ie := range antreaInfoElements {
237237
elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID))
238-
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)].GetInfoElement(), nil)
238+
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
239239
}
240240
for i := range statsElementList {
241241
elemList = append(elemList, createElement(antreaSourceStatsElementList[i], ipfixregistry.AntreaEnterpriseID))
242-
mockIPFIXRegistry.EXPECT().GetInfoElement(antreaSourceStatsElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[i*2+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].GetInfoElement(), nil)
242+
mockIPFIXRegistry.EXPECT().GetInfoElement(antreaSourceStatsElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
243243
elemList = append(elemList, createElement(antreaDestinationStatsElementList[i], ipfixregistry.AntreaEnterpriseID))
244-
mockIPFIXRegistry.EXPECT().GetInfoElement(antreaDestinationStatsElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[i*2+1+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)].GetInfoElement(), nil)
244+
mockIPFIXRegistry.EXPECT().GetInfoElement(antreaDestinationStatsElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
245+
}
246+
for _, ie := range antreaFlowEndSecondsElementList {
247+
elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID))
248+
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
249+
}
250+
for i := range antreaThroughputElementList {
251+
elemList = append(elemList, createElement(antreaThroughputElementList[i], ipfixregistry.AntreaEnterpriseID))
252+
mockIPFIXRegistry.EXPECT().GetInfoElement(antreaThroughputElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
253+
elemList = append(elemList, createElement(antreaSourceThroughputElementList[i], ipfixregistry.AntreaEnterpriseID))
254+
mockIPFIXRegistry.EXPECT().GetInfoElement(antreaSourceThroughputElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
255+
elemList = append(elemList, createElement(antreaDestinationThroughputElementList[i], ipfixregistry.AntreaEnterpriseID))
256+
mockIPFIXRegistry.EXPECT().GetInfoElement(antreaDestinationThroughputElementList[i], ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
245257
}
246258
if tc.includePodLabels {
247-
for i, ie := range antreaLabelsElementList {
259+
for _, ie := range antreaLabelsElementList {
248260
elemList = append(elemList, createElement(ie, ipfixregistry.AntreaEnterpriseID))
249-
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[i+len(ianaInfoElements)+len(ianaReverseInfoElements)+len(antreaInfoElements)+len(antreaSourceStatsElementList)+len(antreaDestinationStatsElementList)].GetInfoElement(), nil)
261+
mockIPFIXRegistry.EXPECT().GetInfoElement(ie, ipfixregistry.AntreaEnterpriseID).Return(elemList[len(elemList)-1].GetInfoElement(), nil)
250262
}
251263
}
252264
mockTempSet.EXPECT().ResetSet()

plugins/octant/go.sum

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -973,7 +973,7 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17
973973
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
974974
github.com/vmware-tanzu/octant v0.24.0 h1:PMLU2QG6czSdCl6/xn7lHkHGi+Dv43rxL+AZL6sPJ24=
975975
github.com/vmware-tanzu/octant v0.24.0/go.mod h1:yK0Nu7wzzvV25/T8nf4NQFMUUA7sLcp0gk96CjOTUiQ=
976-
github.com/vmware/go-ipfix v0.5.10/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY=
976+
github.com/vmware/go-ipfix v0.5.11/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY=
977977
github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
978978
github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI=
979979
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=

0 commit comments

Comments
 (0)