Skip to content

Commit 40b0a0b

Browse files
committed
eth/filters: implement log filter using new log index
1 parent 13944d3 commit 40b0a0b

File tree

14 files changed

+611
-196
lines changed

14 files changed

+611
-196
lines changed

cmd/geth/chaincmd.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ if one is set. Otherwise it prints the genesis from the datadir.`,
100100
utils.VMTraceFlag,
101101
utils.VMTraceJsonConfigFlag,
102102
utils.TransactionHistoryFlag,
103+
utils.LogHistoryFlag,
104+
utils.LogNoHistoryFlag,
105+
utils.LogExportCheckpointsFlag,
103106
utils.StateHistoryFlag,
104107
}, utils.DatabaseFlags),
105108
Description: `

cmd/geth/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ var (
8686
utils.SnapshotFlag,
8787
utils.TxLookupLimitFlag, // deprecated
8888
utils.TransactionHistoryFlag,
89+
utils.LogHistoryFlag,
90+
utils.LogNoHistoryFlag,
91+
utils.LogExportCheckpointsFlag,
8992
utils.StateHistoryFlag,
9093
utils.LightServeFlag, // deprecated
9194
utils.LightIngressFlag, // deprecated

cmd/utils/flags.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,23 @@ var (
272272
Value: ethconfig.Defaults.TransactionHistory,
273273
Category: flags.StateCategory,
274274
}
275+
LogHistoryFlag = &cli.Uint64Flag{
276+
Name: "history.logs",
277+
Usage: "Number of recent blocks to maintain log search index for (default = about one year, 0 = entire chain)",
278+
Value: ethconfig.Defaults.LogHistory,
279+
Category: flags.StateCategory,
280+
}
281+
LogNoHistoryFlag = &cli.BoolFlag{
282+
Name: "history.logs.disable",
283+
Usage: "Do not maintain log search index",
284+
Category: flags.StateCategory,
285+
}
286+
LogExportCheckpointsFlag = &cli.StringFlag{
287+
Name: "history.logs.export",
288+
Usage: "Export checkpoints to file in go source file format",
289+
Category: flags.StateCategory,
290+
Value: "",
291+
}
275292
// Beacon client light sync settings
276293
BeaconApiFlag = &cli.StringSliceFlag{
277294
Name: "beacon.api",
@@ -1662,6 +1679,15 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
16621679
cfg.StateScheme = rawdb.HashScheme
16631680
log.Warn("Forcing hash state-scheme for archive mode")
16641681
}
1682+
if ctx.IsSet(LogHistoryFlag.Name) {
1683+
cfg.LogHistory = ctx.Uint64(LogHistoryFlag.Name)
1684+
}
1685+
if ctx.IsSet(LogNoHistoryFlag.Name) {
1686+
cfg.LogNoHistory = true
1687+
}
1688+
if ctx.IsSet(LogExportCheckpointsFlag.Name) {
1689+
cfg.LogExportCheckpoints = ctx.String(LogExportCheckpointsFlag.Name)
1690+
}
16651691
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheTrieFlag.Name) {
16661692
cfg.TrieCleanCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheTrieFlag.Name) / 100
16671693
}

core/blockchain.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -231,14 +231,15 @@ type BlockChain struct {
231231
statedb *state.CachingDB // State database to reuse between imports (contains state cache)
232232
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled
233233

234-
hc *HeaderChain
235-
rmLogsFeed event.Feed
236-
chainFeed event.Feed
237-
chainHeadFeed event.Feed
238-
logsFeed event.Feed
239-
blockProcFeed event.Feed
240-
scope event.SubscriptionScope
241-
genesisBlock *types.Block
234+
hc *HeaderChain
235+
rmLogsFeed event.Feed
236+
chainFeed event.Feed
237+
chainHeadFeed event.Feed
238+
logsFeed event.Feed
239+
blockProcFeed event.Feed
240+
blockProcCounter int32
241+
scope event.SubscriptionScope
242+
genesisBlock *types.Block
242243

243244
// This mutex synchronizes chain write operations.
244245
// Readers don't need to take it, they can just read the database.
@@ -1570,8 +1571,6 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
15701571
if len(chain) == 0 {
15711572
return 0, nil
15721573
}
1573-
bc.blockProcFeed.Send(true)
1574-
defer bc.blockProcFeed.Send(false)
15751574

15761575
// Do a sanity check that the provided chain is actually ordered and linked.
15771576
for i := 1; i < len(chain); i++ {
@@ -1611,6 +1610,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
16111610
if bc.insertStopped() {
16121611
return nil, 0, nil
16131612
}
1613+
1614+
if atomic.AddInt32(&bc.blockProcCounter, 1) == 1 {
1615+
bc.blockProcFeed.Send(true)
1616+
}
1617+
defer func() {
1618+
if atomic.AddInt32(&bc.blockProcCounter, -1) == 0 {
1619+
bc.blockProcFeed.Send(false)
1620+
}
1621+
}()
1622+
16141623
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
16151624
SenderCacher().RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number(), chain[0].Time()), chain)
16161625

