@@ -12,7 +12,7 @@ use futures::prelude::*;
12
12
use parking_lot:: RwLock ;
13
13
use serde_json:: Value as SerdeValue ;
14
14
15
- use relay_common:: { clone, metric, LogError } ;
15
+ use relay_common:: { clone, metric, LogError , ProjectId } ;
16
16
use relay_config:: { Config , HttpEncoding , RelayMode } ;
17
17
use relay_general:: pii:: { PiiAttachmentsProcessor , PiiProcessor } ;
18
18
use relay_general:: processor:: { process_value, ProcessingState } ;
@@ -92,6 +92,9 @@ enum ProcessingError {
92
92
#[ fail( display = "failed to resolve project information" ) ]
93
93
ProjectFailed ( #[ cause] ProjectError ) ,
94
94
95
+ #[ fail( display = "missing project id in DSN" ) ]
96
+ MissingProjectId ,
97
+
95
98
#[ fail( display = "invalid security report type" ) ]
96
99
InvalidSecurityType ,
97
100
@@ -154,7 +157,8 @@ impl ProcessingError {
154
157
| Self :: ScheduleFailed ( _)
155
158
| Self :: ProjectFailed ( _)
156
159
| Self :: Timeout
157
- | Self :: ProcessingFailed ( _) => Some ( Outcome :: Invalid ( DiscardReason :: Internal ) ) ,
160
+ | Self :: ProcessingFailed ( _)
161
+ | Self :: MissingProjectId => Some ( Outcome :: Invalid ( DiscardReason :: Internal ) ) ,
158
162
#[ cfg( feature = "processing" ) ]
159
163
Self :: StoreFailed ( _) | Self :: QuotasFailed ( _) => {
160
164
Some ( Outcome :: Invalid ( DiscardReason :: Internal ) )
@@ -201,6 +205,13 @@ struct ProcessEnvelopeState {
201
205
/// The state of the project that this envelope belongs to.
202
206
project_state : Arc < ProjectState > ,
203
207
208
+ /// The id of the project that this envelope is ingested into.
209
+ ///
210
+ /// This identifier can differ from the one stated in the Envelope's DSN if the key was moved to
211
+ /// a new project or on the legacy endpoint. In that case, normalization will update the project
212
+ /// ID.
213
+ project_id : ProjectId ,
214
+
204
215
/// UTC date time converted from the `start_time` instant.
205
216
received_at : DateTime < Utc > ,
206
217
}
@@ -283,8 +294,8 @@ impl EventProcessor {
283
294
fn process_sessions ( & self , state : & mut ProcessEnvelopeState ) -> Result < ( ) , ProcessingError > {
284
295
let envelope = & mut state. envelope ;
285
296
let received = state. received_at ;
297
+ let project_id = state. project_id ;
286
298
287
- let project_id = envelope. meta ( ) . project_id ( ) . value ( ) ;
288
299
let clock_drift_processor =
289
300
ClockDriftProcessor :: new ( envelope. sent_at ( ) , received) . at_least ( MINIMUM_CLOCK_DRIFT ) ;
290
301
let client = envelope. meta ( ) . client ( ) . map ( str:: to_owned) ;
@@ -383,12 +394,30 @@ impl EventProcessor {
383
394
envelope. set_retention ( retention) ;
384
395
}
385
396
397
+ // Prefer the project's project ID, and fall back to the stated project id from the
398
+ // envelope. The project ID is available in all modes, other than in proxy mode, where
399
+ // events for unknown projects are forwarded blindly.
400
+ //
401
+ // Neither ID can be available in proxy mode on the /store/ endpoint. This is not supported,
402
+ // since we cannot process an event without project ID, so drop it.
403
+ let project_id = project_state
404
+ . project_id
405
+ . or_else ( || envelope. meta ( ) . project_id ( ) )
406
+ . ok_or ( ProcessingError :: MissingProjectId ) ?;
407
+
408
+ // Ensure the project ID is updated to the stored instance for this project cache. This can
409
+ // differ in two cases:
410
+ // 1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
411
+ // 2. The DSN was moved and the envelope sent to the old project ID.
412
+ envelope. meta_mut ( ) . set_project_id ( project_id) ;
413
+
386
414
Ok ( ProcessEnvelopeState {
387
415
envelope,
388
416
event : Annotated :: empty ( ) ,
389
417
metrics : Metrics :: default ( ) ,
390
418
rate_limits : RateLimits :: new ( ) ,
391
419
project_state,
420
+ project_id,
392
421
received_at : relay_common:: instant_to_date_time ( start_time) ,
393
422
} )
394
423
}
@@ -885,7 +914,7 @@ impl EventProcessor {
885
914
}
886
915
887
916
let store_config = StoreConfig {
888
- project_id : Some ( envelope . meta ( ) . project_id ( ) . value ( ) ) ,
917
+ project_id : Some ( state . project_id . value ( ) ) ,
889
918
client_ip : envelope. meta ( ) . client_addr ( ) . map ( IpAddr :: from) ,
890
919
client : envelope. meta ( ) . client ( ) . map ( str:: to_owned) ,
891
920
key_id,
@@ -1361,7 +1390,6 @@ impl Handler<HandleEnvelope> for EventManager {
1361
1390
} = message;
1362
1391
1363
1392
let event_id = envelope. event_id ( ) ;
1364
- let project_id = envelope. meta ( ) . project_id ( ) ;
1365
1393
let remote_addr = envelope. meta ( ) . client_addr ( ) ;
1366
1394
1367
1395
// Compute whether this envelope contains an event. This is used in error handling to
@@ -1371,13 +1399,15 @@ impl Handler<HandleEnvelope> for EventManager {
1371
1399
1372
1400
let scoping = Rc :: new ( RefCell :: new ( envelope. meta ( ) . get_partial_scoping ( ) ) ) ;
1373
1401
1374
- metric ! ( set( RelaySets :: UniqueProjects ) = project_id. value( ) as i64 ) ;
1375
-
1376
1402
let future = project
1377
1403
. send ( CheckEnvelope :: fetched ( envelope) )
1378
1404
. map_err ( ProcessingError :: ScheduleFailed )
1379
1405
. and_then ( |result| result. map_err ( ProcessingError :: ProjectFailed ) )
1380
1406
. and_then ( clone ! ( scoping, |response| {
1407
+ // Use the project id from the loaded project state to account for redirects.
1408
+ let project_id = response. scoping. project_id. value( ) ;
1409
+ metric!( set( RelaySets :: UniqueProjects ) = project_id as i64 ) ;
1410
+
1381
1411
scoping. replace( response. scoping) ;
1382
1412
1383
1413
let checked = response. result. map_err( ProcessingError :: EventRejected ) ?;
@@ -1452,6 +1482,7 @@ impl Handler<HandleEnvelope> for EventManager {
1452
1482
}
1453
1483
1454
1484
log:: trace!( "sending event to sentry endpoint" ) ;
1485
+ let project_id = scoping. borrow( ) . project_id;
1455
1486
let request = SendRequest :: post( format!( "/api/{}/envelope/" , project_id) ) . build(
1456
1487
move |builder| {
1457
1488
// Override the `sent_at` timestamp. Since the event went through basic
0 commit comments