Skip to content

WIP txHandler: do not rebroadcast to peers sent duplicate messages #5424

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 25 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d48cb17
separate digestCacheData
algorandskiy May 26, 2023
973dcb2
store senders in a cache
algorandskiy May 26, 2023
f07f0b9
make RelayArray accept map of peers
algorandskiy May 26, 2023
69b4e25
fix linter
algorandskiy May 26, 2023
5a9506c
Add RelayArray to mock to fix tests
algorandskiy May 26, 2023
2cc0b75
fix linter
algorandskiy May 26, 2023
93dddb2
fix data race
algorandskiy May 26, 2023
19bb0bb
WIP test
algorandskiy Jun 1, 2023
57b7654
Add no dups relaying test
algorandskiy Jun 2, 2023
676c145
switch from map + copy to a pointer to sync.Map
algorandskiy Jun 2, 2023
ad563e5
fix tests
algorandskiy Jun 2, 2023
f882340
use c.prev as an alloc size guidance
algorandskiy Jun 2, 2023
440b63b
fix lint
algorandskiy Jun 2, 2023
01ac24b
Merge remote-tracking branch 'upstream/master' into pavel/dup-relays-fix
algorandskiy Jun 2, 2023
9230403
CR fixes: remove extra data types
algorandskiy Jun 5, 2023
4b6e9ef
CR fixes
algorandskiy Jun 6, 2023
a9da829
Merge remote-tracking branch 'upstream/master' into pavel/dup-relays-fix
algorandskiy Jun 6, 2023
366b7a1
CR feedback: use eventually
algorandskiy Jun 6, 2023
ce6e517
Merge remote-tracking branch 'upstream/master' into pavel/dup-relays-fix
algorandskiy Jun 9, 2023
6b27c17
duplicates + unique senders benchmark
algorandskiy Jun 9, 2023
1685b49
refactor locks
algorandskiy Jun 9, 2023
bf3bad5
remove page from innerCheck
algorandskiy Jun 9, 2023
5129bd0
use LoadOrStore
algorandskiy Jun 9, 2023
e462b90
add ignore metric
algorandskiy Jun 9, 2023
2fa4046
Add except and exceptMany to reduce allocations
algorandskiy Jun 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"net"
"net/http"
"sync"

"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/protocol"
Expand All @@ -35,11 +36,21 @@ func (network *MockNetwork) Broadcast(ctx context.Context, tag protocol.Tag, dat
return nil
}

// BroadcastArray satisfies the network interface; the mock does nothing and reports success.
func (n *MockNetwork) BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except network.Peer, exceptMany *sync.Map) error {
	return nil
}

// Relay satisfies the network interface; the mock does nothing and reports success.
func (n *MockNetwork) Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except network.Peer) error {
	return nil
}

// RelayArray satisfies the network interface; the mock does nothing and reports success.
func (n *MockNetwork) RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error {
	return nil
}

// Address - unused function
func (network *MockNetwork) Address() (string, bool) {
return "mock network", true
Expand Down
96 changes: 58 additions & 38 deletions data/txDupCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-deadlock"

"golang.org/x/crypto/blake2b"
Expand Down Expand Up @@ -101,10 +102,14 @@
delete(c.prev, *d)
}

