Skip to content

Commit 8646aad

Browse files
committed
feat: add unique IDs to status updates sent outside
This allows for deduplication if status updates are sent over multiple transports.
1 parent 616faff commit 8646aad

File tree

3 files changed

+127
-31
lines changed

3 files changed

+127
-31
lines changed

src/debug_logging.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,15 @@ pub async fn debug_logging_loop(context: &Context, events: Receiver<DebugEventLo
5454
match context
5555
.write_status_update_inner(
5656
&msg_id,
57-
StatusUpdateItem {
57+
&StatusUpdateItem {
5858
payload: json!({
5959
"event": event,
6060
"time": time,
6161
}),
6262
info: None,
6363
summary: None,
6464
document: None,
65+
uid: None,
6566
},
6667
)
6768
.await
@@ -70,10 +71,15 @@ pub async fn debug_logging_loop(context: &Context, events: Receiver<DebugEventLo
7071
eprintln!("Can't log event to webxdc status update: {err:#}");
7172
}
7273
Ok(serial) => {
73-
context.emit_event(EventType::WebxdcStatusUpdate {
74-
msg_id,
75-
status_update_serial: serial,
76-
});
74+
if let Some(serial) = serial {
75+
context.emit_event(EventType::WebxdcStatusUpdate {
76+
msg_id,
77+
status_update_serial: serial,
78+
});
79+
} else {
80+
// This should not happen as the update has no `uid`.
81+
error!(context, "Debug logging update is not created.");
82+
};
7783
}
7884
}
7985
}

src/sql/migrations.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,28 @@ CREATE INDEX smtp_messageid ON imap(rfc724_mid);
763763
.await?;
764764
}
765765

766+
if dbversion < 105 {
767+
// Create UNIQUE uid column and drop unused update_item_read column.
768+
sql.execute_migration(
769+
r#"CREATE TABLE new_msgs_status_updates (
770+
id INTEGER PRIMARY KEY AUTOINCREMENT,
771+
msg_id INTEGER,
772+
update_item TEXT DEFAULT '',
773+
uid TEXT UNIQUE
774+
);
775+
INSERT OR IGNORE INTO new_msgs_status_updates SELECT
776+
id, msg_id, update_item, NULL
777+
FROM msgs_status_updates;
778+
DROP TABLE msgs_status_updates;
779+
ALTER TABLE new_msgs_status_updates RENAME TO msgs_status_updates;
780+
CREATE INDEX msgs_status_updates_index1 ON msgs_status_updates (msg_id);
781+
CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid);
782+
"#,
783+
105,
784+
)
785+
.await?;
786+
}
787+
766788
let new_version = sql
767789
.get_raw_config_int(VERSION_CFG)
768790
.await?

src/webxdc.rs

Lines changed: 94 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
//! - `id` - status update serial number
66
//! - `msg_id` - ID of the message in the `msgs` table
77
//! - `update_item` - JSON representation of the status update
8+
//! - `uid` - "id" field of the update, used for deduplication
89
//!
910
//! Status updates are scheduled for sending by adding a record
1011
//! to `smtp_status_updates_table` SQL table.
@@ -37,6 +38,7 @@ use crate::mimefactory::wrapped_base64_encode;
3738
use crate::mimeparser::SystemMessage;
3839
use crate::param::Param;
3940
use crate::param::Params;
41+
use crate::tools::create_id;
4042
use crate::tools::strip_rtlo_characters;
4143
use crate::tools::{create_smeared_timestamp, get_abs_path};
4244

@@ -178,6 +180,13 @@ pub struct StatusUpdateItem {
178180
/// for a voting app.
179181
#[serde(skip_serializing_if = "Option::is_none")]
180182
pub summary: Option<String>,
183+
184+
/// Unique ID for deduplication.
185+
/// This can be used if the message is sent over multiple transports.
186+
///
187+
/// If there is no ID, message is always considered to be unique.
188+
#[serde(skip_serializing_if = "Option::is_none")]
189+
pub uid: Option<String>,
181190
}
182191

