Skip to content

Commit a1d8244

Browse files
committed
eth/filters: implement log filter using new log index
1 parent f0a253a commit a1d8244

File tree

14 files changed

+612
-196
lines changed

14 files changed

+612
-196
lines changed

cmd/geth/chaincmd.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ if one is set. Otherwise it prints the genesis from the datadir.`,
100100
utils.VMTraceFlag,
101101
utils.VMTraceJsonConfigFlag,
102102
utils.TransactionHistoryFlag,
103+
utils.LogHistoryFlag,
104+
utils.LogNoHistoryFlag,
105+
utils.LogExportCheckpointsFlag,
103106
utils.StateHistoryFlag,
104107
}, utils.DatabaseFlags),
105108
Description: `

cmd/geth/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ var (
8686
utils.SnapshotFlag,
8787
utils.TxLookupLimitFlag, // deprecated
8888
utils.TransactionHistoryFlag,
89+
utils.LogHistoryFlag,
90+
utils.LogNoHistoryFlag,
91+
utils.LogExportCheckpointsFlag,
8992
utils.StateHistoryFlag,
9093
utils.LightServeFlag, // deprecated
9194
utils.LightIngressFlag, // deprecated

cmd/utils/flags.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,23 @@ var (
272272
Value: ethconfig.Defaults.TransactionHistory,
273273
Category: flags.StateCategory,
274274
}
275+
LogHistoryFlag = &cli.Uint64Flag{
276+
Name: "history.logs",
277+
Usage: "Number of recent blocks to maintain log search index for (default = about one year, 0 = entire chain)",
278+
Value: ethconfig.Defaults.LogHistory,
279+
Category: flags.StateCategory,
280+
}
281+
LogNoHistoryFlag = &cli.BoolFlag{
282+
Name: "history.logs.disable",
283+
Usage: "Do not maintain log search index",
284+
Category: flags.StateCategory,
285+
}
286+
LogExportCheckpointsFlag = &cli.StringFlag{
287+
Name: "history.logs.export",
288+
Usage: "Export checkpoints to file in go source file format",
289+
Category: flags.StateCategory,
290+
Value: "",
291+
}
275292
// Beacon client light sync settings
276293
BeaconApiFlag = &cli.StringSliceFlag{
277294
Name: "beacon.api",
@@ -1662,6 +1679,15 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
16621679
cfg.StateScheme = rawdb.HashScheme
16631680
log.Warn("Forcing hash state-scheme for archive mode")
16641681
}
1682+
if ctx.IsSet(LogHistoryFlag.Name) {
1683+
cfg.LogHistory = ctx.Uint64(LogHistoryFlag.Name)
1684+
}
1685+
if ctx.IsSet(LogNoHistoryFlag.Name) {
1686+
cfg.LogNoHistory = true
1687+
}
1688+
if ctx.IsSet(LogExportCheckpointsFlag.Name) {
1689+
cfg.LogExportCheckpoints = ctx.String(LogExportCheckpointsFlag.Name)
1690+
}
16651691
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheTrieFlag.Name) {
16661692
cfg.TrieCleanCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheTrieFlag.Name) / 100
16671693
}

core/blockchain.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -225,14 +225,15 @@ type BlockChain struct {
225225
statedb *state.CachingDB // State database to reuse between imports (contains state cache)
226226
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled
227227

228-
hc *HeaderChain
229-
rmLogsFeed event.Feed
230-
chainFeed event.Feed
231-
chainHeadFeed event.Feed
232-
logsFeed event.Feed
233-
blockProcFeed event.Feed
234-
scope event.SubscriptionScope
235-
genesisBlock *types.Block
228+
hc *HeaderChain
229+
rmLogsFeed event.Feed
230+
chainFeed event.Feed
231+
chainHeadFeed event.Feed
232+
logsFeed event.Feed
233+
blockProcFeed event.Feed
234+
blockProcCounter int32
235+
scope event.SubscriptionScope
236+
genesisBlock *types.Block
236237

237238
// This mutex synchronizes chain write operations.
238239
// Readers don't need to take it, they can just read the database.
@@ -1564,8 +1565,6 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
15641565
if len(chain) == 0 {
15651566
return 0, nil
15661567
}
1567-
bc.blockProcFeed.Send(true)
1568-
defer bc.blockProcFeed.Send(false)
15691568

15701569
// Do a sanity check that the provided chain is actually ordered and linked.
15711570
for i := 1; i < len(chain); i++ {
@@ -1605,6 +1604,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
16051604
if bc.insertStopped() {
16061605
return nil, 0, nil
16071606
}
1607+
1608+
if atomic.AddInt32(&bc.blockProcCounter, 1) == 1 {
1609+
bc.blockProcFeed.Send(true)
1610+
}
1611+
defer func() {
1612+
if atomic.AddInt32(&bc.blockProcCounter, -1) == 0 {
1613+
bc.blockProcFeed.Send(false)
1614+
}
1615+
}()
1616+
16081617
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
16091618
SenderCacher().RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number(), chain[0].Time()), chain)
16101619