// txSaltedCache is a digest cache with a rotating salt
// uses blake2b hash function
// txSaltedCache is a cache with a rotating salt
// uses blake2b hash function, and stores set of values per each entry
type txSaltedCache struct {
digestCache
cur map[crypto.Digest]*sync.Map
prev map[crypto.Digest]*sync.Map

maxSize int
mu deadlock.RWMutex

curSalt [4]byte
prevSalt [4]byte
Expand All @@ -114,7 +119,8 @@

func makeSaltedCache(size int) *txSaltedCache {
return &txSaltedCache{
digestCache: *makeDigestCache(size),
cur: map[crypto.Digest]*sync.Map{},
maxSize: size,
}
}

Expand Down Expand Up @@ -159,28 +165,23 @@
func (c *txSaltedCache) Remix() {
c.mu.Lock()
defer c.mu.Unlock()
c.innerSwap(true)
c.innerSwap()
}

// innerSwap rotates cache pages and update the salt used.
// locking semantic: write lock must be held
func (c *txSaltedCache) innerSwap(scheduled bool) {
func (c *txSaltedCache) innerSwap() {
c.prevSalt = c.curSalt
c.prev = c.cur

if scheduled {
// updating by timer, the prev size is a good estimation of a current load => preallocate
c.cur = make(map[crypto.Digest]struct{}, len(c.prev))
} else {
// otherwise start empty
c.cur = map[crypto.Digest]struct{}{}
}
// the prev size is a good estimation of a current load
c.cur = make(map[crypto.Digest]*sync.Map, len(c.prev))
c.moreSalt()
}

// innerCheck returns true if exists, and the current salted hash if does not.
// locking semantic: write lock must be held
func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, bool) {
// innerCheck returns true if exists, the salted hash if does not exist
// locking semantic: read lock must be held
func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, *sync.Map, bool) {
ptr := saltedPool.Get()
defer saltedPool.Put(ptr)

Expand All @@ -191,53 +192,59 @@

d := crypto.Digest(blake2b.Sum256(toBeHashed))

_, found := c.cur[d]
v, found := c.cur[d]
if found {
return nil, true
return &d, v, true
}

toBeHashed = append(toBeHashed[:len(msg)], c.prevSalt[:]...)
toBeHashed = toBeHashed[:len(msg)+len(c.prevSalt)]
pd := crypto.Digest(blake2b.Sum256(toBeHashed))
_, found = c.prev[pd]
v, found = c.prev[pd]
if found {
return nil, true
return &pd, v, true
}
return &d, false
return &d, nil, false
}

// CheckAndPut adds msg into a cache if not found
// returns a hashing key used for insertion if the message not found.
func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) {
// CheckAndPut adds (msg, sender) pair into a cache. The sender is appended into values if msg is already in the cache.
// Returns a hashing key used for insertion and its associated map of values. The boolean flag `found` is false if the msg is not in the cache.
func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (d *crypto.Digest, vals *sync.Map, found bool) {
c.mu.RLock()
d, found := c.innerCheck(msg)
d, vals, found = c.innerCheck(msg)
salt := c.curSalt
c.mu.RUnlock()
// fast read-only path: assuming most messages are duplicates, hash msg and check cache
// keep lock - it is needed for copying vals in defer
if found {
return d, found
vals.LoadOrStore(sender, struct{}{})
return d, vals, true
}

// not found: acquire write lock to add this msg hash to cache
c.mu.Lock()
defer c.mu.Unlock()
// salt may have changed between RUnlock() and Lock(), rehash if needed
if salt != c.curSalt {
d, found = c.innerCheck(msg)
d, vals, found = c.innerCheck(msg)

Check warning on line 228 in data/txDupCache.go

View check run for this annotation

Codecov / codecov/patch

data/txDupCache.go#L228

Added line #L228 was not covered by tests
if found {
// already added to cache between RUnlock() and Lock(), return
return d, found
c.mu.Unlock()
vals.LoadOrStore(sender, struct{}{})
return d, vals, true

Check warning on line 232 in data/txDupCache.go

View check run for this annotation

Codecov / codecov/patch

data/txDupCache.go#L230-L232

Added lines #L230 - L232 were not covered by tests
}
} else {
} else { // not found or found in cur page
// Do another check to see if another copy of the transaction won the race to write it to the cache
// Only check current to save a lookup since swaps are rare and no need to re-hash
if _, found := c.cur[*d]; found {
return d, found
// Only check current to save a lookup since swap is handled in the first branch
vals, found = c.cur[*d]
if found {
c.mu.Unlock()
vals.LoadOrStore(sender, struct{}{})
return d, vals, true

Check warning on line 241 in data/txDupCache.go

View check run for this annotation

Codecov / codecov/patch

data/txDupCache.go#L239-L241

Added lines #L239 - L241 were not covered by tests
}
}
defer c.mu.Unlock()

if len(c.cur) >= c.maxSize {
c.innerSwap(false)
c.innerSwap()
ptr := saltedPool.Get()
defer saltedPool.Put(ptr)

Expand All @@ -250,13 +257,26 @@
d = &dn
}

c.cur[*d] = struct{}{}
return d, false
vals = &sync.Map{}
vals.Store(sender, struct{}{})
c.cur[*d] = vals
return d, vals, false
}

// DeleteByKey from the cache by using a key used for insertion
func (c *txSaltedCache) DeleteByKey(d *crypto.Digest) {
c.digestCache.Delete(d)
c.mu.Lock()
defer c.mu.Unlock()
delete(c.cur, *d)
delete(c.prev, *d)
}

// Len returns the number of entries in the cache, counting both the
// current and the previous (rotated) pages.
// A read lock suffices here: Len only reads the two maps, so taking the
// full write lock would needlessly block concurrent CheckAndPut readers.
func (c *txSaltedCache) Len() int {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return len(c.cur) + len(c.prev)
}

var saltedPool = sync.Pool{
Expand Down
Loading