eth/api_backend.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
3030
"github.com/ethereum/go-ethereum/core"
3131
"github.com/ethereum/go-ethereum/core/bloombits"
32+
"github.com/ethereum/go-ethereum/core/filtermaps"
3233
"github.com/ethereum/go-ethereum/core/rawdb"
3334
"github.com/ethereum/go-ethereum/core/state"
3435
"github.com/ethereum/go-ethereum/core/txpool"
@@ -407,6 +408,10 @@ func (b *EthAPIBackend) ServiceFilter(ctx context.Context, session *bloombits.Ma
407408
}
408409
}
409410

411+
func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend {
412+
return b.eth.filterMaps.NewMatcherBackend()
413+
}
414+
410415
func (b *EthAPIBackend) Engine() consensus.Engine {
411416
return b.eth.engine
412417
}

eth/backend.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/ethereum/go-ethereum/consensus"
3232
"github.com/ethereum/go-ethereum/core"
3333
"github.com/ethereum/go-ethereum/core/bloombits"
34+
"github.com/ethereum/go-ethereum/core/filtermaps"
3435
"github.com/ethereum/go-ethereum/core/rawdb"
3536
"github.com/ethereum/go-ethereum/core/state/pruner"
3637
"github.com/ethereum/go-ethereum/core/txpool"
@@ -88,6 +89,9 @@ type Ethereum struct {
8889
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
8990
closeBloomHandler chan struct{}
9091

92+
filterMaps *filtermaps.FilterMaps
93+
closeFilterMaps chan chan struct{}
94+
9195
APIBackend *EthAPIBackend
9296

9397
miner *miner.Miner
@@ -225,6 +229,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
225229
return nil, err
226230
}
227231
eth.bloomIndexer.Start(eth.blockchain)
232+
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.newChainView(eth.blockchain.CurrentBlock()), filtermaps.DefaultParams, config.LogHistory, 1000, config.LogNoHistory, config.LogExportCheckpoints)
233+
eth.closeFilterMaps = make(chan chan struct{})
228234

