Skip to content

core/filtermaps: properly handle history cutoff while rendering index head (WIP) #31447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions core/filtermaps/filtermaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
cachedRenderSnapshots = 8 // saved map renderer data at block boundaries
)

var errHistoryCutoff = errors.New("cannot start indexing before history cutoff point")

// FilterMaps is the in-memory representation of the log index structure that is
// responsible for building and updating the index according to the canonical
// chain.
Expand Down Expand Up @@ -278,8 +280,13 @@ func (f *FilterMaps) initChainView(chainView *ChainView) *ChainView {
}

// reset un-initializes the FilterMaps structure and removes all related data from
// the database. The function returns true if everything was successfully removed.
func (f *FilterMaps) reset() bool {
// the database.
// Note that in case of leveldb database the fallback implementation of DeleteRange
// might take a long time to finish and deleting the entire database may be
// interrupted by a shutdown. Deleting the filterMapsRange entry first does
// guarantee though that the next init() will not return successfully until the
// entire database has been cleaned.
func (f *FilterMaps) reset() {
f.indexLock.Lock()
f.indexedRange = filterMapsRange{}
f.indexedView = nil
Expand All @@ -292,11 +299,16 @@ func (f *FilterMaps) reset() bool {
// deleting the range first ensures that resetDb will be called again at next
// startup and any leftover data will be removed even if it cannot finish now.
rawdb.DeleteFilterMapsRange(f.db)
return f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database")
f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database")
}

// init initializes an empty log index according to the current targetView.
func (f *FilterMaps) init() error {
// ensure that there is no remaining data in the filter maps key range
if !f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database") {
return errors.New("could not reset log index database")
}

f.indexLock.Lock()
defer f.indexLock.Unlock()

Expand All @@ -317,6 +329,13 @@ func (f *FilterMaps) init() error {
bestIdx, bestLen = idx, max
}
}
var initBlockNumber uint64
if bestLen > 0 {
initBlockNumber = checkpoints[bestIdx][bestLen-1].BlockNumber
}
if initBlockNumber < f.historyCutoff {
return errHistoryCutoff
}
batch := f.db.NewBatch()
for epoch := range bestLen {
cp := checkpoints[bestIdx][epoch]
Expand Down
43 changes: 31 additions & 12 deletions core/filtermaps/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,18 @@ func (f *FilterMaps) indexerLoop() {

for !f.stop {
if !f.indexedRange.initialized {
if f.targetView.headNumber == 0 {
// initialize when chain head is available
f.processSingleEvent(true)
continue
}
if err := f.init(); err != nil {
if err == errHistoryCutoff {
log.Warn("History cutoff point beyond latest checkpoint; log index disabled")
f.disabled = true
f.reset()
return
}
log.Error("Error initializing log index", "error", err)
// unexpected error; there is not a lot we can do here, maybe it
// recovers, maybe not. Calling event processing here ensures
Expand All @@ -52,10 +63,16 @@ func (f *FilterMaps) indexerLoop() {
}
}
if !f.targetHeadIndexed() {
if !f.tryIndexHead() {
// either shutdown or unexpected error; in the latter case ensure
// that proper shutdown is still possible.
f.processSingleEvent(true)
if _, err := f.tryIndexHead(); err != nil {
switch err {
case errHistoryCutoff:
log.Warn("History cutoff point beyond rendered index head; resetting log index")
f.reset() // still attempt re-initializing; maybe there are more recent checkpoints
default:
// unexpected error; ensure that we can still properly
// shutdown in case of an infinite loop.
f.processSingleEvent(true)
}
}
} else {
if f.finalBlock != f.lastFinal {
Expand Down Expand Up @@ -193,22 +210,22 @@ func (f *FilterMaps) setTarget(target targetUpdate) {

// tryIndexHead tries to render head maps according to the current targetView
// and returns true if successful.
func (f *FilterMaps) tryIndexHead() bool {
func (f *FilterMaps) tryIndexHead() (bool, error) {
headRenderer, err := f.renderMapsBefore(math.MaxUint32)
if err != nil {
log.Error("Error creating log index head renderer", "error", err)
return false
return false, err
}
if headRenderer == nil {
return true
return true, nil
}
if !f.startedHeadIndex {
f.lastLogHeadIndex = time.Now()
f.startedHeadIndexAt = f.lastLogHeadIndex
f.startedHeadIndex = true
f.ptrHeadIndex = f.indexedRange.blocks.AfterLast()
}
if _, err := headRenderer.run(func() bool {
if done, err := headRenderer.run(func() bool {
f.processEvents()
return f.stop
}, func() {
Expand All @@ -224,9 +241,11 @@ func (f *FilterMaps) tryIndexHead() bool {
f.loggedHeadIndex = true
f.lastLogHeadIndex = time.Now()
}
}); err != nil {
log.Error("Log index head rendering failed", "error", err)
return false
}); !done {
if err != nil {
log.Error("Log index head rendering failed", "error", err)
}
return false, err
}
if f.loggedHeadIndex && f.indexedRange.hasIndexedBlocks() {
log.Info("Log index head rendering finished",
Expand All @@ -235,7 +254,7 @@ func (f *FilterMaps) tryIndexHead() bool {
"elapsed", common.PrettyDuration(time.Since(f.startedHeadIndexAt)))
}
f.loggedHeadIndex, f.startedHeadIndex = false, false
return true
return true, nil
}

// tryIndexTail tries to render tail epochs until the tail target block is
Expand Down
4 changes: 2 additions & 2 deletions core/filtermaps/map_renderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ func (f *FilterMaps) newLogIteratorFromMapBoundary(mapIndex uint32, startBlock,
// get block receipts
receipts := f.targetView.getReceipts(startBlock)
if receipts == nil {
return nil, fmt.Errorf("receipts not found for start block %d", startBlock)
return nil, errHistoryCutoff
}
// initialize iterator at block start
l := &logIterator{
Expand Down Expand Up @@ -748,7 +748,7 @@ func (l *logIterator) next() error {
l.blockNumber++
l.receipts = l.chainView.getReceipts(l.blockNumber)
if l.receipts == nil {
return fmt.Errorf("receipts not found for block %d", l.blockNumber)
return errHistoryCutoff
}
l.txIndex, l.logIndex, l.topicIndex, l.blockStart = 0, 0, 0, true
} else {
Expand Down