Skip to content

Commit 8c3132c

Browse files
committed
cmd/geth: Refactor state migration initialization
Using the freezer to init the database was fine when we were using a local ancients database, but is very slow with the s3 freezer. This refactors to pull those records across from the old database rather than reconstructing them from blocks.
1 parent 571ab4c commit 8c3132c

File tree

1 file changed

+91
-9
lines changed

1 file changed

+91
-9
lines changed

cmd/geth/chaincmd.go

Lines changed: 91 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -895,6 +895,9 @@ func syncState(root common.Hash, srcDb state.Database, newDb ethdb.Database) <-c
895895
defer wg.Done()
896896
for r := range ch {
897897
r.data, r.err = srcDb.TrieDB().Node(r.hash)
898+
if r.err != nil {
899+
log.Warn("Error processing hash", "hash", r.hash, "err", r.err)
900+
}
898901
popCh <- r
899902
}
900903
}(&wg)
@@ -911,17 +914,20 @@ func syncState(root common.Hash, srcDb state.Database, newDb ethdb.Database) <-c
911914
}()
912915
for r := range popCh {
913916
if r.err != nil {
917+
log.Crit("trie node error", "err", r.err)
914918
errCh <- r.err
915919
return
916920
}
917921
results[r.i] = trie.SyncResult{Hash: r.hash, Data: r.data}
918922
}
919923
if _, index, err := sched.Process(results); err != nil {
924+
log.Crit("trie processing error", "err", err)
920925
errCh <- fmt.Errorf("failed to process result #%d: %v", index, err)
921926
return
922927
}
923928
batch := newDb.NewBatch()
924929
if err := sched.Commit(batch); err != nil {
930+
log.Crit("commit error", "err", err)
925931
errCh <- fmt.Errorf("failed to commit data: %v", err)
926932
return
927933
}
@@ -976,26 +982,92 @@ func migrateState(ctx *cli.Context) error {
976982
return fmt.Errorf("Usage: migrateState [ancients] [oldLeveldb] [newLeveldb] [?kafkaTopic]")
977983
}
978984
newDb, err := rawdb.NewLevelDBDatabaseWithFreezer(ctx.Args()[2], 16, 16, ctx.Args()[0], "new")
979-
if err != nil { return err }
985+
if err != nil {
986+
log.Crit("Error new opening database")
987+
return err
988+
}
980989
frozen, err := newDb.Ancients()
981-
if err != nil { return err }
990+
if err != nil {
991+
log.Crit("Error getting ancients")
992+
return err
993+
}
982994
if frozen == 0 {
983995
return fmt.Errorf("Freezer is empty")
984996
}
985997
oldDb, err := rawdb.NewLevelDBDatabase(ctx.Args()[1], 16, 16, "old")
986-
if err != nil { return err }
998+
if err != nil {
999+
log.Crit("Error old opening database")
1000+
return err
1001+
}
9871002
if len(ctx.Args()) == 4 {
9881003
key := fmt.Sprintf("cdc-log-%v-offset", ctx.Args()[3])
9891004
offset, err := oldDb.Get([]byte(key))
990-
if err != nil { return err }
1005+
if err != nil {
1006+
log.Crit("Error getting offset from database")
1007+
return err
1008+
}
9911009
if err := newDb.Put([]byte(key), offset); err != nil { return err }
9921010
log.Info("Copied offset", "key", key)
9931011
}
9941012
start := time.Now()
9951013
ancientErrCh := make(chan error, 1)
9961014
if os.Getenv("SKIP_INIT_FREEZER") != "true" {
9971015
go func() {
998-
rawdb.InitDatabaseFromFreezer(newDb)
1016+
it := oldDb.NewIterator([]byte("l"), nil)
1017+
defer it.Release()
1018+
batch := newDb.NewBatch()
1019+
for it.Next() {
1020+
if err := batch.Put(it.Key(), it.Value()); err != nil {
1021+
ancientErrCh <- err
1022+
return
1023+
}
1024+
if batch.ValueSize() > ethdb.IdealBatchSize {
1025+
if err := batch.Write(); err != nil {
1026+
ancientErrCh <- err
1027+
return
1028+
}
1029+
batch.Reset()
1030+
}
1031+
}
1032+
if err := it.Error(); err != nil {
1033+
ancientErrCh <- err
1034+
return
1035+
}
1036+
headerIt := oldDb.NewIterator([]byte("H"), nil)
1037+
defer headerIt.Release()
1038+
for headerIt.Next() {
1039+
if err := batch.Put(headerIt.Key(), headerIt.Value()); err != nil {
1040+
ancientErrCh <- err
1041+
return
1042+
}
1043+
if batch.ValueSize() > ethdb.IdealBatchSize {
1044+
if err := batch.Write(); err != nil {
1045+
ancientErrCh <- err
1046+
return
1047+
}
1048+
batch.Reset()
1049+
}
1050+
}
1051+
if err := batch.Write(); err != nil {
1052+
ancientErrCh <- err
1053+
return
1054+
}
1055+
if err := headerIt.Error(); err != nil {
1056+
ancientErrCh <- err
1057+
return
1058+
}
1059+
blockNo, err := newDb.Ancients()
1060+
if err != nil {
1061+
ancientErrCh <- err
1062+
return
1063+
}
1064+
hash := rawdb.ReadCanonicalHash(newDb, blockNo)
1065+
1066+
rawdb.WriteHeadHeaderHash(newDb, hash)
1067+
rawdb.WriteHeadFastBlockHash(newDb, hash)
1068+
1069+
1070+
// rawdb.InitDatabaseFromFreezer(newDb)
9991071
ancientErrCh <- nil
10001072
log.Info("Initialized from freezer", "elapsed", time.Since(start))
10011073
}()
@@ -1007,12 +1079,16 @@ func migrateState(ctx *cli.Context) error {
10071079
genesisHash := rawdb.ReadCanonicalHash(oldDb, 0)
10081080
block := rawdb.ReadBlock(oldDb, genesisHash, 0)
10091081

1010-
latestBlockHash := rawdb.ReadHeadFastBlockHash(oldDb) // Find the latest blockhash migrated to the new database
1082+
latestBlockHash := rawdb.ReadHeadBlockHash(oldDb) // Find the latest blockhash migrated to the new database
10111083
if latestBlockHash == (common.Hash{}) {
10121084
return fmt.Errorf("Source block hash empty")
10131085
}
10141086
latestHeaderNumber := rawdb.ReadHeaderNumber(oldDb, latestBlockHash)
1015-
latestBlock := rawdb.ReadBlock(newDb, latestBlockHash, *latestHeaderNumber)
1087+
latestBlock := rawdb.ReadBlock(oldDb, latestBlockHash, *latestHeaderNumber)
1088+
for _, err := srcDb.TrieDB().Node(latestBlock.Root()); err != nil ; {
1089+
latestBlock = rawdb.ReadBlock(oldDb, latestBlock.ParentHash(), latestBlock.NumberU64() - 1)
1090+
}
1091+
10161092

10171093
log.Info("Syncing genesis block state", "hash", block.Hash(), "root", block.Root())
10181094
genesisErrCh := syncState(block.Root(), srcDb, newDb)
@@ -1063,8 +1139,14 @@ func migrateState(ctx *cli.Context) error {
10631139
}
10641140
}
10651141

1066-
if err := <-genesisErrCh; err != nil { return err }
1067-
if err := <-latestErrCh; err != nil { return err }
1142+
if err := <-genesisErrCh; err != nil {
1143+
log.Crit("error syncing genesis")
1144+
return err
1145+
}
1146+
if err := <-latestErrCh; err != nil {
1147+
log.Crit("error syncing latest")
1148+
return err
1149+
}
10681150
rawdb.WriteHeadBlockHash(newDb, block.Hash())
10691151
rawdb.WriteHeadHeaderHash(newDb, block.Hash())
10701152
rawdb.WriteHeadFastBlockHash(newDb, block.Hash())

0 commit comments

Comments
 (0)