Skip to content

Commit a0e2769

Browse files
authored
fix(sources): emit ComponentEventsDropped when source send is cancelled (vectordotdev#18859)
* fix(sources): emit ComponentEventsDropped when source send is cancelled * feedback * fix docs
1 parent 811b7f7 commit a0e2769

File tree

5 files changed

+169
-9
lines changed

5 files changed

+169
-9
lines changed

src/source_sender/mod.rs

Lines changed: 159 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use std::{collections::HashMap, fmt};
55
use chrono::Utc;
66
use futures::{Stream, StreamExt};
77
use metrics::{register_histogram, Histogram};
8+
use tracing::Span;
89
use vector_buffers::topology::channel::{self, LimitedReceiver, LimitedSender};
10+
use vector_common::internal_event::{ComponentEventsDropped, UNINTENTIONAL};
911
#[cfg(test)]
1012
use vector_core::event::{into_event_stream, EventStatus};
1113
use vector_core::{
@@ -206,6 +208,9 @@ impl SourceSender {
206208
recv
207209
}
208210

211+
/// Send an event to the default output.
212+
///
213+
/// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
209214
pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> {
210215
self.inner
211216
.as_mut()
@@ -214,6 +219,9 @@ impl SourceSender {
214219
.await
215220
}
216221

222+
/// Send a stream of events to the default output.
223+
///
224+
/// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
217225
pub async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
218226
where
219227
S: Stream<Item = E> + Unpin,
@@ -226,10 +234,14 @@ impl SourceSender {
226234
.await
227235
}
228236

237+
/// Send a batch of events to the default output.
238+
///
239+
/// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
229240
pub async fn send_batch<I, E>(&mut self, events: I) -> Result<(), ClosedError>
230241
where
231242
E: Into<Event> + ByteSizeOf,
232243
I: IntoIterator<Item = E>,
244+
<I as IntoIterator>::IntoIter: ExactSizeIterator,
233245
{
234246
self.inner
235247
.as_mut()
@@ -238,10 +250,14 @@ impl SourceSender {
238250
.await
239251
}
240252

253+
/// Send a batch of events event to a named output.
254+
///
255+
/// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
241256
pub async fn send_batch_named<I, E>(&mut self, name: &str, events: I) -> Result<(), ClosedError>
242257
where
243258
E: Into<Event> + ByteSizeOf,
244259
I: IntoIterator<Item = E>,
260+
<I as IntoIterator>::IntoIter: ExactSizeIterator,
245261
{
246262
self.named_inners
247263
.get_mut(name)
@@ -251,6 +267,47 @@ impl SourceSender {
251267
}
252268
}
253269

270+
/// UnsentEvents tracks the number of events yet to be sent in the buffer. This is used to
271+
/// increment the appropriate counters when a future is not polled to completion. Particularly,
272+
/// this is known to happen in a Warp server when a client sends a new HTTP request on a TCP
273+
/// connection that already has a pending request.
274+
///
275+
/// If its internal count is greater than 0 when dropped, the appropriate [ComponentEventsDropped]
276+
/// event is emitted.
277+
struct UnsentEventCount {
278+
count: usize,
279+
span: Span,
280+
}
281+
282+
impl UnsentEventCount {
283+
fn new(count: usize) -> Self {
284+
Self {
285+
count,
286+
span: Span::current(),
287+
}
288+
}
289+
290+
fn decr(&mut self, count: usize) {
291+
self.count = self.count.saturating_sub(count);
292+
}
293+
294+
fn discard(&mut self) {
295+
self.count = 0;
296+
}
297+
}
298+
299+
impl Drop for UnsentEventCount {
300+
fn drop(&mut self) {
301+
if self.count > 0 {
302+
let _enter = self.span.enter();
303+
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
304+
count: self.count,
305+
reason: "Source send cancelled."
306+
});
307+
}
308+
}
309+
}
310+
254311
#[derive(Clone)]
255312
struct Inner {
256313
inner: LimitedSender<EventArray>,
@@ -322,7 +379,15 @@ impl Inner {
322379
}
323380

324381
async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> {
325-
self.send(event.into()).await
382+
let event: EventArray = event.into();
383+
// It's possible that the caller stops polling this future while it is blocked waiting
384+
// on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit
385+
// `ComponentEventsDropped` events.
386+
let count = event.len();
387+
let mut unsent_event_count = UnsentEventCount::new(count);
388+
let res = self.send(event).await;
389+
unsent_event_count.discard();
390+
res
326391
}
327392

328393
async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
@@ -341,10 +406,22 @@ impl Inner {
341406
where
342407
E: Into<Event> + ByteSizeOf,
343408
I: IntoIterator<Item = E>,
409+
<I as IntoIterator>::IntoIter: ExactSizeIterator,
344410
{
411+
// It's possible that the caller stops polling this future while it is blocked waiting
412+
// on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit
413+
// `ComponentEventsDropped` events.
345414
let events = events.into_iter().map(Into::into);
415+
let mut unsent_event_count = UnsentEventCount::new(events.len());
346416
for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) {
347-
self.send(events).await?;
417+
let count = events.len();
418+
self.send(events).await.map_err(|err| {
419+
// The unsent event count is discarded here because the caller emits the
420+
// `StreamClosedError`.
421+
unsent_event_count.discard();
422+
err
423+
})?;
424+
unsent_event_count.decr(count);
348425
}
349426
Ok(())
350427
}
@@ -394,6 +471,7 @@ fn get_timestamp_millis(value: &Value) -> Option<i64> {
394471
mod tests {
395472
use chrono::{DateTime, Duration};
396473
use rand::{thread_rng, Rng};
474+
use tokio::time::timeout;
397475
use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, TraceEvent};
398476
use vrl::event_path;
399477

@@ -483,4 +561,83 @@ mod tests {
483561
_ => panic!("source_lag_time_seconds has invalid type"),
484562
}
485563
}
564+
565+
#[tokio::test]
566+
async fn emits_component_discarded_events_total_for_send_event() {
567+
metrics::init_test();
568+
let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1);
569+
570+
let event = Event::Metric(Metric::new(
571+
"name",
572+
MetricKind::Absolute,
573+
MetricValue::Gauge { value: 123.4 },
574+
));
575+
576+
// First send will succeed.
577+
sender
578+
.send_event(event.clone())
579+
.await
580+
.expect("First send should not fail");
581+
582+
// Second send will timeout, so the future will not be polled to completion.
583+
let res = timeout(
584+
std::time::Duration::from_millis(100),
585+
sender.send_event(event.clone()),
586+
)
587+
.await;
588+
assert!(res.is_err(), "Send should have timed out.");
589+
590+
let component_discarded_events_total = Controller::get()
591+
.expect("There must be a controller")
592+
.capture_metrics()
593+
.into_iter()
594+
.filter(|metric| metric.name() == "component_discarded_events_total")
595+
.collect::<Vec<_>>();
596+
assert_eq!(component_discarded_events_total.len(), 1);
597+
598+
let component_discarded_events_total = &component_discarded_events_total[0];
599+
let MetricValue::Counter { value } = component_discarded_events_total.value() else {
600+
panic!("component_discarded_events_total has invalid type")
601+
};
602+
assert_eq!(*value, 1.0);
603+
}
604+
605+
#[tokio::test]
606+
async fn emits_component_discarded_events_total_for_send_batch() {
607+
metrics::init_test();
608+
let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1);
609+
610+
let expected_drop = 100;
611+
let events: Vec<Event> = (0..(CHUNK_SIZE + expected_drop))
612+
.map(|_| {
613+
Event::Metric(Metric::new(
614+
"name",
615+
MetricKind::Absolute,
616+
MetricValue::Gauge { value: 123.4 },
617+
))
618+
})
619+
.collect();
620+
621+
// `CHUNK_SIZE` events will be sent into buffer but then the future will not be polled to completion.
622+
let res = timeout(
623+
std::time::Duration::from_millis(100),
624+
sender.send_batch(events),
625+
)
626+
.await;
627+
assert!(res.is_err(), "Send should have timed out.");
628+
629+
let component_discarded_events_total = Controller::get()
630+
.expect("There must be a controller")
631+
.capture_metrics()
632+
.into_iter()
633+
.filter(|metric| metric.name() == "component_discarded_events_total")
634+
.collect::<Vec<_>>();
635+
assert_eq!(component_discarded_events_total.len(), 1);
636+
637+
let component_discarded_events_total = &component_discarded_events_total[0];
638+
let MetricValue::Counter { value } = component_discarded_events_total.value() else {
639+
panic!("component_discarded_events_total has invalid type")
640+
};
641+
assert_eq!(*value, expected_drop as f64);
642+
}
486643
}

