Skip to content

Commit fd17b26

Browse files
prognant and jszwedko authored
chore(datadog_traces sink): Compute APM stats (#12806)
* +code path for APM stats computation * +aggregator skeleton * +placeholders for full sketches * +switch to msgpack for stats * +minor adjustment * +adjust encoding to exactly match intake expectation * +fix sketch offset * +fmt * +adjusments & test * +address reviews & fix ci * +make check-event happy * +side cases & inline doc * +additional doc * +additional comments Co-authored-by: Jesse Szwedko <[email protected]>
1 parent f4eb365 commit fd17b26

File tree

10 files changed

+877
-49
lines changed

10 files changed

+877
-49
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,7 @@ sinks-datadog_archives = ["sinks-aws_s3", "sinks-azure_blob", "sinks-gcp"]
673673
sinks-datadog_events = []
674674
sinks-datadog_logs = []
675675
sinks-datadog_metrics = ["protobuf-build", "sinks-azure_blob"]
676-
sinks-datadog_traces = ["protobuf-build"]
676+
sinks-datadog_traces = ["protobuf-build", "rmpv", "rmp-serde", "serde_bytes"]
677677
sinks-elasticsearch = ["aws-core", "aws-sigv4", "transforms-metric_to_log"]
678678
sinks-file = ["async-compression"]
679679
sinks-gcp = ["base64", "gcp", "gouth"]

build.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ fn main() {
102102
println!("cargo:rerun-if-changed=proto/dd_trace.proto");
103103
println!("cargo:rerun-if-changed=proto/dnstap.proto");
104104
println!("cargo:rerun-if-changed=proto/ddsketch.proto");
105+
println!("cargo:rerun-if-changed=proto/ddsketch_full.proto");
105106
println!("cargo:rerun-if-changed=proto/google/pubsub/v1/pubsub.proto");
106107
println!("cargo:rerun-if-changed=proto/vector.proto");
107108

@@ -115,6 +116,7 @@ fn main() {
115116
"lib/vector-core/proto/event.proto",
116117
"proto/dnstap.proto",
117118
"proto/ddsketch.proto",
119+
"proto/ddsketch_full.proto",
118120
"proto/dd_trace.proto",
119121
"proto/google/pubsub/v1/pubsub.proto",
120122
"proto/vector.proto",

lib/vector-core/src/metrics/ddsketch.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,14 @@ impl AgentDDSketch {
259259
})
260260
}
261261

262+
pub fn gamma(&self) -> f64 {
263+
self.config.gamma_v
264+
}
265+
266+
pub fn bin_index_offset(&self) -> i32 {
267+
self.config.norm_bias
268+
}
269+
262270
#[allow(dead_code)]
263271
fn bin_count(&self) -> usize {
264272
self.bins.len()

proto/ddsketch_full.proto

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copied from https://github.com/DataDog/sketches-go/blob/0a92170/ddsketch/pb/ddsketch.proto
2+
3+
syntax = "proto3";
4+
5+
package ddsketch_full;
6+
7+
// A DDSketch is essentially a histogram that partitions the range of positive values into an infinite number of
8+
// indexed bins whose size grows exponentially. It keeps track of the number of values (or possibly floating-point
9+
// weights) added to each bin. Negative values are partitioned like positive values, symmetrically to zero.
10+
// The value zero as well as its close neighborhood that would be mapped to extreme bin indexes is mapped to a specific
11+
// counter.
12+
message DDSketch {
13+
// The mapping between positive values and the bin indexes they belong to.
14+
IndexMapping mapping = 1;
15+
16+
// The store for keeping track of positive values.
17+
Store positiveValues = 2;
18+
19+
// The store for keeping track of negative values. A negative value v is mapped using its positive opposite -v.
20+
Store negativeValues = 3;
21+
22+
// The count for the value zero and its close neighborhood (whose width depends on the mapping).
23+
double zeroCount = 4;
24+
}
25+
26+
// How to map positive values to the bins they belong to.
27+
message IndexMapping {
28+
// The gamma parameter of the mapping, such that bin index that a value v belongs to is roughly equal to
29+
// log(v)/log(gamma).
30+
double gamma = 1;
31+
32+
// An offset that can be used to shift all bin indexes.
33+
double indexOffset = 2;
34+
35+
// To speed up the computation of the index a value belongs to, the computation of the log may be approximated using
36+
// the fact that the log to the base 2 of powers of 2 can be computed at a low cost from the binary representation of
37+
// the input value. Other values can be approximated by interpolating between successive powers of 2 (linearly,
38+
// quadratically or cubically).
39+
// NONE means that the log is to be computed exactly (no interpolation).
40+
Interpolation interpolation = 3;
41+
enum Interpolation {
42+
NONE = 0;
43+
LINEAR = 1;
44+
QUADRATIC = 2;
45+
CUBIC = 3;
46+
}
47+
}
48+
49+
// A Store maps bin indexes to their respective counts.
50+
// Counts can be encoded sparsely using binCounts, but also in a contiguous way using contiguousBinCounts and
51+
// contiguousBinIndexOffset. Given that non-empty bins are in practice usually contiguous or close to one another, the
52+
// latter contiguous encoding method is usually more efficient than the sparse one.
53+
// Both encoding methods can be used conjointly. If a bin appears in both the sparse and the contiguous encodings, its
54+
// count value is the sum of the counts in each encodings.
55+
message Store {
56+
// The bin counts, encoded sparsely.
57+
map<sint32, double> binCounts = 1;
58+
59+
// The bin counts, encoded contiguously. The values of contiguousBinCounts are the counts for the bins of indexes
60+
// o, o+1, o+2, etc., where o is contiguousBinIndexOffset.
61+
repeated double contiguousBinCounts = 2 [packed = true];
62+
sint32 contiguousBinIndexOffset = 3;
63+
}

src/sinks/datadog/traces/config.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ pub enum DatadogTracesEndpoint {
9898
/// Store traces & APM stats endpoints actual URIs.
9999
pub struct DatadogTracesEndpointConfiguration {
100100
traces_endpoint: Uri,
101-
// Unused so far
102101
stats_endpoint: Uri,
103102
}
104103

@@ -186,7 +185,6 @@ impl SinkConfig for DatadogTracesConfig {
186185
}
187186

188187
fn input(&self) -> Input {
189-
// Metric will be accepted as soon as APM stats will be handled
190188
Input::trace()
191189
}
192190

src/sinks/datadog/traces/integration_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ async fn to_real_traces_endpoint() {
2727
let (batch, receiver) = BatchNotifier::new_with_receiver();
2828

2929
let trace = vec![Event::Trace(
30-
simple_trace_event().with_batch_notifier(&batch),
30+
simple_trace_event("a_trace".to_string()).with_batch_notifier(&batch),
3131
)];
3232

3333
let stream = map_event_batch_stream(stream::iter(trace), Some(batch));

src/sinks/datadog/traces/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@ mod config;
1515
mod request_builder;
1616
mod service;
1717
mod sink;
18+
mod stats;
19+
20+
pub(crate) mod ddsketch_full {
21+
include!(concat!(env!("OUT_DIR"), "/ddsketch_full.rs"));
22+
}
23+
24+
pub(crate) mod dd_proto {
25+
include!(concat!(env!("OUT_DIR"), "/dd_trace.rs"));
26+
}
1827

1928
use crate::{config::SinkDescription, sinks::datadog::traces::config::DatadogTracesConfig};
2029

src/sinks/datadog/traces/request_builder.rs

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,21 @@ use std::{collections::BTreeMap, io::Write, sync::Arc};
22

33
use bytes::Bytes;
44
use prost::Message;
5+
use rmp_serde;
56
use snafu::Snafu;
67
use vector_core::event::{EventFinalizers, Finalizable};
78

89
use super::{
910
config::{DatadogTracesEndpoint, DatadogTracesEndpointConfiguration},
11+
dd_proto,
1012
service::TraceApiRequest,
13+
sink::PartitionKey,
14+
stats,
1115
};
1216
use crate::{
1317
event::{Event, TraceEvent, Value},
14-
sinks::{
15-
datadog::traces::sink::PartitionKey,
16-
util::{Compression, Compressor, IncrementalRequestBuilder},
17-
},
18+
sinks::util::{Compression, Compressor, IncrementalRequestBuilder},
1819
};
19-
mod dd_proto {
20-
include!(concat!(env!("OUT_DIR"), "/dd_trace.rs"));
21-
}
2220

2321
#[derive(Debug, Snafu)]
2422
pub enum RequestBuilderError {
@@ -79,6 +77,7 @@ pub struct RequestMetadata {
7977
endpoint: DatadogTracesEndpoint,
8078
finalizers: EventFinalizers,
8179
uncompressed_size: usize,
80+
content_type: String,
8281
}
8382

8483
impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequestBuilder {
@@ -91,14 +90,21 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
9190
&mut self,
9291
input: (PartitionKey, Vec<Event>),
9392
) -> Vec<Result<(Self::Metadata, Self::Payload), Self::Error>> {
94-
let (mut key, events) = input;
93+
let (key, events) = input;
9594
let mut results = Vec::new();
9695
let n = events.len();
97-
9896
let traces_event = events
9997
.into_iter()
10098
.filter_map(|e| e.try_into_trace())
101-
.collect();
99+
.collect::<Vec<TraceEvent>>();
100+
101+
results.push(build_apm_stats_request(
102+
&key,
103+
&traces_event,
104+
self.compression,
105+
&self.api_key,
106+
));
107+
102108
self.trace_encoder
103109
.encode_trace(&key, traces_event)
104110
.into_iter()
@@ -108,12 +114,13 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
108114
let metadata = RequestMetadata {
109115
api_key: key
110116
.api_key
111-
.take()
117+
.clone()
112118
.unwrap_or_else(|| Arc::clone(&self.api_key)),
113119
batch_size: n,
114120
endpoint: DatadogTracesEndpoint::Traces,
115121
finalizers: processed.take_finalizers(),
116122
uncompressed_size,
123+
content_type: "application/x-protobuf".to_string(),
117124
};
118125
let mut compressor = Compressor::from(self.compression);
119126
match compressor.write_all(&payload) {
@@ -136,10 +143,7 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
136143

137144
fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request {
138145
let mut headers = BTreeMap::<String, String>::new();
139-
headers.insert(
140-
"Content-Type".to_string(),
141-
"application/x-protobuf".to_string(),
142-
);
146+
headers.insert("Content-Type".to_string(), metadata.content_type);
143147
headers.insert("DD-API-KEY".to_string(), metadata.api_key.to_string());
144148
if let Some(ce) = self.compression.content_encoding() {
145149
headers.insert("Content-Encoding".to_string(), ce.to_string());
@@ -391,3 +395,39 @@ impl DatadogTracesEncoder {
391395
}
392396
}
393397
}
398+
399+
fn build_apm_stats_request(
400+
key: &PartitionKey,
401+
events: &[TraceEvent],
402+
compression: Compression,
403+
default_api_key: &Arc<str>,
404+
) -> Result<(RequestMetadata, Bytes), RequestBuilderError> {
405+
let payload = stats::compute_apm_stats(key, events);
406+
let encoded_payload =
407+
rmp_serde::to_vec_named(&payload).map_err(|e| RequestBuilderError::FailedToEncode {
408+
message: "APM stats encoding failed.",
409+
reason: e.to_string(),
410+
dropped_events: 0,
411+
})?;
412+
let uncompressed_size = encoded_payload.len();
413+
let metadata = RequestMetadata {
414+
api_key: key
415+
.api_key
416+
.clone()
417+
.unwrap_or_else(|| Arc::clone(default_api_key)),
418+
batch_size: 0,
419+
endpoint: DatadogTracesEndpoint::APMStats,
420+
finalizers: EventFinalizers::default(),
421+
uncompressed_size,
422+
content_type: "application/msgpack".to_string(),
423+
};
424+
let mut compressor = Compressor::from(compression);
425+
match compressor.write_all(&encoded_payload) {
426+
Ok(()) => Ok((metadata, compressor.into_inner().freeze())),
427+
Err(e) => Err(RequestBuilderError::FailedToEncode {
428+
message: "APM stats payload compression failed.",
429+
reason: e.to_string(),
430+
dropped_events: 0,
431+
}),
432+
}
433+
}

0 commit comments

Comments (0)