Skip to content

Commit 2d827be

Browse files
committed
ref(profiling): Infer profile type from item header
1 parent 771a8d0 commit 2d827be

File tree

6 files changed

+181
-73
lines changed

6 files changed

+181
-73
lines changed

relay-profiling/src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ pub enum ProfileError {
1010
InvalidBase64Value,
1111
#[error("invalid sampled profile")]
1212
InvalidSampledProfile,
13+
/// Error associated with an invalid [`ProfileType`](crate::ProfileType).
14+
///
15+
/// Error is currently emitted when the inferred profile type from the payload
16+
/// does not match the profile type inferred from the envelope item headers.
17+
#[error("profile type invalid or mismatched")]
18+
InvalidProfileType,
1319
#[error("cannot serialize payload")]
1420
CannotSerializePayload,
1521
#[error("not enough samples")]

relay-profiling/src/lib.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,31 @@ const MAX_PROFILE_CHUNK_DURATION: Duration = Duration::from_secs(66);
8282
/// Same format as event IDs.
8383
pub type ProfileId = EventId;
8484

85-
#[derive(Debug, Clone, Copy)]
85+
/// Determines the type/use of a [`ProfileChunk`].
86+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8687
pub enum ProfileType {
88+
/// A backend profile.
8789
Backend,
90+
/// A UI profile.
8891
Ui,
8992
}
9093

94+
impl ProfileType {
95+
/// Converts a platform to a [`ProfileType`].
96+
///
97+
/// The profile type is currently determined based on the contained profile
98+
/// platform. It determines the data category this profile chunk belongs to.
99+
///
100+
/// This needs to be synchronized with the implementation in Sentry:
101+
/// <https://github.com/getsentry/sentry/blob/ed2e1c8bcd0d633e6f828fcfbeefbbdd98ef3dba/src/sentry/profiles/task.py#L995>
102+
pub fn from_platform(platform: &str) -> Self {
103+
match platform {
104+
"cocoa" | "android" | "javascript" => Self::Ui,
105+
_ => Self::Backend,
106+
}
107+
}
108+
}
109+
91110
#[derive(Debug, Deserialize)]
92111
struct MinimalProfile {
93112
#[serde(alias = "profile_id", alias = "chunk_id")]
@@ -307,16 +326,9 @@ impl ProfileChunk {
307326

308327
/// Returns the [`ProfileType`] this chunk belongs to.
309328
///
310-
/// The profile type is currently determined based on the contained profile
311-
/// platform. It determines the data category this profile chunk belongs to.
312-
///
313-
/// This needs to be synchronized with the implementation in Sentry:
314-
/// <https://github.com/getsentry/sentry/blob/ed2e1c8bcd0d633e6f828fcfbeefbbdd98ef3dba/src/sentry/profiles/task.py#L995>
329+
/// This is currently determined from the platform via [`ProfileType::from_platform`].
315330
pub fn profile_type(&self) -> ProfileType {
316-
match self.profile.platform.as_str() {
317-
"cocoa" | "android" | "javascript" => ProfileType::Ui,
318-
_ => ProfileType::Backend,
319-
}
331+
ProfileType::from_platform(&self.profile.platform)
320332
}
321333

322334
/// Applies inbound filters to the profile chunk.

relay-profiling/src/outcomes.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub fn discard_reason(err: ProfileError) -> &'static str {
88
ProfileError::InvalidJson(_) => "profiling_invalid_json",
99
ProfileError::InvalidSampledProfile => "profiling_invalid_sampled_profile",
1010
ProfileError::InvalidTransactionMetadata => "profiling_invalid_transaction_metadata",
11+
ProfileError::InvalidProfileType => "profiling_invalid_profile_type",
1112
ProfileError::MalformedSamples => "profiling_malformed_samples",
1213
ProfileError::MalformedStacks => "profiling_malformed_stacks",
1314
ProfileError::MissingProfileMetadata => "profiling_invalid_profile_metadata",

relay-server/src/envelope.rs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -504,21 +504,32 @@ pub struct ItemHeaders {
504504
///
505505
/// Can be omitted if the item does not contain new lines. In this case, the item payload is
506506
/// parsed until the first newline is encountered.
507-
#[serde(default, skip_serializing_if = "Option::is_none")]
507+
#[serde(skip_serializing_if = "Option::is_none")]
508508
length: Option<u32>,
509509

510510
/// If this is an attachment item, this may contain the attachment type.
511-
#[serde(default, skip_serializing_if = "Option::is_none")]
511+
#[serde(skip_serializing_if = "Option::is_none")]
512512
attachment_type: Option<AttachmentType>,
513513

514514
/// Content type of the payload.
515-
#[serde(default, skip_serializing_if = "Option::is_none")]
515+
#[serde(skip_serializing_if = "Option::is_none")]
516516
content_type: Option<ContentType>,
517517

518518
/// If this is an attachment item, this may contain the original file name.
519-
#[serde(default, skip_serializing_if = "Option::is_none")]
519+
#[serde(skip_serializing_if = "Option::is_none")]
520520
filename: Option<String>,
521521

522+
/// The platform this item was produced for.
523+
///
524+
/// Currently only used for [`ItemType::ProfileChunk`].
525+
/// It contains the same platform as specified in the profile chunk payload,
526+
/// hoisted into the header to be able to determine the correct data category.
527+
///
528+
/// This is currently considered optional for profile chunks, but may change
529+
/// to required in the future.
530+
#[serde(skip_serializing_if = "Option::is_none")]
531+
platform: Option<String>,
532+
522533
/// The routing_hint may be used to specify how the envelpope should be routed in when
523534
/// published to kafka.
524535
///
@@ -681,6 +692,7 @@ impl Item {
681692
fully_normalized: false,
682693
ingest_span_in_eap: false,
683694
profile_type: None,
695+
platform: None,
684696
},
685697
payload: Bytes::new(),
686698
}
@@ -733,7 +745,7 @@ impl Item {
733745
ItemType::Span | ItemType::OtelSpan => smallvec![(DataCategory::Span, 1)],
734746
// NOTE: semantically wrong, but too expensive to parse.
735747
ItemType::OtelTracesData => smallvec![(DataCategory::Span, 1)],
736-
ItemType::ProfileChunk => match self.headers.profile_type {
748+
ItemType::ProfileChunk => match self.profile_type() {
737749
Some(ProfileType::Backend) => smallvec![(DataCategory::ProfileChunk, 1)],
738750
Some(ProfileType::Ui) => smallvec![(DataCategory::ProfileChunkUi, 1)],
739751
None => smallvec![],
@@ -902,9 +914,23 @@ impl Item {
902914
self.headers.ingest_span_in_eap = ingest_span_in_eap;
903915
}
904916

917+
/// Returns the associated platform.
918+
///
919+
/// Note: this is currently only used for [`ItemType::ProfileChunk`].
920+
pub fn platform(&self) -> Option<&str> {
921+
self.headers.platform.as_deref()
922+
}
923+
905924
/// Returns the associated profile type of a profile chunk.
925+
///
926+
/// This primarily uses the profile type set via [`Self::set_profile_type`],
927+
/// but if not set, it infers the [`ProfileType`] from the [`Self::platform`].
928+
///
929+
/// Returns `None`, if neither source is available.
906930
pub fn profile_type(&self) -> Option<ProfileType> {
907-
self.headers.profile_type
931+
self.headers
932+
.profile_type
933+
.or_else(|| self.platform().map(ProfileType::from_platform))
908934
}
909935

910936
/// Set the profile type of the profile chunk.

relay-server/src/services/processor/profile_chunk.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,29 @@ pub fn process(
5959
Ok(chunk) => chunk,
6060
Err(err) => return error_to_action(err),
6161
};
62-
// Important: set the profile type to get outcomes in the correct category.
63-
item.set_profile_type(chunk.profile_type());
62+
63+
// Validate the item inferred profile type with the one from the payload,
64+
// or if missing set it.
65+
//
66+
// This is currently necessary to ensure profile chunks are emitted in the correct
67+
// data category, as well as rate limited with the correct data category.
68+
//
69+
// In the future we plan to make the profile type on the item header a necessity.
70+
// For more context see also: <https://github.com/getsentry/relay/pull/4595>.
71+
match item.profile_type() {
72+
Some(profile_type) => {
73+
// Validate the profile type inferred from the item header (either set before
74+
// or from the platform) against the profile type from the parsed chunk itself.
75+
if profile_type != chunk.profile_type() {
76+
return error_to_action(relay_profiling::ProfileError::InvalidProfileType);
77+
}
78+
}
79+
None => {
80+
// Important: set the profile type to get outcomes in the correct category,
81+
// if there isn't already one on the profile.
82+
item.set_profile_type(chunk.profile_type());
83+
}
84+
}
6485

6586
if let Err(err) = chunk.filter(client_ip, filter_settings, global_config) {
6687
return error_to_action(err);

tests/integration/test_profile_chunks.py

Lines changed: 97 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,23 @@
1010
RELAY_ROOT = Path(__file__).parent.parent.parent
1111

1212

13+
TEST_CONFIG = {
14+
"outcomes": {
15+
"emit_outcomes": True,
16+
"batch_size": 1,
17+
"batch_interval": 1,
18+
"aggregator": {
19+
"bucket_interval": 1,
20+
"flush_interval": 1,
21+
},
22+
},
23+
"aggregator": {
24+
"bucket_interval": 1,
25+
"initial_delay": 0,
26+
},
27+
}
28+
29+
1330
@pytest.mark.parametrize("num_intermediate_relays", [0, 1, 2])
1431
def test_profile_chunk_outcomes(
1532
mini_sentry,
@@ -35,29 +52,13 @@ def test_profile_chunk_outcomes(
3552
project_config.setdefault("features", []).append(
3653
"organizations:continuous-profiling"
3754
)
38-
config = {
39-
"outcomes": {
40-
"emit_outcomes": True,
41-
"batch_size": 1,
42-
"batch_interval": 1,
43-
"aggregator": {
44-
"bucket_interval": 1,
45-
"flush_interval": 1,
46-
},
47-
"source": "processing-relay",
48-
},
49-
"aggregator": {
50-
"bucket_interval": 1,
51-
"initial_delay": 0,
52-
},
53-
}
5455

5556
# The innermost Relay needs to be in processing mode
56-
upstream = relay_with_processing(config)
57+
upstream = relay_with_processing(TEST_CONFIG)
5758

5859
# build a chain of relays
5960
for i in range(num_intermediate_relays):
60-
config = deepcopy(config)
61+
config = deepcopy(TEST_CONFIG)
6162
if i == 0:
6263
# Emulate a PoP Relay
6364
config["outcomes"]["source"] = "pop-relay"
@@ -103,24 +104,7 @@ def test_profile_chunk_outcomes_invalid(
103104
"organizations:continuous-profiling"
104105
)
105106

106-
config = {
107-
"outcomes": {
108-
"emit_outcomes": True,
109-
"batch_size": 1,
110-
"batch_interval": 1,
111-
"aggregator": {
112-
"bucket_interval": 1,
113-
"flush_interval": 1,
114-
},
115-
"source": "pop-relay",
116-
},
117-
"aggregator": {
118-
"bucket_interval": 1,
119-
"initial_delay": 0,
120-
},
121-
}
122-
123-
upstream = relay_with_processing(config)
107+
upstream = relay_with_processing(TEST_CONFIG)
124108

125109
envelope = Envelope()
126110
payload = {
@@ -144,18 +128,19 @@ def test_profile_chunk_outcomes_invalid(
144128
"project_id": 42,
145129
"quantity": 1,
146130
"reason": "profiling_platform_not_supported",
147-
"source": "pop-relay",
148131
},
149132
]
150133

151134
profiles_consumer.assert_empty()
152135

153136

137+
@pytest.mark.parametrize("item_header_platform", [None, "cocoa"])
154138
def test_profile_chunk_outcomes_rate_limited(
155139
mini_sentry,
156140
relay_with_processing,
157141
outcomes_consumer,
158142
profiles_consumer,
143+
item_header_platform,
159144
):
160145
"""
161146
Tests that Relay reports correct outcomes when profile chunks are rate limited.
@@ -186,23 +171,7 @@ def test_profile_chunk_outcomes_rate_limited(
186171
}
187172
]
188173

189-
config = {
190-
"outcomes": {
191-
"emit_outcomes": True,
192-
"batch_size": 1,
193-
"batch_interval": 1,
194-
"aggregator": {
195-
"bucket_interval": 1,
196-
"flush_interval": 0,
197-
},
198-
},
199-
"aggregator": {
200-
"bucket_interval": 1,
201-
"initial_delay": 0,
202-
},
203-
}
204-
205-
upstream = relay_with_processing(config)
174+
upstream = relay_with_processing(TEST_CONFIG)
206175

207176
# Load a valid profile chunk from test fixtures
208177
with open(
@@ -213,7 +182,13 @@ def test_profile_chunk_outcomes_rate_limited(
213182

214183
# Create and send envelope containing the profile chunk
215184
envelope = Envelope()
216-
envelope.add_item(Item(payload=PayloadRef(bytes=profile), type="profile_chunk"))
185+
envelope.add_item(
186+
Item(
187+
payload=PayloadRef(bytes=profile),
188+
type="profile_chunk",
189+
headers={"platform": item_header_platform},
190+
)
191+
)
217192
upstream.send_envelope(project_id, envelope)
218193

219194
# Verify the rate limited outcome was emitted with correct properties
@@ -235,3 +210,70 @@ def test_profile_chunk_outcomes_rate_limited(
235210

236211
# Verify no profiles were forwarded to the consumer
237212
profiles_consumer.assert_empty()
213+
214+
215+
@pytest.mark.parametrize(
216+
"platform, category",
217+
[
218+
("cocoa", "profile_chunk_ui"),
219+
("node", "profile_chunk"),
220+
(None, "profile_chunk"), # Special case, currently this will forward
221+
],
222+
)
223+
def test_profile_chunk_outcomes_rate_limited_fast(
224+
mini_sentry,
225+
relay,
226+
platform,
227+
category,
228+
):
229+
"""
230+
Tests that Relay reports correct outcomes when profile chunks are rate limited already in the
231+
fast-path, using the item header.
232+
233+
The test is parameterized to also *not* send the necessary item header, in which case this currently
234+
asserts the chunk is let through. Once Relay's behaviour is changed to reject or profile chunks
235+
without the necessary headers or the profile type is defaulted this test needs to be adjusted accordingly.
236+
"""
237+
project_id = 42
238+
project_config = mini_sentry.add_full_project_config(project_id)["config"]
239+
240+
project_config.setdefault("features", []).append(
241+
"organizations:continuous-profiling"
242+
)
243+
244+
project_config["quotas"] = [
245+
{
246+
"id": f"test_rate_limiting_{uuid.uuid4().hex}",
247+
"categories": [category],
248+
"limit": 0,
249+
"reasonCode": "profile_chunks_exceeded",
250+
}
251+
]
252+
253+
upstream = relay(mini_sentry)
254+
255+
with open(
256+
RELAY_ROOT / "relay-profiling/tests/fixtures/sample/v2/valid.json",
257+
"rb",
258+
) as f:
259+
profile = f.read()
260+
261+
envelope = Envelope()
262+
envelope.add_item(
263+
Item(
264+
payload=PayloadRef(bytes=profile),
265+
type="profile_chunk",
266+
headers={"platform": platform},
267+
)
268+
)
269+
upstream.send_envelope(project_id, envelope)
270+
271+
if platform is None:
272+
envelope = mini_sentry.captured_events.get(timeout=1)
273+
assert [item.type for item in envelope.items] == ["profile_chunk"]
274+
else:
275+
outcome = mini_sentry.get_client_report()
276+
assert outcome["rate_limited_events"] == [
277+
{"category": category, "quantity": 1, "reason": "profile_chunks_exceeded"}
278+
]
279+
assert mini_sentry.captured_events.empty()

0 commit comments

Comments
 (0)