229235
if config.BlobPool.Datadir != "" {
230236
config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
@@ -378,9 +384,113 @@ func (s *Ethereum) Start() error {
378384

379385
// Start the networking layer
380386
s.handler.Start(s.p2pServer.MaxPeers)
387+
388+
// start log indexer
389+
s.filterMaps.Start()
390+
go s.updateFilterMapsHeads()
381391
return nil
382392
}
383393

394+
func (s *Ethereum) newChainView(head *types.Header) *filtermaps.StoredChainView {
395+
if head == nil {
396+
return nil
397+
}
398+
return filtermaps.NewStoredChainView(s.blockchain, head.Number.Uint64(), head.Hash())
399+
}
400+
401+
func (s *Ethereum) updateFilterMapsHeads() {
402+
headEventCh := make(chan core.ChainEvent, 10)
403+
blockProcCh := make(chan bool, 10)
404+
sub := s.blockchain.SubscribeChainEvent(headEventCh)
405+
sub2 := s.blockchain.SubscribeBlockProcessingEvent(blockProcCh)
406+
defer func() {
407+
sub.Unsubscribe()
408+
sub2.Unsubscribe()
409+
for {
410+
select {
411+
case <-headEventCh:
412+
case <-blockProcCh:
413+
default:
414+
return
415+
}
416+
}
417+
}()
418+
419+
head := s.blockchain.CurrentBlock()
420+
targetView := s.newChainView(head) // nil if already sent to channel
421+
var (
422+
blockProc, lastBlockProc bool
423+
finalBlock, lastFinal uint64
424+
)
425+
426+
setHead := func(newHead *types.Header) {
427+
if newHead == nil {
428+
return
429+
}
430+
if head == nil || newHead.Hash() != head.Hash() {
431+
head = newHead
432+
targetView = s.newChainView(head)
433+
}
434+
if fb := s.blockchain.CurrentFinalBlock(); fb != nil {
435+
finalBlock = fb.Number.Uint64()
436+
}
437+
}
438+
439+
for {
440+
if blockProc != lastBlockProc {
441+
select {
442+
case s.filterMaps.BlockProcessingCh <- blockProc:
443+
lastBlockProc = blockProc
444+
case ev := <-headEventCh:
445+
setHead(ev.Header)
446+
case blockProc = <-blockProcCh:
447+
case <-time.After(time.Second * 10):
448+
setHead(s.blockchain.CurrentBlock())
449+
case ch := <-s.closeFilterMaps:
450+
close(ch)
451+
return
452+
}
453+
} else if targetView != nil {
454+
select {
455+
case s.filterMaps.TargetViewCh <- targetView:
456+
targetView = nil
457+
case ev := <-headEventCh:
458+
setHead(ev.Header)
459+
case blockProc = <-blockProcCh:
460+
case <-time.After(time.Second * 10):
461+
setHead(s.blockchain.CurrentBlock())
462+
case ch := <-s.closeFilterMaps:
463+
close(ch)
464+
return
465+
}
466+
} else if finalBlock != lastFinal {
467+
select {
468+
case s.filterMaps.FinalBlockCh <- finalBlock:
469+
lastFinal = finalBlock
470+
case ev := <-headEventCh:
471+
setHead(ev.Header)
472+
case blockProc = <-blockProcCh:
473+
case <-time.After(time.Second * 10):
474+
setHead(s.blockchain.CurrentBlock())
475+
case ch := <-s.closeFilterMaps:
476+
close(ch)
477+
return
478+
}
479+
} else {
480+
select {
481+
case ev := <-headEventCh:
482+
setHead(ev.Header)
483+
case <-time.After(time.Second * 10):
484+
setHead(s.blockchain.CurrentBlock())
485+
case blockProc = <-blockProcCh:
486+
case ch := <-s.closeFilterMaps:
487+
close(ch)
488+
return
489+
}
490+
}
491+
}
492+
}
493+
384494
func (s *Ethereum) setupDiscovery() error {
385495
eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode())
386496

@@ -423,6 +533,10 @@ func (s *Ethereum) Stop() error {
423533
// Then stop everything else.
424534
s.bloomIndexer.Close()
425535
close(s.closeBloomHandler)
536+
ch := make(chan struct{})
537+
s.closeFilterMaps <- ch
538+
<-ch
539+
s.filterMaps.Stop()
426540
s.txPool.Close()
427541
s.blockchain.Stop()
428542
s.engine.Close()

eth/ethconfig/config.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ var Defaults = Config{
5252
NetworkId: 0, // enable auto configuration of networkID == chainID
5353
TxLookupLimit: 2350000,
5454
TransactionHistory: 2350000,
55+
LogHistory: 2350000,
5556
StateHistory: params.FullImmutabilityThreshold,
5657
DatabaseCache: 512,
5758
TrieCleanCache: 154,
@@ -93,8 +94,11 @@ type Config struct {
9394
// Deprecated: use 'TransactionHistory' instead.
9495
TxLookupLimit uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
9596

96-
TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
97-
StateHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved.
97+
TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
98+
LogHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head where a log search index is maintained.
99+
LogNoHistory bool `toml:",omitempty"` // No log search index is maintained.
100+
LogExportCheckpoints string // export log index checkpoints to file
101+
StateHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved.
98102

99103
// State scheme represents the scheme used to store ethereum states and trie
100104
// nodes on top. It can be 'hash', 'path', or none which means use the scheme

0 commit comments

Comments
 (0)