Skip to content

Commit 82b0dec

Browse files
authored
eth/filters: remove support for pending logs (#29574)
This change removes support for subscribing to pending logs. "Pending logs" were always an odd feature, because it can never be fully reliable. When support for it was added many years ago, the intention was for this to be used by wallet apps to show the 'potential future token balance' of accounts, i.e. as a way of notifying the user of incoming transfers before they were mined. In order to generate the pending logs, the node must pick a subset of all public mempool transactions, execute them in the EVM, and then dispatch the resulting logs to API consumers.
1 parent ad3d8cb commit 82b0dec

File tree

8 files changed

+44
-614
lines changed

8 files changed

+44
-614
lines changed

cmd/utils/flags.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1959,7 +1959,7 @@ func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconf
19591959
})
19601960
stack.RegisterAPIs([]rpc.API{{
19611961
Namespace: "eth",
1962-
Service: filters.NewFilterAPI(filterSystem, false),
1962+
Service: filters.NewFilterAPI(filterSystem),
19631963
}})
19641964
return filterSystem
19651965
}

eth/filters/api.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@ import (
3434
)
3535

3636
var (
37-
errInvalidTopic = errors.New("invalid topic(s)")
38-
errFilterNotFound = errors.New("filter not found")
39-
errInvalidBlockRange = errors.New("invalid block range params")
40-
errExceedMaxTopics = errors.New("exceed max topics")
37+
errInvalidTopic = errors.New("invalid topic(s)")
38+
errFilterNotFound = errors.New("filter not found")
39+
errInvalidBlockRange = errors.New("invalid block range params")
40+
errPendingLogsUnsupported = errors.New("pending logs are not supported")
41+
errExceedMaxTopics = errors.New("exceed max topics")
4142
)
4243

4344
// The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0
@@ -70,10 +71,10 @@ type FilterAPI struct {
7071
}
7172

7273
// NewFilterAPI returns a new FilterAPI instance.
73-
func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI {
74+
func NewFilterAPI(system *FilterSystem) *FilterAPI {
7475
api := &FilterAPI{
7576
sys: system,
76-
events: NewEventSystem(system, lightMode),
77+
events: NewEventSystem(system),
7778
filters: make(map[rpc.ID]*filter),
7879
timeout: system.cfg.Timeout,
7980
}
@@ -456,7 +457,7 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
456457
f.txs = nil
457458
return hashes, nil
458459
}
459-
case LogsSubscription, MinedAndPendingLogsSubscription:
460+
case LogsSubscription:
460461
logs := f.logs
461462
f.logs = nil
462463
return returnLogs(logs), nil

eth/filters/filter.go

Lines changed: 4 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -108,19 +108,9 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
108108
return f.blockLogs(ctx, header)
109109
}
110110

111-
var (
112-
beginPending = f.begin == rpc.PendingBlockNumber.Int64()
113-
endPending = f.end == rpc.PendingBlockNumber.Int64()
114-
)
115-
116-
// special case for pending logs
117-
if beginPending && !endPending {
118-
return nil, errInvalidBlockRange
119-
}
120-
121-
// Short-cut if all we care about is pending logs
122-
if beginPending && endPending {
123-
return f.pendingLogs(), nil
111+
// Disallow pending logs.
112+
if f.begin == rpc.PendingBlockNumber.Int64() || f.end == rpc.PendingBlockNumber.Int64() {
113+
return nil, errPendingLogsUnsupported
124114
}
125115

126116
resolveSpecial := func(number int64) (int64, error) {
@@ -165,16 +155,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
165155
case log := <-logChan:
166156
logs = append(logs, log)
167157
case err := <-errChan:
168-
if err != nil {
169-
// if an error occurs during extraction, we do return the extracted data
170-
return logs, err
171-
}
172-
// Append the pending ones
173-
if endPending {
174-
pendingLogs := f.pendingLogs()
175-
logs = append(logs, pendingLogs...)
176-
}
177-
return logs, nil
158+
return logs, err
178159
}
179160
}
180161
}
@@ -332,22 +313,6 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*typ
332313
return logs, nil
333314
}
334315

