Skip to content

ref(profiling): Infer profile type from item header #4636

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions relay-profiling/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ pub enum ProfileError {
InvalidBase64Value,
#[error("invalid sampled profile")]
InvalidSampledProfile,
/// Error associated with an invalid [`ProfileType`](crate::ProfileType).
///
/// Error is currently emitted when the inferred profile type from the payload
/// does not match the profile type inferred from the envelope item headers.
#[error("profile type invalid or mismatched")]
InvalidProfileType,
#[error("cannot serialize payload")]
CannotSerializePayload,
#[error("not enough samples")]
Expand Down
32 changes: 22 additions & 10 deletions relay-profiling/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,31 @@ const MAX_PROFILE_CHUNK_DURATION: Duration = Duration::from_secs(66);
/// Same format as event IDs.
pub type ProfileId = EventId;

#[derive(Debug, Clone, Copy)]
/// Determines the type/use of a [`ProfileChunk`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProfileType {
/// A backend profile.
Backend,
/// A UI profile.
Ui,
}

impl ProfileType {
/// Converts a platform to a [`ProfileType`].
///
/// The profile type is currently determined based on the contained profile
/// platform. It determines the data category this profile chunk belongs to.
///
/// This needs to be synchronized with the implementation in Sentry:
/// <https://github.com/getsentry/sentry/blob/ed2e1c8bcd0d633e6f828fcfbeefbbdd98ef3dba/src/sentry/profiles/task.py#L995>
pub fn from_platform(platform: &str) -> Self {
match platform {
"cocoa" | "android" | "javascript" => Self::Ui,
_ => Self::Backend,
}
}
}