183192
/// Update items as passed to the UIs.
@@ -317,7 +326,14 @@ impl Context {
317326
timestamp: i64,
318327
can_info_msg: bool,
319328
from_id: ContactId,
320-
) -> Result<StatusUpdateSerial> {
329+
) -> Result<Option<StatusUpdateSerial>> {
330+
let Some(status_update_serial) = self
331+
.write_status_update_inner(&instance.id, &status_update_item)
332+
.await?
333+
else {
334+
return Ok(None);
335+
};
336+
321337
if can_info_msg {
322338
if let Some(ref info) = status_update_item.info {
323339
if let Some(info_msg_id) =
@@ -376,34 +392,49 @@ impl Context {
376392
self.emit_msgs_changed(instance.chat_id, instance.id);
377393
}
378394

379-
let status_update_serial = self
380-
.write_status_update_inner(&instance.id, status_update_item)
381-
.await?;
382-
383395
if instance.viewtype == Viewtype::Webxdc {
384396
self.emit_event(EventType::WebxdcStatusUpdate {
385397
msg_id: instance.id,
386398
status_update_serial,
387399
});
388400
}
389401

390-
Ok(status_update_serial)
402+
Ok(Some(status_update_serial))
391403
}
392404

405+
/// Inserts a status update item into `msgs_status_updates` table.
406+
///
407+
/// Returns serial ID of the status update if a new item is inserted.
393408
pub(crate) async fn write_status_update_inner(
394409
&self,
395410
instance_id: &MsgId,
396-
status_update_item: StatusUpdateItem,
397-
) -> Result<StatusUpdateSerial> {
398-
let rowid = self
411+
status_update_item: &StatusUpdateItem,
412+
) -> Result<Option<StatusUpdateSerial>> {
413+
let uid = status_update_item.uid.as_deref();
414+
let Some(rowid) = self
399415
.sql
400-
.insert(
401-
"INSERT INTO msgs_status_updates (msg_id, update_item) VALUES(?, ?);",
402-
(instance_id, serde_json::to_string(&status_update_item)?),
416+
.query_row_optional(
417+
"INSERT INTO msgs_status_updates (msg_id, update_item, uid) VALUES(?, ?, ?)
418+
ON CONFLICT (uid) DO NOTHING
419+
RETURNING id",
420+
(
421+
instance_id,
422+
serde_json::to_string(&status_update_item)?,
423+
uid,
424+
),
425+
|row| {
426+
let id: u32 = row.get(0)?;
427+
Ok(id)
428+
},
403429
)
404-
.await?;
430+
.await?
431+
else {
432+
let uid = uid.unwrap_or("-");
433+
info!(self, "Ignoring duplicate status update with uid={uid}");
434+
return Ok(None);
435+
};
405436
let status_update_serial = StatusUpdateSerial(u32::try_from(rowid)?);
406-
Ok(status_update_serial)
437+
Ok(Some(status_update_serial))
407438
}
408439

409440
/// Returns the update_item with `status_update_serial` from the webxdc with message id `msg_id`.
@@ -449,7 +480,7 @@ impl Context {
449480
pub async fn send_webxdc_status_update_struct(
450481
&self,
451482
instance_msg_id: MsgId,
452-
status_update: StatusUpdateItem,
483+
mut status_update: StatusUpdateItem,
453484
descr: &str,
454485
) -> Result<()> {
455486
let mut instance = Message::load_from_db(self, instance_msg_id).await?;
@@ -467,6 +498,7 @@ impl Context {
467498
MessageState::Undefined | MessageState::OutPreparing | MessageState::OutDraft
468499
);
469500

501+
status_update.uid = Some(create_id());
470502
let status_update_serial: StatusUpdateSerial = self
471503
.create_status_update_record(
472504
&mut instance,
@@ -475,7 +507,8 @@ impl Context {
475507
send_now,
476508
ContactId::SELF,
477509
)
478-
.await?;
510+
.await?
511+
.context("Failed to create status update")?;
479512

480513
if send_now {
481514
self.sql.insert(
@@ -655,7 +688,10 @@ impl Context {
655688
let (update_item_str, serial) = row;
656689
let update_item = StatusUpdateItemAndSerial
657690
{
658-
item: serde_json::from_str(&update_item_str)?,
691+
item: StatusUpdateItem {
692+
uid: None, // Erase UIDs, apps, bots and tests don't need to know them.
693+
..serde_json::from_str(&update_item_str)?
694+
},
659695
serial,
660696
max_serial,
661697
};
@@ -1348,18 +1384,39 @@ mod tests {
13481384
info: None,
13491385
document: None,
13501386
summary: None,
1387+
uid: Some("iecie2Ze".to_string()),
13511388
},
13521389
1640178619,
13531390
true,
13541391
ContactId::SELF,
13551392
)
1356-
.await?;
1393+
.await?
1394+
.unwrap();
13571395
assert_eq!(
13581396
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
13591397
.await?,
13601398
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":1}]"#
13611399
);
13621400

1401+
// Update with duplicate update ID is received.
1402+
// Whatever the payload is, update should be ignored just because ID is duplicate.
1403+
let update_id1_duplicate = t
1404+
.create_status_update_record(
1405+
&mut instance,
1406+
StatusUpdateItem {
1407+
payload: json!({"nothing": "this should be ignored"}),
1408+
info: None,
1409+
document: None,
1410+
summary: None,
1411+
uid: Some("iecie2Ze".to_string()),
1412+
},
1413+
1640178619,
1414+
true,
1415+
ContactId::SELF,
1416+
)
1417+
.await?;
1418+
assert_eq!(update_id1_duplicate, None);
1419+
13631420
assert!(t
13641421
.send_webxdc_status_update(instance.id, "\n\n\n", "")
13651422
.await
@@ -1384,15 +1441,17 @@ mod tests {
13841441
info: None,
13851442
document: None,
13861443
summary: None,
1444+
uid: None,
13871445
},
13881446
1640178619,
13891447
true,
13901448
ContactId::SELF,
13911449
)
1392-
.await?;
1450+
.await?
1451+
.unwrap();
13931452
assert_eq!(
13941453
t.get_webxdc_status_updates(instance.id, update_id1).await?,
1395-
r#"[{"payload":{"foo2":"bar2"},"serial":2,"max_serial":2}]"#
1454+
r#"[{"payload":{"foo2":"bar2"},"serial":3,"max_serial":3}]"#
13961455
);
13971456
t.create_status_update_record(
13981457
&mut instance,
@@ -1401,6 +1460,7 @@ mod tests {
14011460
info: None,
14021461
document: None,
14031462
summary: None,
1463+
uid: None,
14041464
},
14051465
1640178619,
14061466
true,
@@ -1410,9 +1470,9 @@ mod tests {
14101470
assert_eq!(
14111471
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
14121472
.await?,
1413-
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":3},
1414-
{"payload":{"foo2":"bar2"},"serial":2,"max_serial":3},
1415-
{"payload":true,"serial":3,"max_serial":3}]"#
1473+
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":4},
1474+
{"payload":{"foo2":"bar2"},"serial":3,"max_serial":4},
1475+
{"payload":true,"serial":4,"max_serial":4}]"#
14161476
);
14171477

14181478
t.send_webxdc_status_update(
@@ -1423,8 +1483,8 @@ mod tests {
14231483
.await?;
14241484
assert_eq!(
14251485
t.get_webxdc_status_updates(instance.id, update_id2).await?,
1426-
r#"[{"payload":true,"serial":3,"max_serial":4},
1427-
{"payload":1,"serial":4,"max_serial":4}]"#
1486+
r#"[{"payload":true,"serial":4,"max_serial":5},
1487+
{"payload":1,"serial":5,"max_serial":5}]"#
14281488
);
14291489

14301490
Ok(())
@@ -1654,6 +1714,8 @@ mod tests {
16541714

16551715
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
16561716
async fn test_render_webxdc_status_update_object_range() -> Result<()> {
1717+
use regex::Regex;
1718+
16571719
let t = TestContext::new_alice().await;
16581720
let chat_id = create_group_chat(&t, ProtectionStatus::Unprotected, "a chat").await?;
16591721
let instance = send_webxdc_instance(&t, chat_id).await?;
@@ -1672,7 +1734,13 @@ mod tests {
16721734
)
16731735
.await?
16741736
.unwrap();
1675-
assert_eq!(json, "{\"updates\":[{\"payload\":2},\n{\"payload\":3}]}");
1737+
let json = Regex::new(r#""uid":"[^"]*""#)
1738+
.unwrap()
1739+
.replace_all(&json, "XXX");
1740+
assert_eq!(
1741+
json,
1742+
"{\"updates\":[{\"payload\":2,XXX},\n{\"payload\":3,XXX}]}"
1743+
);
16761744

16771745
assert_eq!(
16781746
t.sql

0 commit comments

Comments
 (0)