src/sources/mongodb_metrics/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,13 +139,13 @@ impl SourceConfig for MongoDbMetricsConfig {
139139
while interval.next().await.is_some() {
140140
let start = Instant::now();
141141
let metrics = join_all(sources.iter().map(|mongodb| mongodb.collect())).await;
142-
let count = metrics.len();
143142
emit!(CollectionCompleted {
144143
start,
145144
end: Instant::now()
146145
});
147146

148-
let metrics = metrics.into_iter().flatten();
147+
let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
148+
let count = metrics.len();
149149

150150
if (cx.out.send_batch(metrics).await).is_err() {
151151
emit!(StreamClosedError { count });

src/sources/nginx_metrics/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,13 @@ impl SourceConfig for NginxMetricsConfig {
127127
while interval.next().await.is_some() {
128128
let start = Instant::now();
129129
let metrics = join_all(sources.iter().map(|nginx| nginx.collect())).await;
130-
let count = metrics.len();
131130
emit!(CollectionCompleted {
132131
start,
133132
end: Instant::now()
134133
});
135134

136-
let metrics = metrics.into_iter().flatten();
135+
let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
136+
let count = metrics.len();
137137

138138
if (cx.out.send_batch(metrics).await).is_err() {
139139
emit!(StreamClosedError { count });

src/sources/postgresql_metrics.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,13 +220,14 @@ impl SourceConfig for PostgresqlMetricsConfig {
220220
while interval.next().await.is_some() {
221221
let start = Instant::now();
222222
let metrics = join_all(sources.iter_mut().map(|source| source.collect())).await;
223-
let count = metrics.len();
224223
emit!(CollectionCompleted {
225224
start,
226225
end: Instant::now()
227226
});
228227

229-
let metrics = metrics.into_iter().flatten();
228+
let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
229+
let count = metrics.len();
230+
230231
if (cx.out.send_batch(metrics).await).is_err() {
231232
emit!(StreamClosedError { count });
232233
return Err(());

src/topology/test/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,9 @@ async fn topology_swap_transform_is_atomic() {
629629
}
630630
};
631631
let input = async move {
632-
in1.send_batch(iter::from_fn(events)).await.unwrap();
632+
in1.send_event_stream(stream::iter(iter::from_fn(events)))
633+
.await
634+
.unwrap();
633635
};
634636
let output = out1.for_each(move |_| {
635637
recv_counter.fetch_add(1, Ordering::Release);

0 commit comments

Comments
 (0)