Skip to content

Commit 6b1daf1

Browse files
Debarati GhatakDebarati Ghatak
authored andcommitted
fix(core): add fix for stopping multiple event locking idempotent logs
1 parent 46e830a commit 6b1daf1

File tree

4 files changed

+100
-2
lines changed

4 files changed

+100
-2
lines changed

crates/diesel_models/src/query/events.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,20 @@ impl Event {
3232
.await
3333
}
3434

35+
pub async fn find_by_merchant_id_idempotent_event_id(
36+
conn: &PgPooledConn,
37+
merchant_id: &common_utils::id_type::MerchantId,
38+
idempotent_event_id: &str,
39+
) -> StorageResult<Self> {
40+
generics::generic_find_one::<<Self as HasTable>::Table, _, _>(
41+
conn,
42+
dsl::merchant_id
43+
.eq(merchant_id.to_owned())
44+
.and(dsl::idempotent_event_id.eq(idempotent_event_id.to_owned())),
45+
)
46+
.await
47+
}
48+
3549
pub async fn list_initial_attempts_by_merchant_id_primary_object_id(
3650
conn: &PgPooledConn,
3751
merchant_id: &common_utils::id_type::MerchantId,

crates/router/src/core/webhooks/outgoing.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,10 @@ pub(crate) async fn create_event_and_trigger_outgoing_webhook(
157157

158158
if (state
159159
.store
160-
.find_event_by_merchant_id_event_id(
160+
.find_event_by_merchant_id_idempotent_event_id(
161161
key_manager_state,
162162
&merchant_id,
163-
&event_id,
163+
&idempotent_event_id,
164164
merchant_context.get_merchant_key_store(),
165165
)
166166
.await)

crates/router/src/db/events.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@ where
3838
merchant_key_store: &domain::MerchantKeyStore,
3939
) -> CustomResult<domain::Event, errors::StorageError>;
4040

41+
async fn find_event_by_merchant_id_idempotent_event_id(
42+
&self,
43+
state: &KeyManagerState,
44+
merchant_id: &common_utils::id_type::MerchantId,
45+
idempotent_event_id: &str,
46+
merchant_key_store: &domain::MerchantKeyStore,
47+
) -> CustomResult<domain::Event, errors::StorageError>;
48+
4149
async fn list_initial_events_by_merchant_id_primary_object_id(
4250
&self,
4351
state: &KeyManagerState,
@@ -157,6 +165,31 @@ impl EventInterface for Store {
157165
.change_context(errors::StorageError::DecryptionError)
158166
}
159167

168+
#[instrument(skip_all)]
169+
async fn find_event_by_merchant_id_idempotent_event_id(
170+
&self,
171+
state: &KeyManagerState,
172+
merchant_id: &common_utils::id_type::MerchantId,
173+
idempotent_event_id: &str,
174+
merchant_key_store: &domain::MerchantKeyStore,
175+
) -> CustomResult<domain::Event, errors::StorageError> {
176+
let conn = connection::pg_connection_read(self).await?;
177+
storage::Event::find_by_merchant_id_idempotent_event_id(
178+
&conn,
179+
merchant_id,
180+
idempotent_event_id,
181+
)
182+
.await
183+
.map_err(|error| report!(errors::StorageError::from(error)))?
184+
.convert(
185+
state,
186+
merchant_key_store.key.get_inner(),
187+
merchant_key_store.merchant_id.clone().into(),
188+
)
189+
.await
190+
.change_context(errors::StorageError::DecryptionError)
191+
}
192+
160193
#[instrument(skip_all)]
161194
async fn list_initial_events_by_merchant_id_primary_object_id(
162195
&self,
@@ -460,6 +493,40 @@ impl EventInterface for MockDb {
460493
)
461494
}
462495

496+
async fn find_event_by_merchant_id_idempotent_event_id(
497+
&self,
498+
state: &KeyManagerState,
499+
merchant_id: &common_utils::id_type::MerchantId,
500+
idempotent_event_id: &str,
501+
merchant_key_store: &domain::MerchantKeyStore,
502+
) -> CustomResult<domain::Event, errors::StorageError> {
503+
let locked_events = self.events.lock().await;
504+
locked_events
505+
.iter()
506+
.find(|event| {
507+
event.merchant_id == Some(merchant_id.to_owned()) && event.idempotent_event_id == Some(idempotent_event_id.to_string())
508+
})
509+
.cloned()
510+
.async_map(|event| async {
511+
event
512+
.convert(
513+
state,
514+
merchant_key_store.key.get_inner(),
515+
merchant_key_store.merchant_id.clone().into(),
516+
)
517+
.await
518+
.change_context(errors::StorageError::DecryptionError)
519+
})
520+
.await
521+
.transpose()?
522+
.ok_or(
523+
errors::StorageError::ValueNotFound(format!(
524+
"No event available with merchant_id = {merchant_id:?} and idempotent_event_id = {idempotent_event_id}"
525+
))
526+
.into(),
527+
)
528+
}
529+
463530
async fn list_initial_events_by_merchant_id_primary_object_id(
464531
&self,
465532
state: &KeyManagerState,

crates/router/src/db/kafka_store.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -749,6 +749,23 @@ impl EventInterface for KafkaStore {
749749
.await
750750
}
751751

752+
async fn find_event_by_merchant_id_idempotent_event_id(
753+
&self,
754+
state: &KeyManagerState,
755+
merchant_id: &id_type::MerchantId,
756+
idempotent_event_id: &str,
757+
merchant_key_store: &domain::MerchantKeyStore,
758+
) -> CustomResult<domain::Event, errors::StorageError> {
759+
self.diesel_store
760+
.find_event_by_merchant_id_idempotent_event_id(
761+
state,
762+
merchant_id,
763+
idempotent_event_id,
764+
merchant_key_store,
765+
)
766+
.await
767+
}
768+
752769
async fn list_initial_events_by_merchant_id_primary_object_id(
753770
&self,
754771
state: &KeyManagerState,

0 commit comments

Comments
 (0)