335-
// pendingLogs returns the logs matching the filter criteria within the pending block.
336-
func (f *Filter) pendingLogs() []*types.Log {
337-
block, receipts, _ := f.sys.backend.Pending()
338-
if block == nil || receipts == nil {
339-
return nil
340-
}
341-
if bloomFilter(block.Bloom(), f.addresses, f.topics) {
342-
var unfiltered []*types.Log
343-
for _, r := range receipts {
344-
unfiltered = append(unfiltered, r.Logs...)
345-
}
346-
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
347-
}
348-
return nil
349-
}
350-
351316
// filterLogs creates a slice of logs matching the given criteria.
352317
func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log {
353318
var check = func(log *types.Log) bool {

eth/filters/filter_system.go

Lines changed: 9 additions & 186 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ import (
3030
"github.com/ethereum/go-ethereum/common/lru"
3131
"github.com/ethereum/go-ethereum/core"
3232
"github.com/ethereum/go-ethereum/core/bloombits"
33-
"github.com/ethereum/go-ethereum/core/rawdb"
34-
"github.com/ethereum/go-ethereum/core/state"
3533
"github.com/ethereum/go-ethereum/core/types"
3634
"github.com/ethereum/go-ethereum/ethdb"
3735
"github.com/ethereum/go-ethereum/event"
@@ -63,7 +61,6 @@ type Backend interface {
6361
GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error)
6462
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
6563
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
66-
Pending() (*types.Block, types.Receipts, *state.StateDB)
6764

6865
CurrentHeader() *types.Header
6966
ChainConfig() *params.ChainConfig
@@ -152,10 +149,6 @@ const (
152149
UnknownSubscription Type = iota
153150
// LogsSubscription queries for new or removed (chain reorg) logs
154151
LogsSubscription
155-
// PendingLogsSubscription queries for logs in pending blocks
156-
PendingLogsSubscription
157-
// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
158-
MinedAndPendingLogsSubscription
159152
// PendingTransactionsSubscription queries for pending transactions entering
160153
// the pending state
161154
PendingTransactionsSubscription
@@ -192,10 +185,8 @@ type subscription struct {
192185
// EventSystem creates subscriptions, processes events and broadcasts them to the
193186
// subscription which match the subscription criteria.
194187
type EventSystem struct {
195-
backend Backend
196-
sys *FilterSystem
197-
lightMode bool
198-
lastHead *types.Header
188+
backend Backend
189+
sys *FilterSystem
199190

200191
// Subscriptions
201192
txsSub event.Subscription // Subscription for new transaction event
@@ -218,11 +209,10 @@ type EventSystem struct {
218209
//
219210
// The returned manager has a loop that needs to be stopped with the Stop function
220211
// or by stopping the given mux.
221-
func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem {
212+
func NewEventSystem(sys *FilterSystem) *EventSystem {
222213
m := &EventSystem{
223214
sys: sys,
224215
backend: sys.backend,
225-
lightMode: lightMode,
226216
install: make(chan *subscription),
227217
uninstall: make(chan *subscription),
228218
txsCh: make(chan core.NewTxsEvent, txChanSize),
@@ -310,10 +300,11 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
310300
to = rpc.BlockNumber(crit.ToBlock.Int64())
311301
}
312302

313-
// only interested in pending logs
314-
if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber {
315-
return es.subscribePendingLogs(crit, logs), nil
303+
// Pending logs are not supported anymore.
304+
if from == rpc.PendingBlockNumber || to == rpc.PendingBlockNumber {
305+
return nil, errPendingLogsUnsupported
316306
}
307+
317308
// only interested in new mined logs
318309
if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber {
319310
return es.subscribeLogs(crit, logs), nil
@@ -322,34 +313,13 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
322313
if from >= 0 && to >= 0 && to >= from {
323314
return es.subscribeLogs(crit, logs), nil
324315
}
325-
// interested in mined logs from a specific block number, new logs and pending logs
326-
if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber {
327-
return es.subscribeMinedPendingLogs(crit, logs), nil
328-
}
329316
// interested in logs from a specific block number to new mined blocks
330317
if from >= 0 && to == rpc.LatestBlockNumber {
331318
return es.subscribeLogs(crit, logs), nil
332319
}
333320
return nil, errInvalidBlockRange
334321
}
335322

336-
// subscribeMinedPendingLogs creates a subscription that returned mined and
337-
// pending logs that match the given criteria.
338-
func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
339-
sub := &subscription{
340-
id: rpc.NewID(),
341-
typ: MinedAndPendingLogsSubscription,
342-
logsCrit: crit,
343-
created: time.Now(),
344-
logs: logs,
345-
txs: make(chan []*types.Transaction),
346-
headers: make(chan *types.Header),
347-
installed: make(chan struct{}),
348-
err: make(chan error),
349-
}
350-
return es.subscribe(sub)
351-
}
352-
353323
// subscribeLogs creates a subscription that will write all logs matching the
354324
// given criteria to the given logs channel.
355325
func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
@@ -367,23 +337,6 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
367337
return es.subscribe(sub)
368338
}
369339

370-
// subscribePendingLogs creates a subscription that writes contract event logs for
371-
// transactions that enter the transaction pool.
372-
func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
373-
sub := &subscription{
374-
id: rpc.NewID(),
375-
typ: PendingLogsSubscription,
376-
logsCrit: crit,
377-
created: time.Now(),
378-
logs: logs,
379-
txs: make(chan []*types.Transaction),
380-
headers: make(chan *types.Header),
381-
installed: make(chan struct{}),
382-
err: make(chan error),
383-
}
384-
return es.subscribe(sub)
385-
}
386-
387340
// SubscribeNewHeads creates a subscription that writes the header of a block that is
388341
// imported in the chain.
389342
func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
@@ -430,18 +383,6 @@ func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
430383
}
431384
}
432385

433-
func (es *EventSystem) handlePendingLogs(filters filterIndex, logs []*types.Log) {
434-
if len(logs) == 0 {
435-
return
436-
}
437-
for _, f := range filters[PendingLogsSubscription] {
438-
matchedLogs := filterLogs(logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
439-
if len(matchedLogs) > 0 {
440-
f.logs <- matchedLogs
441-
}
442-
}
443-
}
444-
445386
func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
446387
for _, f := range filters[PendingTransactionsSubscription] {
447388
f.txs <- ev.Txs
@@ -452,91 +393,6 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
452393
for _, f := range filters[BlocksSubscription] {
453394
f.headers <- ev.Block.Header()
454395
}
455-
if es.lightMode && len(filters[LogsSubscription]) > 0 {
456-
es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
457-
for _, f := range filters[LogsSubscription] {
458-
if f.logsCrit.FromBlock != nil && header.Number.Cmp(f.logsCrit.FromBlock) < 0 {
459-
continue
460-
}
461-
if f.logsCrit.ToBlock != nil && header.Number.Cmp(f.logsCrit.ToBlock) > 0 {
462-
continue
463-
}
464-
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
465-
f.logs <- matchedLogs
466-
}
467-
}
468-
})
469-
}
470-
}
471-
472-
func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
473-
oldh := es.lastHead
474-
es.lastHead = newHeader
475-
if oldh == nil {
476-
return
477-
}
478-
newh := newHeader
479-
// find common ancestor, create list of rolled back and new block hashes
480-
var oldHeaders, newHeaders []*types.Header
481-
for oldh.Hash() != newh.Hash() {
482-
if oldh.Number.Uint64() >= newh.Number.Uint64() {
483-
oldHeaders = append(oldHeaders, oldh)
484-
oldh = rawdb.ReadHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1)
485-
}
486-
if oldh.Number.Uint64() < newh.Number.Uint64() {
487-
newHeaders = append(newHeaders, newh)
488-
newh = rawdb.ReadHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1)
489-
if newh == nil {
490-
// happens when CHT syncing, nothing to do
491-
newh = oldh
492-
}
493-
}
494-
}
495-
// roll back old blocks
496-
for _, h := range oldHeaders {
497-
callBack(h, true)
498-
}
499-
// check new blocks (array is in reverse order)
500-
for i := len(newHeaders) - 1; i >= 0; i-- {
501-
callBack(newHeaders[i], false)
502-
}
503-
}
504-
505-
// filter logs of a single header in light client mode
506-
func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {
507-
if !bloomFilter(header.Bloom, addresses, topics) {
508-
return nil
509-
}
510-
// Get the logs of the block
511-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
512-
defer cancel()
513-
cached, err := es.sys.cachedLogElem(ctx, header.Hash(), header.Number.Uint64())
514-
if err != nil {
515-
return nil
516-
}
517-
unfiltered := append([]*types.Log{}, cached.logs...)
518-
for i, log := range unfiltered {
519-
// Don't modify in-cache elements
520-
logcopy := *log
521-
logcopy.Removed = remove
522-
// Swap copy in-place
523-
unfiltered[i] = &logcopy
524-
}
525-
logs := filterLogs(unfiltered, nil, nil, addresses, topics)
526-
// Txhash is already resolved
527-
if len(logs) > 0 && logs[0].TxHash != (common.Hash{}) {
528-
return logs
529-
}
530-
// Resolve txhash
531-
body, err := es.sys.cachedGetBody(ctx, cached, header.Hash(), header.Number.Uint64())
532-
if err != nil {
533-
return nil
534-
}
535-
for _, log := range logs {
536-
// logs are already copied, safe to modify
537-
log.TxHash = body.Transactions[log.TxIndex].Hash()
538-
}
539-
return logs
540396
}
541397