#[derive(Debug, Deserialize)]
struct MinimalProfile {
#[serde(alias = "profile_id", alias = "chunk_id")]
Expand Down Expand Up @@ -307,16 +326,9 @@ impl ProfileChunk {

/// Returns the [`ProfileType`] this chunk belongs to.
///
/// The profile type is currently determined based on the contained profile
/// platform. It determines the data category this profile chunk belongs to.
///
/// This needs to be synchronized with the implementation in Sentry:
/// <https://github.com/getsentry/sentry/blob/ed2e1c8bcd0d633e6f828fcfbeefbbdd98ef3dba/src/sentry/profiles/task.py#L995>
/// This is currently determined from the platform via [`ProfileType::from_platform`].
pub fn profile_type(&self) -> ProfileType {
match self.profile.platform.as_str() {
"cocoa" | "android" | "javascript" => ProfileType::Ui,
_ => ProfileType::Backend,
}
ProfileType::from_platform(&self.profile.platform)
}

/// Applies inbound filters to the profile chunk.
Expand Down
1 change: 1 addition & 0 deletions relay-profiling/src/outcomes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub fn discard_reason(err: ProfileError) -> &'static str {
ProfileError::InvalidJson(_) => "profiling_invalid_json",
ProfileError::InvalidSampledProfile => "profiling_invalid_sampled_profile",
ProfileError::InvalidTransactionMetadata => "profiling_invalid_transaction_metadata",
ProfileError::InvalidProfileType => "profiling_invalid_profile_type",
ProfileError::MalformedSamples => "profiling_malformed_samples",
ProfileError::MalformedStacks => "profiling_malformed_stacks",
ProfileError::MissingProfileMetadata => "profiling_invalid_profile_metadata",
Expand Down
38 changes: 32 additions & 6 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,21 +504,32 @@ pub struct ItemHeaders {
///
/// Can be omitted if the item does not contain new lines. In this case, the item payload is
/// parsed until the first newline is encountered.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[serde(skip_serializing_if = "Option::is_none")]
length: Option<u32>,

/// If this is an attachment item, this may contain the attachment type.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[serde(skip_serializing_if = "Option::is_none")]
attachment_type: Option<AttachmentType>,

/// Content type of the payload.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[serde(skip_serializing_if = "Option::is_none")]
content_type: Option<ContentType>,

/// If this is an attachment item, this may contain the original file name.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[serde(skip_serializing_if = "Option::is_none")]
filename: Option<String>,

/// The platform this item was produced for.
///
/// Currently only used for [`ItemType::ProfileChunk`].
/// It contains the same platform as specified in the profile chunk payload,
/// hoisted into the header to be able to determine the correct data category.
///
/// This is currently considered optional for profile chunks, but may change
/// to required in the future.
#[serde(skip_serializing_if = "Option::is_none")]
platform: Option<String>,

/// The routing_hint may be used to specify how the envelpope should be routed in when
/// published to kafka.
///
Expand Down Expand Up @@ -681,6 +692,7 @@ impl Item {
fully_normalized: false,
ingest_span_in_eap: false,
profile_type: None,
platform: None,
},
payload: Bytes::new(),
}
Expand Down Expand Up @@ -733,7 +745,7 @@ impl Item {
ItemType::Span | ItemType::OtelSpan => smallvec![(DataCategory::Span, 1)],
// NOTE: semantically wrong, but too expensive to parse.
ItemType::OtelTracesData => smallvec![(DataCategory::Span, 1)],
ItemType::ProfileChunk => match self.headers.profile_type {
ItemType::ProfileChunk => match self.profile_type() {
Some(ProfileType::Backend) => smallvec![(DataCategory::ProfileChunk, 1)],
Some(ProfileType::Ui) => smallvec![(DataCategory::ProfileChunkUi, 1)],
None => smallvec![],
Expand Down Expand Up @@ -902,9 +914,23 @@ impl Item {
self.headers.ingest_span_in_eap = ingest_span_in_eap;
}

/// Returns the associated platform.
///
/// Note: this is currently only used for [`ItemType::ProfileChunk`].
pub fn platform(&self) -> Option<&str> {
self.headers.platform.as_deref()
}

/// Returns the associated profile type of a profile chunk.
///
/// This primarily uses the profile type set via [`Self::set_profile_type`],
/// but if not set, it infers the [`ProfileType`] from the [`Self::platform`].
///
/// Returns `None`, if neither source is available.
pub fn profile_type(&self) -> Option<ProfileType> {
self.headers.profile_type
self.headers
.profile_type
.or_else(|| self.platform().map(ProfileType::from_platform))
}

/// Set the profile type of the profile chunk.
Expand Down
25 changes: 23 additions & 2 deletions relay-server/src/services/processor/profile_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,29 @@ pub fn process(
Ok(chunk) => chunk,
Err(err) => return error_to_action(err),
};
// Important: set the profile type to get outcomes in the correct category.
item.set_profile_type(chunk.profile_type());

// Validate the item inferred profile type with the one from the payload,
// or if missing set it.
//
// This is currently necessary to ensure profile chunks are emitted in the correct
// data category, as well as rate limited with the correct data category.
//
// In the future we plan to make the profile type on the item header a necessity.
// For more context see also: <https://github.com/getsentry/relay/pull/4595>.
match item.profile_type() {
Some(profile_type) => {
// Validate the profile type inferred from the item header (either set before
// or from the platform) against the profile type from the parsed chunk itself.
if profile_type != chunk.profile_type() {
return error_to_action(relay_profiling::ProfileError::InvalidProfileType);
}
}
None => {
// Important: set the profile type to get outcomes in the correct category,
// if there isn't already one on the profile.
item.set_profile_type(chunk.profile_type());
}
}

if let Err(err) = chunk.filter(client_ip, filter_settings, global_config) {
return error_to_action(err);
Expand Down
152 changes: 97 additions & 55 deletions tests/integration/test_profile_chunks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,23 @@
RELAY_ROOT = Path(__file__).parent.parent.parent


TEST_CONFIG = {
"outcomes": {
"emit_outcomes": True,
"batch_size": 1,
"batch_interval": 1,
"aggregator": {
"bucket_interval": 1,
"flush_interval": 1,
},
},
"aggregator": {
"bucket_interval": 1,
"initial_delay": 0,
},
}


@pytest.mark.parametrize("num_intermediate_relays", [0, 1, 2])
def test_profile_chunk_outcomes(
mini_sentry,
Expand All @@ -35,29 +52,13 @@ def test_profile_chunk_outcomes(
project_config.setdefault("features", []).append(
"organizations:continuous-profiling"
)
config = {
"outcomes": {
"emit_outcomes": True,
"batch_size": 1,
"batch_interval": 1,
"aggregator": {
"bucket_interval": 1,
"flush_interval": 1,
},
"source": "processing-relay",
},
"aggregator": {
"bucket_interval": 1,
"initial_delay": 0,
},
}

# The innermost Relay needs to be in processing mode
upstream = relay_with_processing(config)
upstream = relay_with_processing(TEST_CONFIG)

# build a chain of relays
for i in range(num_intermediate_relays):
config = deepcopy(config)
config = deepcopy(TEST_CONFIG)
if i == 0:
# Emulate a PoP Relay
config["outcomes"]["source"] = "pop-relay"
Expand Down Expand Up @@ -103,24 +104,7 @@ def test_profile_chunk_outcomes_invalid(
"organizations:continuous-profiling"
)

config = {
"outcomes": {
"emit_outcomes": True,
"batch_size": 1,
"batch_interval": 1,
"aggregator": {
"bucket_interval": 1,
"flush_interval": 1,
},
"source": "pop-relay",
},
"aggregator": {
"bucket_interval": 1,
"initial_delay": 0,
},
}

upstream = relay_with_processing(config)
upstream = relay_with_processing(TEST_CONFIG)

envelope = Envelope()
payload = {
Expand All @@ -144,18 +128,19 @@ def test_profile_chunk_outcomes_invalid(
"project_id": 42,
"quantity": 1,
"reason": "profiling_platform_not_supported",
"source": "pop-relay",
},
]

profiles_consumer.assert_empty()


@pytest.mark.parametrize("item_header_platform", [None, "cocoa"])
def test_profile_chunk_outcomes_rate_limited(
mini_sentry,
relay_with_processing,
outcomes_consumer,
profiles_consumer,
item_header_platform,
):
"""
Tests that Relay reports correct outcomes when profile chunks are rate limited.
Expand Down Expand Up @@ -186,23 +171,7 @@ def test_profile_chunk_outcomes_rate_limited(
}
]

config = {
"outcomes": {
"emit_outcomes": True,
"batch_size": 1,
"batch_interval": 1,
"aggregator": {
"bucket_interval": 1,
"flush_interval": 0,
},
},
"aggregator": {
"bucket_interval": 1,
"initial_delay": 0,
},
}

upstream = relay_with_processing(config)
upstream = relay_with_processing(TEST_CONFIG)

# Load a valid profile chunk from test fixtures
with open(
Expand All @@ -213,7 +182,13 @@ def test_profile_chunk_outcomes_rate_limited(

# Create and send envelope containing the profile chunk
envelope = Envelope()
envelope.add_item(Item(payload=PayloadRef(bytes=profile), type="profile_chunk"))
envelope.add_item(
Item(
payload=PayloadRef(bytes=profile),
type="profile_chunk",
headers={"platform": item_header_platform},
)
)
upstream.send_envelope(project_id, envelope)

# Verify the rate limited outcome was emitted with correct properties
Expand All @@ -235,3 +210,70 @@ def test_profile_chunk_outcomes_rate_limited(

# Verify no profiles were forwarded to the consumer
profiles_consumer.assert_empty()


@pytest.mark.parametrize(
"platform, category",
[
("cocoa", "profile_chunk_ui"),
("node", "profile_chunk"),
(None, "profile_chunk"), # Special case, currently this will forward
],
)
def test_profile_chunk_outcomes_rate_limited_fast(
mini_sentry,
relay,
platform,
category,
):
"""
Tests that Relay reports correct outcomes when profile chunks are rate limited already in the
fast-path, using the item header.

The test is parameterized to also *not* send the necessary item header, in which case this currently
asserts the chunk is let through. Once Relay's behaviour is changed to reject or profile chunks
without the necessary headers or the profile type is defaulted this test needs to be adjusted accordingly.
"""
project_id = 42
project_config = mini_sentry.add_full_project_config(project_id)["config"]

project_config.setdefault("features", []).append(
"organizations:continuous-profiling"
)

project_config["quotas"] = [
{
"id": f"test_rate_limiting_{uuid.uuid4().hex}",
"categories": [category],
"limit": 0,
"reasonCode": "profile_chunks_exceeded",
}
]

upstream = relay(mini_sentry)

with open(
RELAY_ROOT / "relay-profiling/tests/fixtures/sample/v2/valid.json",
"rb",
) as f:
profile = f.read()

envelope = Envelope()
envelope.add_item(
Item(
payload=PayloadRef(bytes=profile),
type="profile_chunk",
headers={"platform": platform},
)
)
upstream.send_envelope(project_id, envelope)

if platform is None:
envelope = mini_sentry.captured_events.get(timeout=1)
assert [item.type for item in envelope.items] == ["profile_chunk"]
else:
outcome = mini_sentry.get_client_report()
assert outcome["rate_limited_events"] == [
{"category": category, "quantity": 1, "reason": "profile_chunks_exceeded"}
]
assert mini_sentry.captured_events.empty()