Skip to content

Commit fea5344

Browse files
committed
direct_consumer: clarity fixes
The results of partitioning were incorrectly named in filter_stale_subscriptions. Fix the names and additionally pull the iterators from the subspan rather than doing wasteful (and dangerous) iterator math.
1 parent b9d0492 commit fea5344

1 file changed

Lines changed: 6 additions & 7 deletions

File tree

src/v/kafka/client/direct_consumer/direct_consumer.cc

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ direct_consumer::fetch_next(std::chrono::milliseconds timeout) {
8181
} catch (ss::condition_variable_timed_out&) {
8282
co_return chunked_vector<fetched_topic_data>{};
8383
}
84+
// fallthrough loop case
8485
co_return chunked_vector<fetched_topic_data>{};
8586
}
8687

@@ -90,7 +91,7 @@ void direct_consumer::filter_stale_subscriptions(
9091
for (auto& topic_data : responses_to_filter) {
9192
auto& partition_data = topic_data.partitions;
9293

93-
auto non_stale_subsegment = std::ranges::partition(
94+
auto stale_subsegment = std::ranges::partition(
9495
partition_data,
9596
[this, &topic_data](const fetched_partition_data& data) mutable {
9697
auto maybe_current_subscription_epoch = find_subscription_epoch(
@@ -113,20 +114,18 @@ void direct_consumer::filter_stale_subscriptions(
113114
return !is_stale;
114115
});
115116

116-
auto erase_iterator = partition_data.end()
117-
- non_stale_subsegment.size();
118-
117+
// erase everything that is stale
118+
auto erase_iterator = stale_subsegment.begin();
119119
partition_data.erase_to_end(erase_iterator);
120120
}
121121

122122
// remove newly empty topics
123-
auto non_empty_subsegment = std::ranges::partition(
123+
auto empty_subsegment = std::ranges::partition(
124124
responses_to_filter, [](const fetched_topic_data& topic_data) {
125125
return !topic_data.partitions.empty();
126126
});
127127

128-
responses_to_filter.erase_to_end(
129-
responses_to_filter.end() - non_empty_subsegment.size());
128+
responses_to_filter.erase_to_end(empty_subsegment.begin());
130129
}
131130

132131
void direct_consumer::update_offsets_filter_redundant_fetches(

0 commit comments

Comments
 (0)