Commit 64b60c7

Merge pull request #22762 from karalabe/snap-lower-complexity
core, eth, ethdb, trie: simplify range proofs
2 parents a81cf0d + fae165a commit 64b60c7

File tree

12 files changed (+149, -237 lines)

core/rawdb/table.go

Lines changed: 0 additions & 5 deletions
@@ -176,11 +176,6 @@ func (b *tableBatch) Delete(key []byte) error {
     return b.batch.Delete(append([]byte(b.prefix), key...))
 }
 
-// KeyCount retrieves the number of keys queued up for writing.
-func (b *tableBatch) KeyCount() int {
-    return b.batch.KeyCount()
-}
-
 // ValueSize retrieves the amount of data queued up for writing.
 func (b *tableBatch) ValueSize() int {
     return b.batch.ValueSize()

core/state/snapshot/generate.go

Lines changed: 1 addition & 1 deletion
@@ -368,7 +368,7 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix
     }
     // Verify the snapshot segment with range prover, ensure that all flat states
     // in this range correspond to merkle trie.
-    _, cont, err := trie.VerifyRangeProof(root, origin, last, keys, vals, proof)
+    cont, err := trie.VerifyRangeProof(root, origin, last, keys, vals, proof)
     return &proofResult{
         keys: keys,
         vals: vals,
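
For reference, a minimal sketch of a caller against the simplified prover: VerifyRangeProof now returns only a continuation flag plus an error, and no longer hands back a database of proof-derived trie nodes. The wrapper name verifySegment and its parameters are illustrative, not part of this change.

package rangesketch

import (
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ethereum/go-ethereum/trie"
)

// verifySegment checks a flat key/value segment against a trie root using the
// edge proof stored in proof, reporting whether more entries exist to the
// right of the proven range.
func verifySegment(root common.Hash, origin, last []byte, keys, vals [][]byte, proof ethdb.KeyValueReader) (bool, error) {
    // Post-change signature: just (continuation, error).
    return trie.VerifyRangeProof(root, origin, last, keys, vals, proof)
}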

eth/protocols/snap/sync.go

Lines changed: 64 additions & 57 deletions
@@ -202,9 +202,8 @@ type storageResponse struct {
     accounts []common.Hash // Account hashes requested, may be only partially filled
     roots    []common.Hash // Storage roots requested, may be only partially filled
 
-    hashes [][]common.Hash       // Storage slot hashes in the returned range
-    slots  [][][]byte            // Storage slot values in the returned range
-    nodes  []ethdb.KeyValueStore // Database containing the reconstructed trie nodes
+    hashes [][]common.Hash // Storage slot hashes in the returned range
+    slots  [][][]byte      // Storage slot values in the returned range
 
     cont bool // Whether the last storage range has a continuation
 }
@@ -680,12 +679,22 @@ func (s *Syncer) loadSyncStatus() {
         }
         s.tasks = progress.Tasks
         for _, task := range s.tasks {
-            task.genBatch = s.db.NewBatch()
+            task.genBatch = ethdb.HookedBatch{
+                Batch: s.db.NewBatch(),
+                OnPut: func(key []byte, value []byte) {
+                    s.accountBytes += common.StorageSize(len(key) + len(value))
+                },
+            }
             task.genTrie = trie.NewStackTrie(task.genBatch)
 
             for _, subtasks := range task.SubTasks {
                 for _, subtask := range subtasks {
-                    subtask.genBatch = s.db.NewBatch()
+                    subtask.genBatch = ethdb.HookedBatch{
+                        Batch: s.db.NewBatch(),
+                        OnPut: func(key []byte, value []byte) {
+                            s.storageBytes += common.StorageSize(len(key) + len(value))
+                        },
+                    }
                     subtask.genTrie = trie.NewStackTrie(task.genBatch)
                 }
             }
@@ -729,7 +738,12 @@ func (s *Syncer) loadSyncStatus() {
             // Make sure we don't overflow if the step is not a proper divisor
             last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
         }
-        batch := s.db.NewBatch()
+        batch := ethdb.HookedBatch{
+            Batch: s.db.NewBatch(),
+            OnPut: func(key []byte, value []byte) {
+                s.accountBytes += common.StorageSize(len(key) + len(value))
+            },
+        }
         s.tasks = append(s.tasks, &accountTask{
             Next: next,
             Last: last,
@@ -746,19 +760,14 @@
 func (s *Syncer) saveSyncStatus() {
     // Serialize any partial progress to disk before spinning down
     for _, task := range s.tasks {
-        keys, bytes := task.genBatch.KeyCount(), task.genBatch.ValueSize()
         if err := task.genBatch.Write(); err != nil {
             log.Error("Failed to persist account slots", "err", err)
         }
-        s.accountBytes += common.StorageSize(keys*common.HashLength + bytes)
-
         for _, subtasks := range task.SubTasks {
             for _, subtask := range subtasks {
-                keys, bytes := subtask.genBatch.KeyCount(), subtask.genBatch.ValueSize()
                 if err := subtask.genBatch.Write(); err != nil {
                     log.Error("Failed to persist storage slots", "err", err)
                 }
-                s.accountBytes += common.StorageSize(keys*common.HashLength + bytes)
             }
         }
     }
@@ -1763,12 +1772,15 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
     if res.subTask != nil {
         res.subTask.req = nil
     }
-    batch := s.db.NewBatch()
-
+    batch := ethdb.HookedBatch{
+        Batch: s.db.NewBatch(),
+        OnPut: func(key []byte, value []byte) {
+            s.storageBytes += common.StorageSize(len(key) + len(value))
+        },
+    }
     var (
-        slots int
-        nodes int
-        bytes common.StorageSize
+        slots           int
+        oldStorageBytes = s.storageBytes
     )
     // Iterate over all the accounts and reconstruct their storage tries from the
     // delivered slots
@@ -1829,7 +1841,12 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
             r := newHashRange(lastKey, chunks)
 
             // Our first task is the one that was just filled by this response.
-            batch := s.db.NewBatch()
+            batch := ethdb.HookedBatch{
+                Batch: s.db.NewBatch(),
+                OnPut: func(key []byte, value []byte) {
+                    s.storageBytes += common.StorageSize(len(key) + len(value))
+                },
+            }
             tasks = append(tasks, &storageTask{
                 Next: common.Hash{},
                 Last: r.End(),
@@ -1838,7 +1855,12 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
                 genTrie: trie.NewStackTrie(batch),
             })
             for r.Next() {
-                batch := s.db.NewBatch()
+                batch := ethdb.HookedBatch{
+                    Batch: s.db.NewBatch(),
+                    OnPut: func(key []byte, value []byte) {
+                        s.storageBytes += common.StorageSize(len(key) + len(value))
+                    },
+                }
                 tasks = append(tasks, &storageTask{
                     Next: r.Start(),
                     Last: r.End(),
@@ -1883,27 +1905,23 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
                 }
             }
         }
-        // Iterate over all the reconstructed trie nodes and push them to disk
-        // if the contract is fully delivered. If it's chunked, the trie nodes
-        // will be reconstructed later.
+        // Iterate over all the complete contracts, reconstruct the trie nodes and
+        // push them to disk. If the contract is chunked, the trie nodes will be
+        // reconstructed later.
         slots += len(res.hashes[i])
 
         if i < len(res.hashes)-1 || res.subTask == nil {
-            it := res.nodes[i].NewIterator(nil, nil)
-            for it.Next() {
-                batch.Put(it.Key(), it.Value())
-
-                bytes += common.StorageSize(common.HashLength + len(it.Value()))
-                nodes++
+            tr := trie.NewStackTrie(batch)
+            for j := 0; j < len(res.hashes[i]); j++ {
+                tr.Update(res.hashes[i][j][:], res.slots[i][j])
             }
-            it.Release()
+            tr.Commit()
         }
         // Persist the received storage segements. These flat state maybe
         // outdated during the sync, but it can be fixed later during the
         // snapshot generation.
         for j := 0; j < len(res.hashes[i]); j++ {
             rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j])
-            bytes += common.StorageSize(1 + 2*common.HashLength + len(res.slots[i][j]))
 
             // If we're storing large contracts, generate the trie nodes
             // on the fly to not trash the gluing points
@@ -1926,25 +1944,20 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
                 }
             }
         }
-        if data := res.subTask.genBatch.ValueSize(); data > ethdb.IdealBatchSize || res.subTask.done {
-            keys := res.subTask.genBatch.KeyCount()
+        if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize || res.subTask.done {
             if err := res.subTask.genBatch.Write(); err != nil {
                 log.Error("Failed to persist stack slots", "err", err)
             }
             res.subTask.genBatch.Reset()
-
-            bytes += common.StorageSize(keys*common.HashLength + data)
-            nodes += keys
         }
     }
     // Flush anything written just now and update the stats
     if err := batch.Write(); err != nil {
         log.Crit("Failed to persist storage slots", "err", err)
     }
     s.storageSynced += uint64(slots)
-    s.storageBytes += bytes
 
-    log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "nodes", nodes, "bytes", bytes)
+    log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "bytes", s.storageBytes-oldStorageBytes)
 
     // If this delivery completed the last pending task, forward the account task
     // to the next chunk
@@ -2042,18 +2055,20 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
     // Persist the received account segements. These flat state maybe
     // outdated during the sync, but it can be fixed later during the
    // snapshot generation.
-    var (
-        nodes int
-        bytes common.StorageSize
-    )
-    batch := s.db.NewBatch()
+    oldAccountBytes := s.accountBytes
+
+    batch := ethdb.HookedBatch{
+        Batch: s.db.NewBatch(),
+        OnPut: func(key []byte, value []byte) {
+            s.accountBytes += common.StorageSize(len(key) + len(value))
+        },
+    }
     for i, hash := range res.hashes {
         if task.needCode[i] || task.needState[i] {
             break
         }
         slim := snapshot.SlimAccountRLP(res.accounts[i].Nonce, res.accounts[i].Balance, res.accounts[i].Root, res.accounts[i].CodeHash)
         rawdb.WriteAccountSnapshot(batch, hash, slim)
-        bytes += common.StorageSize(1 + common.HashLength + len(slim))
 
         // If the task is complete, drop it into the stack trie to generate
         // account trie nodes for it
@@ -2069,7 +2084,6 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
     if err := batch.Write(); err != nil {
         log.Crit("Failed to persist accounts", "err", err)
     }
-    s.accountBytes += bytes
     s.accountSynced += uint64(len(res.accounts))
 
     // Task filling persisted, push it the chunk marker forward to the first
@@ -2091,17 +2105,13 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
             log.Error("Failed to commit stack account", "err", err)
         }
     }
-    if data := task.genBatch.ValueSize(); data > ethdb.IdealBatchSize || task.done {
-        keys := task.genBatch.KeyCount()
+    if task.genBatch.ValueSize() > ethdb.IdealBatchSize || task.done {
        if err := task.genBatch.Write(); err != nil {
            log.Error("Failed to persist stack account", "err", err)
        }
        task.genBatch.Reset()
-
-        nodes += keys
-        bytes += common.StorageSize(keys*common.HashLength + data)
    }
-    log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "bytes", bytes)
+    log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "bytes", s.accountBytes-oldAccountBytes)
 }
 
 // OnAccounts is a callback method to invoke when a range of accounts are
@@ -2176,7 +2186,7 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
     if len(keys) > 0 {
         end = keys[len(keys)-1]
     }
-    _, cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb)
+    cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb)
     if err != nil {
         logger.Warn("Account range failed proof", "err", err)
         // Signal this request as failed, and ready for rescheduling
@@ -2393,10 +2403,8 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
     s.lock.Unlock()
 
     // Reconstruct the partial tries from the response and verify them
-    var (
-        dbs  = make([]ethdb.KeyValueStore, len(hashes))
-        cont bool
-    )
+    var cont bool
+
     for i := 0; i < len(hashes); i++ {
         // Convert the keys and proofs into an internal format
         keys := make([][]byte, len(hashes[i]))
@@ -2413,7 +2421,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
         if len(nodes) == 0 {
             // No proof has been attached, the response must cover the entire key
             // space and hash to the origin root.
-            dbs[i], _, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil)
+            _, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil)
             if err != nil {
                 s.scheduleRevertStorageRequest(req) // reschedule request
                 logger.Warn("Storage slots failed proof", "err", err)
@@ -2428,7 +2436,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
         if len(keys) > 0 {
             end = keys[len(keys)-1]
         }
-        dbs[i], cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb)
+        cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb)
         if err != nil {
             s.scheduleRevertStorageRequest(req) // reschedule request
             logger.Warn("Storage range failed proof", "err", err)
@@ -2444,7 +2452,6 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
         roots:  req.roots,
         hashes: hashes,
         slots:  slots,
-        nodes:  dbs,
         cont:   cont,
     }
     select {
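
The storage responses above no longer carry per-contract node databases; for a fully delivered contract the syncer regenerates the trie nodes locally by streaming the sorted slots through a stack trie into a write batch. A hedged, self-contained sketch of that idea (the helper name rebuildStorageTrie is illustrative, not part of this change):

package rangesketch

import (
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ethereum/go-ethereum/trie"
)

// rebuildStorageTrie regenerates the trie nodes for one fully delivered
// contract from its flat (slot hash, slot value) pairs, writing finished
// nodes into the batch, and returns the resulting storage root.
func rebuildStorageTrie(batch ethdb.Batch, hashes []common.Hash, slots [][]byte) (common.Hash, error) {
    tr := trie.NewStackTrie(batch) // committed nodes are written straight into the batch
    for j := 0; j < len(hashes); j++ {
        tr.Update(hashes[j][:], slots[j]) // keys must be fed in ascending order
    }
    return tr.Commit() // flushes the remaining nodes and yields the root hash
}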

ethdb/batch.go

Lines changed: 25 additions & 3 deletions
@@ -25,9 +25,6 @@ const IdealBatchSize = 100 * 1024
 type Batch interface {
     KeyValueWriter
 
-    // KeyCount retrieves the number of keys queued up for writing.
-    KeyCount() int
-
     // ValueSize retrieves the amount of data queued up for writing.
     ValueSize() int
 
@@ -47,3 +44,28 @@ type Batcher interface {
     // until a final write is called.
     NewBatch() Batch
 }
+
+// HookedBatch wraps an arbitrary batch where each operation may be hooked into
+// to monitor from black box code.
+type HookedBatch struct {
+    Batch
+
+    OnPut    func(key []byte, value []byte) // Callback if a key is inserted
+    OnDelete func(key []byte)               // Callback if a key is deleted
+}
+
+// Put inserts the given value into the key-value data store.
+func (b HookedBatch) Put(key []byte, value []byte) error {
+    if b.OnPut != nil {
+        b.OnPut(key, value)
+    }
+    return b.Batch.Put(key, value)
+}
+
+// Delete removes the key from the key-value data store.
+func (b HookedBatch) Delete(key []byte) error {
+    if b.OnDelete != nil {
+        b.OnDelete(key)
+    }
+    return b.Batch.Delete(key)
+}
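
With HookedBatch in place, byte accounting that previously needed Batch.KeyCount can live in an OnPut callback, exactly as the syncer does above. A minimal usage sketch, assuming only the APIs shown in this diff plus the existing rawdb memory database (the written counter is illustrative):

package main

import (
    "fmt"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/rawdb"
    "github.com/ethereum/go-ethereum/ethdb"
)

func main() {
    db := rawdb.NewMemoryDatabase()

    // Wrap a regular batch so every Put is metered without the Batch
    // interface having to expose a key counter.
    var written common.StorageSize
    batch := ethdb.HookedBatch{
        Batch: db.NewBatch(),
        OnPut: func(key []byte, value []byte) {
            written += common.StorageSize(len(key) + len(value))
        },
    }
    if err := batch.Put([]byte("key"), []byte("value")); err != nil {
        panic(err)
    }
    if err := batch.Write(); err != nil {
        panic(err)
    }
    fmt.Println("bytes queued and written:", written)
}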

ethdb/leveldb/leveldb.go

Lines changed: 1 addition & 8 deletions
@@ -448,7 +448,6 @@ func (db *Database) meter(refresh time.Duration) {
 type batch struct {
     db   *leveldb.DB
     b    *leveldb.Batch
-    keys int
     size int
 }
 
@@ -462,16 +461,10 @@ func (b *batch) Put(key, value []byte) error {
 // Delete inserts the a key removal into the batch for later committing.
 func (b *batch) Delete(key []byte) error {
     b.b.Delete(key)
-    b.keys++
     b.size += len(key)
     return nil
 }
 
-// KeyCount retrieves the number of keys queued up for writing.
-func (b *batch) KeyCount() int {
-    return b.keys
-}
-
 // ValueSize retrieves the amount of data queued up for writing.
 func (b *batch) ValueSize() int {
     return b.size
@@ -485,7 +478,7 @@ func (b *batch) Write() error {
 // Reset resets the batch for reuse.
 func (b *batch) Reset() {
     b.b.Reset()
-    b.keys, b.size = 0, 0
+    b.size = 0
 }
 
 // Replay replays the batch contents.
