Skip to content

Commit 021d81e

Browse files
jrevillard and claude committed
fix: address PR nearai#403 code review feedback
- Remove .serena/ directory (committed by mistake)
- Remove is_webhook_message_processed (TOCTOU race condition)
- Use atomic INSERT-first pattern in record_webhook_message_processed
- Return bool to indicate new vs duplicate message
- Fix pending ACK leak on timeout (cleanup before removal)
- Fix metadata parsing efficiency (from_value vs to_string+from_str)
- Consolidate WhatsAppMetadata structs (Option<String> for all fields)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6018dd1 commit 021d81e

8 files changed

Lines changed: 82 additions & 148 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ bench-results/
1919
# WASM build artifacts (loaded from disk, not bundled)
2020
*.wasm
2121

22+
.serena/

src/agent/thread_ops.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -279,15 +279,20 @@ impl Agent {
279279
// For WhatsApp, metadata contains: phone_number_id, sender_phone, message_id, timestamp
280280
#[derive(serde::Deserialize)]
281281
struct WhatsAppMetadata {
282-
message_id: String,
282+
message_id: Option<String>,
283283
}
284284

285285
let ack_key = if let Ok(meta) =
286-
serde_json::from_str::<WhatsAppMetadata>(&message.metadata.to_string())
286+
serde_json::from_value::<WhatsAppMetadata>(message.metadata.clone())
287287
{
288-
format!("{}:{}", message.channel, meta.message_id)
288+
if let Some(msg_id) = meta.message_id {
289+
format!("{}:{}", message.channel, msg_id)
290+
} else {
291+
// Fallback to user_id if no message_id in metadata
292+
format!("{}:{}", message.channel, message.user_id)
293+
}
289294
} else {
290-
// Fallback to user_id if no message_id in metadata
295+
// Fallback to user_id if metadata parsing fails
291296
format!("{}:{}", message.channel, message.user_id)
292297
};
293298

src/channels/wasm/router.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -819,41 +819,33 @@ async fn webhook_handler(
819819
(format!("{}:{}", channel_name, msg.user_id), None)
820820
};
821821

822-
// Check for duplicate messages if database is available
823-
// Record immediately after check to prevent race conditions
822+
// Atomic deduplication: try to INSERT first
823+
// Returns true if inserted (new message), false if duplicate
824+
// This eliminates TOCTOU race condition between SELECT and INSERT
824825
if let Some(ref db) = state.db
825826
&& let Some(msg_id) = &external_msg_id
826827
{
827-
match db.is_webhook_message_processed(channel_name, msg_id).await {
828-
Ok(is_processed) => {
829-
if is_processed {
828+
match db
829+
.record_webhook_message_processed(channel_name, msg_id)
830+
.await
831+
{
832+
Ok(was_inserted) => {
833+
if !was_inserted {
830834
tracing::info!(
831835
channel = %channel_name,
832836
message_id = %msg_id,
833837
"Duplicate webhook message detected, skipping"
834838
);
835839
continue; // Skip this duplicate message
836840
}
837-
// Not a duplicate - record immediately to prevent races
838-
// This ensures concurrent webhooks for the same message will be caught
839-
if let Err(e) = db
840-
.record_webhook_message_processed(channel_name, msg_id)
841-
.await
842-
{
843-
tracing::warn!(
844-
channel = %channel_name,
845-
message_id = %msg_id,
846-
error = %e,
847-
"Failed to record message as processed, dedup may not work on retry"
848-
);
849-
}
841+
// New message - will process below
850842
}
851843
Err(e) => {
852844
tracing::warn!(
853845
channel = %channel_name,
854846
message_id = %msg_id,
855847
error = %e,
856-
"Failed to check dedup, proceeding anyway"
848+
"Failed to record message, proceeding anyway (fail open)"
857849
);
858850
// Continue processing on error (fail open)
859851
}
@@ -894,6 +886,12 @@ async fn webhook_handler(
894886
// Using join_all ensures we don't accumulate timeouts (3 messages ≠ 30s wait)
895887
let ack_timeout = state.webhook_ack_timeout;
896888

889+
// Keep track of ack_keys for cleanup on timeout
890+
let ack_keys: Vec<String> = ack_receivers
891+
.iter()
892+
.map(|(key, _, _)| key.clone())
893+
.collect();
894+
897895
let ack_futures: Vec<_> = ack_receivers
898896
.into_iter()
899897
.map(|(ack_key, ack_rx, _)| async move {
@@ -921,6 +919,10 @@ async fn webhook_handler(
921919
timeout_secs = ack_timeout.as_secs(),
922920
"Webhook ACK wait timed out, returning 500 to trigger retry"
923921
);
922+
// Clean up pending ACKs that were registered but never signaled
923+
for ack_key in &ack_keys {
924+
state.router.pending_acks.write().await.remove(ack_key);
925+
}
924926
// Return 500 so WhatsApp retries - deduplication will handle duplicates
925927
return (
926928
StatusCode::INTERNAL_SERVER_ERROR,

src/channels/wasm/wrapper.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,11 +1276,9 @@ impl WasmChannel {
12761276
let workspace_store = self.workspace_store.clone();
12771277

12781278
// Pre-resolve host credentials for automatic injection (before spawn_blocking)
1279-
let host_credentials = resolve_channel_host_credentials(
1280-
&self.capabilities,
1281-
self.secrets_store.as_deref(),
1282-
)
1283-
.await;
1279+
let host_credentials =
1280+
resolve_channel_host_credentials(&self.capabilities, self.secrets_store.as_deref())
1281+
.await;
12841282

12851283
// Prepare request data
12861284
let method = method.to_string();

src/db/libsql/mod.rs

Lines changed: 32 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -464,46 +464,47 @@ mod tests {
464464
use crate::db::WebhookDedupStore;
465465

466466
#[tokio::test]
467-
async fn test_webhook_dedup_not_processed_initially() {
467+
async fn test_webhook_dedup_first_record_returns_true() {
468468
// Use a temp file so connections share state (in-memory DBs are connection-local)
469469
let dir = tempfile::tempdir().unwrap();
470-
let db_path = dir.path().join("test_dedup_initial.db");
470+
let db_path = dir.path().join("test_dedup_first.db");
471471
let backend = LibSqlBackend::new_local(&db_path).await.unwrap();
472472
backend.run_migrations().await.unwrap();
473473

474-
// Message should not be marked as processed initially
475-
let is_processed = backend
476-
.is_webhook_message_processed("whatsapp", "wamid.test123")
474+
// First record should return true (new message inserted)
475+
let was_inserted = backend
476+
.record_webhook_message_processed("whatsapp", "wamid.test123")
477477
.await
478478
.unwrap();
479479
assert!(
480-
!is_processed,
481-
"New message should not be marked as processed"
480+
was_inserted,
481+
"First record should return true (new message)"
482482
);
483483
}
484484

485485
#[tokio::test]
486-
async fn test_webhook_dedup_record_and_check() {
486+
async fn test_webhook_dedup_duplicate_returns_false() {
487487
// Use a temp file so connections share state (in-memory DBs are connection-local)
488488
let dir = tempfile::tempdir().unwrap();
489-
let db_path = dir.path().join("test_dedup_record.db");
489+
let db_path = dir.path().join("test_dedup_duplicate.db");
490490
let backend = LibSqlBackend::new_local(&db_path).await.unwrap();
491491
backend.run_migrations().await.unwrap();
492492

493-
// Record message as processed
494-
backend
495-
.record_webhook_message_processed("whatsapp", "wamid.test456")
493+
// First record returns true
494+
let was_inserted = backend
495+
.record_webhook_message_processed("whatsapp", "wamid.duplicate")
496496
.await
497497
.unwrap();
498+
assert!(was_inserted, "First record should return true");
498499

499-
// Now it should be marked as processed
500-
let is_processed = backend
501-
.is_webhook_message_processed("whatsapp", "wamid.test456")
500+
// Second record (duplicate) returns false
501+
let was_inserted = backend
502+
.record_webhook_message_processed("whatsapp", "wamid.duplicate")
502503
.await
503504
.unwrap();
504505
assert!(
505-
is_processed,
506-
"Recorded message should be marked as processed"
506+
!was_inserted,
507+
"Duplicate record should return false (already processed)"
507508
);
508509
}
509510

@@ -515,53 +516,31 @@ mod tests {
515516
let backend = LibSqlBackend::new_local(&db_path).await.unwrap();
516517
backend.run_migrations().await.unwrap();
517518

518-
// Record message for whatsapp
519-
backend
519+
// Record message for whatsapp - should return true
520+
let was_inserted = backend
520521
.record_webhook_message_processed("whatsapp", "msg789")
521522
.await
522523
.unwrap();
524+
assert!(was_inserted, "First channel record should return true");
523525

524-
// Same message_id on different channel should NOT be processed
525-
let is_processed = backend
526-
.is_webhook_message_processed("telegram", "msg789")
526+
// Same message_id on different channel should also return true (independent)
527+
let was_inserted = backend
528+
.record_webhook_message_processed("telegram", "msg789")
527529
.await
528530
.unwrap();
529531
assert!(
530-
!is_processed,
532+
was_inserted,
531533
"Different channel should have independent dedup state"
532534
);
533535

534-
// Original channel should be processed
535-
let is_processed = backend
536-
.is_webhook_message_processed("whatsapp", "msg789")
537-
.await
538-
.unwrap();
539-
assert!(is_processed, "Original channel should be processed");
540-
}
541-
542-
#[tokio::test]
543-
async fn test_webhook_dedup_duplicate_record_is_idempotent() {
544-
// Use a temp file so connections share state (in-memory DBs are connection-local)
545-
let dir = tempfile::tempdir().unwrap();
546-
let db_path = dir.path().join("test_dedup_idempotent.db");
547-
let backend = LibSqlBackend::new_local(&db_path).await.unwrap();
548-
backend.run_migrations().await.unwrap();
549-
550-
// Record same message twice (simulating webhook retry)
551-
backend
552-
.record_webhook_message_processed("whatsapp", "wamid.duplicate")
553-
.await
554-
.unwrap();
555-
backend
556-
.record_webhook_message_processed("whatsapp", "wamid.duplicate")
557-
.await
558-
.unwrap();
559-
560-
// Should still be marked as processed (no error, no duplicate row)
561-
let is_processed = backend
562-
.is_webhook_message_processed("whatsapp", "wamid.duplicate")
536+
// Duplicate on original channel should return false
537+
let was_inserted = backend
538+
.record_webhook_message_processed("whatsapp", "msg789")
563539
.await
564540
.unwrap();
565-
assert!(is_processed);
541+
assert!(
542+
!was_inserted,
543+
"Duplicate on original channel should return false"
544+
);
566545
}
567546
}

src/db/libsql/webhook_dedup.rs

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -10,45 +10,23 @@ use crate::error::DatabaseError;
1010

1111
#[async_trait]
1212
impl WebhookDedupStore for LibSqlBackend {
13-
async fn is_webhook_message_processed(
13+
async fn record_webhook_message_processed(
1414
&self,
1515
channel: &str,
1616
external_message_id: &str,
1717
) -> Result<bool, DatabaseError> {
1818
let conn = self.connect().await?;
19-
let mut rows = conn
20-
.query(
21-
"SELECT 1 FROM webhook_message_dedup \
22-
WHERE channel = ?1 AND external_message_id = ?2",
23-
params![channel, external_message_id],
19+
let id = Uuid::new_v4().to_string();
20+
let rows_affected = conn
21+
.execute(
22+
"INSERT INTO webhook_message_dedup (id, channel, external_message_id) \
23+
VALUES (?1, ?2, ?3) \
24+
ON CONFLICT(channel, external_message_id) DO NOTHING",
25+
params![id, channel, external_message_id],
2426
)
2527
.await
2628
.map_err(|e| DatabaseError::Query(e.to_string()))?;
27-
28-
let exists = rows
29-
.next()
30-
.await
31-
.map_err(|e| DatabaseError::Query(e.to_string()))?
32-
.is_some();
33-
Ok(exists)
34-
}
35-
36-
async fn record_webhook_message_processed(
37-
&self,
38-
channel: &str,
39-
external_message_id: &str,
40-
) -> Result<(), DatabaseError> {
41-
let conn = self.connect().await?;
42-
let id = Uuid::new_v4().to_string();
43-
conn.execute(
44-
"INSERT INTO webhook_message_dedup (id, channel, external_message_id) \
45-
VALUES (?1, ?2, ?3) \
46-
ON CONFLICT(channel, external_message_id) DO NOTHING",
47-
params![id, channel, external_message_id],
48-
)
49-
.await
50-
.map_err(|e| DatabaseError::Query(e.to_string()))?;
51-
Ok(())
29+
Ok(rows_affected > 0)
5230
}
5331

5432
async fn cleanup_old_webhook_dedup_records(&self) -> Result<u64, DatabaseError> {

src/db/mod.rs

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -340,24 +340,14 @@ pub trait SettingsStore: Send + Sync {
340340
/// when platforms like WhatsApp retry after a 500 response.
341341
#[async_trait]
342342
pub trait WebhookDedupStore: Send + Sync {
343-
/// Check if a message has already been processed.
344-
///
345-
/// Returns `true` if the message was already processed (duplicate),
346-
/// `false` if this is a new message.
347-
async fn is_webhook_message_processed(
348-
&self,
349-
channel: &str,
350-
external_message_id: &str,
351-
) -> Result<bool, DatabaseError>;
352-
353-
/// Record that a message has been successfully processed.
354-
///
355-
/// Should be called AFTER the message is fully processed (persisted to DB).
343+
/// Try to record that a message is processed, atomically.
344+
/// Returns `true` if this is a new message (was inserted), `false` if it was a duplicate.
345+
/// Uses INSERT ... ON CONFLICT DO NOTHING pattern for atomic dedup with no race condition.
356346
async fn record_webhook_message_processed(
357347
&self,
358348
channel: &str,
359349
external_message_id: &str,
360-
) -> Result<(), DatabaseError>;
350+
) -> Result<bool, DatabaseError>;
361351

362352
/// Clean up old dedup records (older than 7 days).
363353
///

src/db/postgres.rs

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -673,30 +673,11 @@ impl WorkspaceStore for PgBackend {
673673

674674
#[async_trait]
675675
impl WebhookDedupStore for PgBackend {
676-
async fn is_webhook_message_processed(
677-
&self,
678-
channel: &str,
679-
external_message_id: &str,
680-
) -> Result<bool, DatabaseError> {
681-
let client = self.store.pool().get().await?;
682-
let stmt = client
683-
.prepare(
684-
"SELECT 1 FROM webhook_message_dedup \
685-
WHERE channel = $1 AND external_message_id = $2",
686-
)
687-
.await?;
688-
689-
let rows = client
690-
.query(&stmt, &[&channel, &external_message_id])
691-
.await?;
692-
Ok(!rows.is_empty())
693-
}
694-
695676
async fn record_webhook_message_processed(
696677
&self,
697678
channel: &str,
698679
external_message_id: &str,
699-
) -> Result<(), DatabaseError> {
680+
) -> Result<bool, DatabaseError> {
700681
let client = self.store.pool().get().await?;
701682
let stmt = client
702683
.prepare(
@@ -706,10 +687,10 @@ impl WebhookDedupStore for PgBackend {
706687
)
707688
.await?;
708689

709-
client
690+
let rows_affected = client
710691
.execute(&stmt, &[&channel, &external_message_id])
711692
.await?;
712-
Ok(())
693+
Ok(rows_affected > 0)
713694
}
714695

715696
async fn cleanup_old_webhook_dedup_records(&self) -> Result<u64, DatabaseError> {

0 commit comments

Comments
 (0)