Skip to content

Commit d3d45da

Browse files
Added multi-flow encoding for inline monitoring services on PTX
1 parent a69d90d commit d3d45da

File tree

1 file changed

+54
-10
lines changed

1 file changed

+54
-10
lines changed

src/netflow_plugin/ipfix_collector.cpp

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -874,10 +874,10 @@ bool ipfix_record_to_flow(uint32_t record_type, uint32_t record_length, const ui
874874
// It's packet header as is in variable length encoding
875875
ipfix_inline_headers++;
876876

877-
// This packet is ended using IPFIX variable length encoding and it may have two possible ways of length encoding
878-
// https://datatracker.ietf.org/doc/html/rfc7011#section-7
877+
// This packet is ended using IPFIX variable length encoding and it may have two possible ways of length
878+
// encoding https://datatracker.ietf.org/doc/html/rfc7011#section-7
879879
if (flow_meta.variable_field_length_encoding == variable_length_encoding_t::single_byte ||
880-
flow_meta.variable_field_length_encoding == variable_length_encoding_t::two_byte) {
880+
flow_meta.variable_field_length_encoding == variable_length_encoding_t::two_byte) {
881881

882882
if (logger.getPriority() == log4cpp::Priority::DEBUG) {
883883
logger << log4cpp::Priority::DEBUG << "Packet header length: " << flow_meta.variable_field_length;
@@ -1241,6 +1241,7 @@ bool ipfix_flowset_to_store(const uint8_t* pkt,
12411241
ssize_t flowset_maximum_length,
12421242
const template_t* field_template,
12431243
uint32_t client_ipv4_address,
1244+
uint32_t& flowset_length,
12441245
const std::string& client_addres_in_string_format) {
12451246
simple_packet_t packet;
12461247
packet.source = NETFLOW;
@@ -1503,6 +1504,7 @@ bool process_ipfix_data(const uint8_t* pkt,
15031504
uint32_t source_id,
15041505
const std::string& client_addres_in_string_format,
15051506
uint32_t client_ipv4_address) {
1507+
ipfix_total_packets++;
15061508

15071509
const ipfix_data_flowset_header_t* flowset_header = (const ipfix_data_flowset_header_t*)pkt;
15081510

@@ -1541,19 +1543,57 @@ bool process_ipfix_data(const uint8_t* pkt,
15411543
if (field_template->type == netflow_template_type_t::Data) {
15421544

15431545
if (field_template->ipfix_variable_length_elements_used) {
1546+
// When we have variable length fields we need to use different logic which relies on flow length calculated during process of reading flow
1547+
15441548
// Get clean flowsets length to use it as limit for our parser
15451549
ssize_t current_flowset_length_no_header = flowset_length - sizeof(ipfix_data_flowset_header_t);
15461550

15471551
if (logger.getPriority() == log4cpp::Priority::DEBUG) {
15481552
logger << log4cpp::Priority::DEBUG << "IPFIX variable field element was used";
15491553
}
15501554

1551-
// TODO: This implementation is rather limited as we read only single flowset here
1552-
// In many cases we use this stuff for IPFIX Inline monitoring and it uses just single flowset but it may change in future
1553-
// TODO: Juniper PTX uses multiple flowsets per packet
1554-
ipfix_flowset_to_store(pkt + sizeof(ipfix_data_flowset_header_t), ipfix_header, current_flowset_length_no_header,
1555-
field_template, client_ipv4_address, client_addres_in_string_format);
1555+
// Where all flows start in packet
1556+
const uint8_t* flow_section_start = pkt + sizeof(ipfix_data_flowset_header_t);
1557+
1558+
// Offset where flow starts
1559+
uint32_t flow_offset = 0;
1560+
1561+
// How much data we have in current flow set
1562+
uint32_t maximum_data_available_to_read = current_flowset_length_no_header;
1563+
1564+
// Run this loop until flow_offset reaches end of packet
1565+
while (flow_offset < current_flowset_length_no_header) {
1566+
// When variable fields present we need to read all fields before getting total length of flow
1567+
uint32_t read_flow_length = 0;
1568+
1569+
// In many cases we have just single flow per UDP packet but Juniper PTX uses multiple flows per packet
1570+
bool floset_processing_result =
1571+
ipfix_flowset_to_store(flow_section_start + flow_offset, ipfix_header, maximum_data_available_to_read,
1572+
field_template, client_ipv4_address, read_flow_length, client_addres_in_string_format);
1573+
1574+
// If we cannot process this flowset then we must stop processing here because we need correct value of flowset_length to jump to next record
1575+
if (!floset_processing_result) {
1576+
return false;
1577+
}
1578+
1579+
if (logger.getPriority() == log4cpp::Priority::DEBUG) {
1580+
logger << log4cpp::Priority::DEBUG << "Total flow length: " << read_flow_length;
1581+
}
1582+
1583+
// Shift flowset offset by length of data read in this iteration
1584+
flow_offset += read_flow_length;
1585+
1586+
// And in the same time reduce amount of available to read data
1587+
maximum_data_available_to_read -= read_flow_length;
1588+
}
15561589
} else {
1590+
// Check that template total length is not zero as we're going to divide by it
1591+
if (field_template->total_length == 0) {
1592+
logger << log4cpp::Priority::ERROR << "Zero IPFIX template length is not valid "
1593+
<< "client " << client_addres_in_string_format << " source_id: " << source_id;
1594+
1595+
return false;
1596+
}
15571597

15581598
// This logic is pretty reliable but it works only if we do not have variable sized fields in template
15591599
// In that case it's completely not applicable
@@ -1593,10 +1633,14 @@ bool process_ipfix_data(const uint8_t* pkt,
15931633
}
15941634

15951635
for (uint32_t i = 0; i < number_flowsets; i++) {
1636+
// We do not use it as we can use total_length directly instead of calculating it
1637+
uint32_t flowset_length = 0;
1638+
15961639
// We apply constraint that maximum potential length of flow set cannot exceed length of all fields in
15971640
// template In this case we have no fields with variable length which may affect it and we're safe
1641+
// We do not check response code as we can jump to next flow even if previous one failed
15981642
ipfix_flowset_to_store(pkt + offset, ipfix_header, field_template->total_length, field_template,
1599-
client_ipv4_address, client_addres_in_string_format);
1643+
client_ipv4_address, flowset_length, client_addres_in_string_format);
16001644

16011645
offset += field_template->total_length;
16021646
}
@@ -1692,7 +1736,7 @@ bool process_ipfix_packet(const uint8_t* packet,
16921736
uint32_t flowset_length = ntohs(flowset->length);
16931737

16941738
// One more check to ensure that we have enough space in packet to read whole flowset
1695-
if (offset + flowset_length > ipfix_packet_length) {
1739+
if (offset + flowset_length > ipfix_packet_length) {
16961740
logger << log4cpp::Priority::ERROR
16971741
<< "We tried to read from address outside IPFIX packet flowset agent IP: " << client_addres_in_string_format;
16981742
return false;

0 commit comments

Comments
 (0)