Skip to content

Commit 99d808e

Browse files
committed
feat(su): deephash ordering
1 parent 70c3cc6 commit 99d808e

File tree

3 files changed

+60
-6
lines changed

3 files changed

+60
-6
lines changed

servers/su/src/domain/core/bytes.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -740,19 +740,30 @@ impl DataItem {
740740
Utilized for deduplicating incoming messages even
741741
if they have the same id
742742
*/
743-
pub fn deep_hash(&mut self) -> Result<String, ByteErrorType> {
743+
pub fn deep_hash(&mut self, ordered: bool) -> Result<String, ByteErrorType> {
744744
let data_chunk = match &mut self.data {
745745
Data::None => DeepHashChunk::Chunk(Bytes::new()),
746746
Data::Bytes(data) => DeepHashChunk::Chunk(data.clone().into()),
747747
};
748748

749749
let encoded_tags = if !self.tags.is_empty() {
750-
let filtered: Vec<Tag> = self
750+
let mut filtered: Vec<Tag> = self
751751
.tags
752752
.iter()
753753
.filter(|tag| tag.name != "Pushed-For")
754754
.cloned()
755755
.collect();
756+
757+
758+
if ordered {
759+
// Sort tags alphabetically by name, then by value
760+
filtered.sort_by(|a, b| {
761+
match a.name.cmp(&b.name) {
762+
std::cmp::Ordering::Equal => a.value.cmp(&b.value),
763+
other => other,
764+
}
765+
});
766+
}
756767

757768
filtered.encode()?
758769
} else {
@@ -781,6 +792,7 @@ impl DataItem {
781792
anchor: Option<String>,
782793
tags: Vec<Tag>,
783794
data: Vec<u8>,
795+
ordered: bool
784796
) -> Result<String, ByteErrorType> {
785797
let target_chunk = match target {
786798
None => DeepHashChunk::Chunk(Bytes::new()),
@@ -793,12 +805,22 @@ impl DataItem {
793805
};
794806

795807
let encoded_tags = if !tags.is_empty() {
796-
let filtered: Vec<Tag> = tags
808+
let mut filtered: Vec<Tag> = tags
797809
.iter()
798810
.filter(|tag| tag.name != "Pushed-For")
799811
.cloned()
800812
.collect();
801813

814+
if ordered {
815+
// Sort tags alphabetically by name, then by value
816+
filtered.sort_by(|a, b| {
817+
match a.name.cmp(&b.name) {
818+
std::cmp::Ordering::Equal => a.value.cmp(&b.value),
819+
other => other,
820+
}
821+
});
822+
}
823+
802824
filtered.encode()?
803825
} else {
804826
Bytes::default()

servers/su/src/domain/core/flows.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -426,17 +426,36 @@ pub async fn write_item(
426426
None => {
427427
let tx_data = deps.gateway.raw(&assign).await?;
428428
let dh = DataItem::deep_hash_fields(
429+
gateway_tx.recipient.clone(),
430+
gateway_tx.anchor.clone(),
431+
gateway_tx.tags.clone(),
432+
tx_data.clone(),
433+
true
434+
)
435+
.map_err(|_| "Unable to calculate deep hash".to_string())?;
436+
437+
let dh_unordered = DataItem::deep_hash_fields(
429438
gateway_tx.recipient,
430439
gateway_tx.anchor,
431440
gateway_tx.tags,
432441
tx_data,
442+
false
433443
)
434-
.map_err(|_| "Unable to calculate deep hash".to_string())?;
444+
.map_err(|_| "Unable to calculate deep hash unordered".to_string())?;
435445

436446
if deps.config.enable_deep_hash_checks() {
437447
deps.data_store
438448
.check_existing_deep_hash(&process_id, &dh)
439449
.await?;
450+
451+
/*
452+
A safety check for messages that may have been deep hashed
453+
with unordered tags, and have not yet been recalculated with
454+
ordered tags.
455+
*/
456+
deps.data_store
457+
.check_existing_deep_hash(&process_id, &dh_unordered)
458+
.await?;
440459
}
441460

442461
Some(dh)
@@ -621,7 +640,11 @@ pub async fn write_item(
621640
*/
622641
Some(_) => {
623642
let mut mutable_item = data_item.clone();
624-
let deep_hash = match mutable_item.deep_hash() {
643+
let deep_hash = match mutable_item.deep_hash(true) {
644+
Ok(d) => d,
645+
Err(_) => return Err("Unable to calculate deep hash".to_string()),
646+
};
647+
let deep_hash_unordered = match mutable_item.deep_hash(false) {
625648
Ok(d) => d,
626649
Err(_) => return Err("Unable to calculate deep hash".to_string()),
627650
};
@@ -634,6 +657,14 @@ pub async fn write_item(
634657
deps.data_store
635658
.check_existing_deep_hash(&dtarget, &deep_hash)
636659
.await?;
660+
/*
661+
A safety check for messages that may have been deep hashed
662+
with unordered tags, and have not yet been recalculated with
663+
ordered tags.
664+
*/
665+
deps.data_store
666+
.check_existing_deep_hash(&dtarget, &deep_hash_unordered)
667+
.await?;
637668
}
638669

639670
Some(deep_hash)
@@ -808,7 +839,7 @@ pub async fn msg_deephash(
808839

809840
let mut message_item = bundle.items[1].clone();
810841
let deep_hash = match m.tags.iter().find(|tag| tag.name == "From-Process") {
811-
Some(_) => match message_item.deep_hash() {
842+
Some(_) => match message_item.deep_hash(true) {
812843
Ok(d) => Some(d),
813844
Err(_) => return Err("Unable to calculate deep hash".to_string()),
814845
},
@@ -836,6 +867,7 @@ pub async fn msg_deephash(
836867
gateway_tx.anchor,
837868
gateway_tx.tags,
838869
tx_data,
870+
true
839871
)
840872
.map_err(|_| "Unable to calculate deep hash".to_string())?;
841873

servers/su/su

-3.49 KB
Binary file not shown.

0 commit comments

Comments
 (0)