eth/api_backend.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
3030
"github.com/ethereum/go-ethereum/core"
3131
"github.com/ethereum/go-ethereum/core/bloombits"
32+
"github.com/ethereum/go-ethereum/core/filtermaps"
3233
"github.com/ethereum/go-ethereum/core/rawdb"
3334
"github.com/ethereum/go-ethereum/core/state"
3435
"github.com/ethereum/go-ethereum/core/txpool"
@@ -404,6 +405,10 @@ func (b *EthAPIBackend) ServiceFilter(ctx context.Context, session *bloombits.Ma
404405
}
405406
}
406407

408+
func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend {
409+
return b.eth.filterMaps.NewMatcherBackend()
410+
}
411+
407412
func (b *EthAPIBackend) Engine() consensus.Engine {
408413
return b.eth.engine
409414
}

eth/backend.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@ import (
2323
"math/big"
2424
"runtime"
2525
"sync"
26+
"time"
2627

2728
"github.com/ethereum/go-ethereum/accounts"
2829
"github.com/ethereum/go-ethereum/common"
2930
"github.com/ethereum/go-ethereum/common/hexutil"
3031
"github.com/ethereum/go-ethereum/consensus"
3132
"github.com/ethereum/go-ethereum/core"
3233
"github.com/ethereum/go-ethereum/core/bloombits"
34+
"github.com/ethereum/go-ethereum/core/filtermaps"
3335
"github.com/ethereum/go-ethereum/core/rawdb"
3436
"github.com/ethereum/go-ethereum/core/state/pruner"
3537
"github.com/ethereum/go-ethereum/core/txpool"
@@ -85,6 +87,9 @@ type Ethereum struct {
8587
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
8688
closeBloomHandler chan struct{}
8789

90+
filterMaps *filtermaps.FilterMaps
91+
closeFilterMaps chan chan struct{}
92+
8893
APIBackend *EthAPIBackend
8994

9095
miner *miner.Miner
@@ -222,6 +227,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
222227
return nil, err
223228
}
224229
eth.bloomIndexer.Start(eth.blockchain)
230+
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.newChainView(eth.blockchain.CurrentBlock()), filtermaps.DefaultParams, config.LogHistory, 1000, config.LogNoHistory, config.LogExportCheckpoints)
231+
eth.closeFilterMaps = make(chan chan struct{})
225232