542398
// eventLoop (un)installs filters and processes mux events.
@@ -564,46 +420,13 @@ func (es *EventSystem) eventLoop() {
564420
es.handleLogs(index, ev.Logs)
565421
case ev := <-es.chainCh:
566422
es.handleChainEvent(index, ev)
567-
// If we have no pending log subscription,
568-
// we don't need to collect any pending logs.
569-
if len(index[PendingLogsSubscription]) == 0 {
570-
continue
571-
}
572-
573-
// Pull the pending logs if there is a new chain head.
574-
pendingBlock, pendingReceipts, _ := es.backend.Pending()
575-
if pendingBlock == nil || pendingReceipts == nil {
576-
continue
577-
}
578-
if pendingBlock.ParentHash() != ev.Block.Hash() {
579-
continue
580-
}
581-
var logs []*types.Log
582-
for _, receipt := range pendingReceipts {
583-
if len(receipt.Logs) > 0 {
584-
logs = append(logs, receipt.Logs...)
585-
}
586-
}
587-
es.handlePendingLogs(index, logs)
588423

589424
case f := <-es.install:
590-
if f.typ == MinedAndPendingLogsSubscription {
591-
// the type are logs and pending logs subscriptions
592-
index[LogsSubscription][f.id] = f
593-
index[PendingLogsSubscription][f.id] = f
594-
} else {
595-
index[f.typ][f.id] = f
596-
}
425+
index[f.typ][f.id] = f
597426
close(f.installed)
598427

599428
case f := <-es.uninstall:
600-
if f.typ == MinedAndPendingLogsSubscription {
601-
// the type are logs and pending logs subscriptions
602-
delete(index[LogsSubscription], f.id)
603-
delete(index[PendingLogsSubscription], f.id)
604-
} else {
605-
delete(index[f.typ], f.id)
606-
}
429+
delete(index[f.typ], f.id)
607430
close(f.err)
608431

609432
// System stopped

0 commit comments

Comments
 (0)