diff --git a/relay-profiling/src/error.rs b/relay-profiling/src/error.rs index 4930221d64f..7bc716195b1 100644 --- a/relay-profiling/src/error.rs +++ b/relay-profiling/src/error.rs @@ -10,6 +10,12 @@ pub enum ProfileError { InvalidBase64Value, #[error("invalid sampled profile")] InvalidSampledProfile, + /// Error associated with an invalid [`ProfileType`](crate::ProfileType). + /// + /// Error is currently emitted when the inferred profile type from the payload + /// does not match the profile type inferred from the envelope item headers. + #[error("profile type invalid or mismatched")] + InvalidProfileType, #[error("cannot serialize payload")] CannotSerializePayload, #[error("not enough samples")] diff --git a/relay-profiling/src/lib.rs b/relay-profiling/src/lib.rs index 53854009a0f..323d5c930d0 100644 --- a/relay-profiling/src/lib.rs +++ b/relay-profiling/src/lib.rs @@ -82,12 +82,31 @@ const MAX_PROFILE_CHUNK_DURATION: Duration = Duration::from_secs(66); /// Same format as event IDs. pub type ProfileId = EventId; -#[derive(Debug, Clone, Copy)] +/// Determines the type/use of a [`ProfileChunk`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ProfileType { + /// A backend profile. Backend, + /// A UI profile. Ui, } +impl ProfileType { + /// Converts a platform to a [`ProfileType`]. + /// + /// The profile type is currently determined based on the contained profile + /// platform. It determines the data category this profile chunk belongs to. + /// + /// This needs to be synchronized with the implementation in Sentry: + /// + pub fn from_platform(platform: &str) -> Self { + match platform { + "cocoa" | "android" | "javascript" => Self::Ui, + _ => Self::Backend, + } + } +} + #[derive(Debug, Deserialize)] struct MinimalProfile { #[serde(alias = "profile_id", alias = "chunk_id")] @@ -307,16 +326,9 @@ impl ProfileChunk { /// Returns the [`ProfileType`] this chunk belongs to. 
/// - /// The profile type is currently determined based on the contained profile - /// platform. It determines the data category this profile chunk belongs to. - /// - /// This needs to be synchronized with the implementation in Sentry: - /// + /// This is currently determined from the platform via [`ProfileType::from_platform`]. pub fn profile_type(&self) -> ProfileType { - match self.profile.platform.as_str() { - "cocoa" | "android" | "javascript" => ProfileType::Ui, - _ => ProfileType::Backend, - } + ProfileType::from_platform(&self.profile.platform) } /// Applies inbound filters to the profile chunk. diff --git a/relay-profiling/src/outcomes.rs b/relay-profiling/src/outcomes.rs index 400795a436d..4ea58ff52bd 100644 --- a/relay-profiling/src/outcomes.rs +++ b/relay-profiling/src/outcomes.rs @@ -8,6 +8,7 @@ pub fn discard_reason(err: ProfileError) -> &'static str { ProfileError::InvalidJson(_) => "profiling_invalid_json", ProfileError::InvalidSampledProfile => "profiling_invalid_sampled_profile", ProfileError::InvalidTransactionMetadata => "profiling_invalid_transaction_metadata", + ProfileError::InvalidProfileType => "profiling_invalid_profile_type", ProfileError::MalformedSamples => "profiling_malformed_samples", ProfileError::MalformedStacks => "profiling_malformed_stacks", ProfileError::MissingProfileMetadata => "profiling_invalid_profile_metadata", diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index d87e2c7bc06..51c4fbd6ce0 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -504,21 +504,32 @@ pub struct ItemHeaders { /// /// Can be omitted if the item does not contain new lines. In this case, the item payload is /// parsed until the first newline is encountered. - #[serde(default, skip_serializing_if = "Option::is_none")] + #[serde(skip_serializing_if = "Option::is_none")] length: Option, /// If this is an attachment item, this may contain the attachment type. 
- #[serde(default, skip_serializing_if = "Option::is_none")] + #[serde(skip_serializing_if = "Option::is_none")] attachment_type: Option, /// Content type of the payload. - #[serde(default, skip_serializing_if = "Option::is_none")] + #[serde(skip_serializing_if = "Option::is_none")] content_type: Option, /// If this is an attachment item, this may contain the original file name. - #[serde(default, skip_serializing_if = "Option::is_none")] + #[serde(skip_serializing_if = "Option::is_none")] filename: Option, + /// The platform this item was produced for. + /// + /// Currently only used for [`ItemType::ProfileChunk`]. + /// It contains the same platform as specified in the profile chunk payload, + /// hoisted into the header to be able to determine the correct data category. + /// + /// This is currently considered optional for profile chunks, but may change + /// to required in the future. + #[serde(skip_serializing_if = "Option::is_none")] + platform: Option, + /// The routing_hint may be used to specify how the envelpope should be routed in when /// published to kafka. /// @@ -681,6 +692,7 @@ impl Item { fully_normalized: false, ingest_span_in_eap: false, profile_type: None, + platform: None, }, payload: Bytes::new(), } @@ -733,7 +745,7 @@ impl Item { ItemType::Span | ItemType::OtelSpan => smallvec![(DataCategory::Span, 1)], // NOTE: semantically wrong, but too expensive to parse. ItemType::OtelTracesData => smallvec![(DataCategory::Span, 1)], - ItemType::ProfileChunk => match self.headers.profile_type { + ItemType::ProfileChunk => match self.profile_type() { Some(ProfileType::Backend) => smallvec![(DataCategory::ProfileChunk, 1)], Some(ProfileType::Ui) => smallvec![(DataCategory::ProfileChunkUi, 1)], None => smallvec![], @@ -902,9 +914,23 @@ impl Item { self.headers.ingest_span_in_eap = ingest_span_in_eap; } + /// Returns the associated platform. + /// + /// Note: this is currently only used for [`ItemType::ProfileChunk`]. 
+ pub fn platform(&self) -> Option<&str> { + self.headers.platform.as_deref() + } + /// Returns the associated profile type of a profile chunk. + /// + /// This primarily uses the profile type set via [`Self::set_profile_type`], + /// but if not set, it infers the [`ProfileType`] from the [`Self::platform`]. + /// + /// Returns `None`, if neither source is available. pub fn profile_type(&self) -> Option { - self.headers.profile_type + self.headers + .profile_type + .or_else(|| self.platform().map(ProfileType::from_platform)) } /// Set the profile type of the profile chunk. diff --git a/relay-server/src/services/processor/profile_chunk.rs b/relay-server/src/services/processor/profile_chunk.rs index c03d7ef1347..9c6411f091e 100644 --- a/relay-server/src/services/processor/profile_chunk.rs +++ b/relay-server/src/services/processor/profile_chunk.rs @@ -59,8 +59,29 @@ pub fn process( Ok(chunk) => chunk, Err(err) => return error_to_action(err), }; - // Important: set the profile type to get outcomes in the correct category. - item.set_profile_type(chunk.profile_type()); + + // Validate the item inferred profile type with the one from the payload, + // or if missing set it. + // + // This is currently necessary to ensure profile chunks are emitted in the correct + // data category, as well as rate limited with the correct data category. + // + // In the future we plan to make the profile type on the item header a necessity. + // For more context see also: . + match item.profile_type() { + Some(profile_type) => { + // Validate the profile type inferred from the item header (either set before + // or from the platform) against the profile type from the parsed chunk itself. + if profile_type != chunk.profile_type() { + return error_to_action(relay_profiling::ProfileError::InvalidProfileType); + } + } + None => { + // Important: set the profile type to get outcomes in the correct category, + // if there isn't already one on the profile. 
+ item.set_profile_type(chunk.profile_type()); + } + } if let Err(err) = chunk.filter(client_ip, filter_settings, global_config) { return error_to_action(err); diff --git a/tests/integration/test_profile_chunks.py b/tests/integration/test_profile_chunks.py index 8102643d9c5..9d5aad36e3a 100644 --- a/tests/integration/test_profile_chunks.py +++ b/tests/integration/test_profile_chunks.py @@ -10,6 +10,23 @@ RELAY_ROOT = Path(__file__).parent.parent.parent +TEST_CONFIG = { + "outcomes": { + "emit_outcomes": True, + "batch_size": 1, + "batch_interval": 1, + "aggregator": { + "bucket_interval": 1, + "flush_interval": 1, + }, + }, + "aggregator": { + "bucket_interval": 1, + "initial_delay": 0, + }, +} + + @pytest.mark.parametrize("num_intermediate_relays", [0, 1, 2]) def test_profile_chunk_outcomes( mini_sentry, @@ -35,29 +52,13 @@ def test_profile_chunk_outcomes( project_config.setdefault("features", []).append( "organizations:continuous-profiling" ) - config = { - "outcomes": { - "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, - "aggregator": { - "bucket_interval": 1, - "flush_interval": 1, - }, - "source": "processing-relay", - }, - "aggregator": { - "bucket_interval": 1, - "initial_delay": 0, - }, - } # The innermost Relay needs to be in processing mode - upstream = relay_with_processing(config) + upstream = relay_with_processing(TEST_CONFIG) # build a chain of relays for i in range(num_intermediate_relays): - config = deepcopy(config) + config = deepcopy(TEST_CONFIG) if i == 0: # Emulate a PoP Relay config["outcomes"]["source"] = "pop-relay" @@ -103,24 +104,7 @@ def test_profile_chunk_outcomes_invalid( "organizations:continuous-profiling" ) - config = { - "outcomes": { - "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, - "aggregator": { - "bucket_interval": 1, - "flush_interval": 1, - }, - "source": "pop-relay", - }, - "aggregator": { - "bucket_interval": 1, - "initial_delay": 0, - }, - } - - upstream = relay_with_processing(config) + 
upstream = relay_with_processing(TEST_CONFIG) envelope = Envelope() payload = { @@ -144,18 +128,19 @@ def test_profile_chunk_outcomes_invalid( "project_id": 42, "quantity": 1, "reason": "profiling_platform_not_supported", - "source": "pop-relay", }, ] profiles_consumer.assert_empty() +@pytest.mark.parametrize("item_header_platform", [None, "cocoa"]) def test_profile_chunk_outcomes_rate_limited( mini_sentry, relay_with_processing, outcomes_consumer, profiles_consumer, + item_header_platform, ): """ Tests that Relay reports correct outcomes when profile chunks are rate limited. @@ -186,23 +171,7 @@ def test_profile_chunk_outcomes_rate_limited( } ] - config = { - "outcomes": { - "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, - "aggregator": { - "bucket_interval": 1, - "flush_interval": 0, - }, - }, - "aggregator": { - "bucket_interval": 1, - "initial_delay": 0, - }, - } - - upstream = relay_with_processing(config) + upstream = relay_with_processing(TEST_CONFIG) # Load a valid profile chunk from test fixtures with open( @@ -213,7 +182,13 @@ def test_profile_chunk_outcomes_rate_limited( # Create and send envelope containing the profile chunk envelope = Envelope() - envelope.add_item(Item(payload=PayloadRef(bytes=profile), type="profile_chunk")) + envelope.add_item( + Item( + payload=PayloadRef(bytes=profile), + type="profile_chunk", + headers={"platform": item_header_platform}, + ) + ) upstream.send_envelope(project_id, envelope) # Verify the rate limited outcome was emitted with correct properties @@ -235,3 +210,70 @@ def test_profile_chunk_outcomes_rate_limited( # Verify no profiles were forwarded to the consumer profiles_consumer.assert_empty() + + +@pytest.mark.parametrize( + "platform, category", + [ + ("cocoa", "profile_chunk_ui"), + ("node", "profile_chunk"), + (None, "profile_chunk"), # Special case, currently this will forward + ], +) +def test_profile_chunk_outcomes_rate_limited_fast( + mini_sentry, + relay, + platform, + category, +): + """ 
+ Tests that Relay reports correct outcomes when profile chunks are rate limited already in the + fast-path, using the item header. + + The test is parameterized to also *not* send the necessary item header, in which case this currently + asserts the chunk is let through. Once Relay's behaviour is changed to reject profile chunks + without the necessary headers, or the profile type is defaulted, this test needs to be adjusted accordingly. + """ + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id)["config"] + + project_config.setdefault("features", []).append( + "organizations:continuous-profiling" + ) + + project_config["quotas"] = [ + { + "id": f"test_rate_limiting_{uuid.uuid4().hex}", + "categories": [category], + "limit": 0, + "reasonCode": "profile_chunks_exceeded", + } + ] + + upstream = relay(mini_sentry) + + with open( + RELAY_ROOT / "relay-profiling/tests/fixtures/sample/v2/valid.json", + "rb", + ) as f: + profile = f.read() + + envelope = Envelope() + envelope.add_item( + Item( + payload=PayloadRef(bytes=profile), + type="profile_chunk", + headers={"platform": platform}, + ) + ) + upstream.send_envelope(project_id, envelope) + + if platform is None: + envelope = mini_sentry.captured_events.get(timeout=1) + assert [item.type for item in envelope.items] == ["profile_chunk"] + else: + outcome = mini_sentry.get_client_report() + assert outcome["rate_limited_events"] == [ + {"category": category, "quantity": 1, "reason": "profile_chunks_exceeded"} + ] + assert mini_sentry.captured_events.empty()