226233
if config.BlobPool.Datadir != "" {
227234
config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
@@ -364,9 +371,113 @@ func (s *Ethereum) Start() error {
364371

365372
// Start the networking layer
366373
s.handler.Start(s.p2pServer.MaxPeers)
374+
375+
// start log indexer
376+
s.filterMaps.Start()
377+
go s.updateFilterMapsHeads()
367378
return nil
368379
}
369380

381+
func (s *Ethereum) newChainView(head *types.Header) *filtermaps.StoredChainView {
382+
if head == nil {
383+
return nil
384+
}
385+
return filtermaps.NewStoredChainView(s.blockchain, head.Number.Uint64(), head.Hash())
386+
}
387+
388+
func (s *Ethereum) updateFilterMapsHeads() {
389+
headEventCh := make(chan core.ChainEvent, 10)
390+
blockProcCh := make(chan bool, 10)
391+
sub := s.blockchain.SubscribeChainEvent(headEventCh)
392+
sub2 := s.blockchain.SubscribeBlockProcessingEvent(blockProcCh)
393+
defer func() {
394+
sub.Unsubscribe()
395+
sub2.Unsubscribe()
396+
for {
397+
select {
398+
case <-headEventCh:
399+
case <-blockProcCh:
400+
default:
401+
return
402+
}
403+
}
404+
}()
405+
406+
head := s.blockchain.CurrentBlock()
407+
targetView := s.newChainView(head) // nil if already sent to channel
408+
var (
409+
blockProc, lastBlockProc bool
410+
finalBlock, lastFinal uint64
411+
)
412+
413+
setHead := func(newHead *types.Header) {
414+
if newHead == nil {
415+
return
416+
}
417+
if head == nil || newHead.Hash() != head.Hash() {
418+
head = newHead
419+
targetView = s.newChainView(head)
420+
}
421+
if fb := s.blockchain.CurrentFinalBlock(); fb != nil {
422+
finalBlock = fb.Number.Uint64()
423+
}
424+
}
425+
426+
for {
427+
if blockProc != lastBlockProc {
428+
select {
429+
case s.filterMaps.BlockProcessingCh <- blockProc:
430+
lastBlockProc = blockProc
431+
case ev := <-headEventCh:
432+
setHead(ev.Header)
433+
case blockProc = <-blockProcCh:
434+
case <-time.After(time.Second * 10):
435+
setHead(s.blockchain.CurrentBlock())
436+
case ch := <-s.closeFilterMaps:
437+
close(ch)
438+
return
439+
}
440+
} else if targetView != nil {
441+
select {
442+
case s.filterMaps.TargetViewCh <- targetView:
443+
targetView = nil
444+
case ev := <-headEventCh:
445+
setHead(ev.Header)
446+
case blockProc = <-blockProcCh:
447+
case <-time.After(time.Second * 10):
448+
setHead(s.blockchain.CurrentBlock())
449+
case ch := <-s.closeFilterMaps:
450+
close(ch)
451+
return
452+
}
453+
} else if finalBlock != lastFinal {
454+
select {
455+
case s.filterMaps.FinalBlockCh <- finalBlock:
456+
lastFinal = finalBlock
457+
case ev := <-headEventCh:
458+
setHead(ev.Header)
459+
case blockProc = <-blockProcCh:
460+
case <-time.After(time.Second * 10):
461+
setHead(s.blockchain.CurrentBlock())
462+
case ch := <-s.closeFilterMaps:
463+
close(ch)
464+
return
465+
}
466+
} else {
467+
select {
468+
case ev := <-headEventCh:
469+
setHead(ev.Header)
470+
case <-time.After(time.Second * 10):
471+
setHead(s.blockchain.CurrentBlock())
472+
case blockProc = <-blockProcCh:
473+
case ch := <-s.closeFilterMaps:
474+
close(ch)
475+
return
476+
}
477+
}
478+
}
479+
}
480+
370481
func (s *Ethereum) setupDiscovery() error {
371482
eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode())
372483

@@ -409,6 +520,10 @@ func (s *Ethereum) Stop() error {
409520
// Then stop everything else.
410521
s.bloomIndexer.Close()
411522
close(s.closeBloomHandler)
523+
ch := make(chan struct{})
524+
s.closeFilterMaps <- ch
525+
<-ch
526+
s.filterMaps.Stop()
412527
s.txPool.Close()
413528
s.blockchain.Stop()
414529
s.engine.Close()

eth/ethconfig/config.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ var Defaults = Config{
5252
NetworkId: 0, // enable auto configuration of networkID == chainID
5353
TxLookupLimit: 2350000,
5454
TransactionHistory: 2350000,
55+
LogHistory: 2350000,
5556
StateHistory: params.FullImmutabilityThreshold,
5657
DatabaseCache: 512,
5758
TrieCleanCache: 154,
@@ -93,8 +94,11 @@ type Config struct {
9394
// Deprecated: use 'TransactionHistory' instead.
9495
TxLookupLimit uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
9596

96-
TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
97-
StateHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved.
97+
TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
98+
LogHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head where a log search index is maintained.
99+
LogNoHistory bool `toml:",omitempty"` // No log search index is maintained.
100+
LogExportCheckpoints string // export log index checkpoints to file
101+
StateHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved.
98102

99103
// State scheme represents the scheme used to store ethereum states and trie
100104
// nodes on top. It can be 'hash', 'path', or none which means use the scheme

0 commit comments

Comments
 (0)