@@ -31,6 +31,7 @@ import (
31
31
"github.com/ethereum/go-ethereum/core/rawdb"
32
32
"github.com/ethereum/go-ethereum/core/types"
33
33
"github.com/ethereum/go-ethereum/event"
34
+ "github.com/ethereum/go-ethereum/log"
34
35
"github.com/ethereum/go-ethereum/rpc"
35
36
)
36
37
@@ -92,8 +93,21 @@ type EventSystem struct {
92
93
backend Backend
93
94
lightMode bool
94
95
lastHead * types.Header
95
- install chan * subscription // install filter for event notification
96
- uninstall chan * subscription // remove filter for event notification
96
+
97
+ // Subscriptions
98
+ txSub event.Subscription // Subscription for new transaction event
99
+ logsSub event.Subscription // Subscription for new log event
100
+ rmLogsSub event.Subscription // Subscription for removed log event
101
+ chainSub event.Subscription // Subscription for new chain event
102
+ pendingLogSub * event.TypeMuxSubscription // Subscription for pending log event
103
+
104
+ // Channels
105
+ install chan * subscription // install filter for event notification
106
+ uninstall chan * subscription // remove filter for event notification
107
+ txCh chan core.TxPreEvent // Channel to receive new transaction event
108
+ logsCh chan []* types.Log // Channel to receive new log event
109
+ rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
110
+ chainCh chan core.ChainEvent // Channel to receive new chain event
97
111
}
98
112
99
113
// NewEventSystem creates a new manager that listens for event on the given mux,
@@ -109,10 +123,27 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
109
123
lightMode : lightMode ,
110
124
install : make (chan * subscription ),
111
125
uninstall : make (chan * subscription ),
126
+ txCh : make (chan core.TxPreEvent , txChanSize ),
127
+ logsCh : make (chan []* types.Log , logsChanSize ),
128
+ rmLogsCh : make (chan core.RemovedLogsEvent , rmLogsChanSize ),
129
+ chainCh : make (chan core.ChainEvent , chainEvChanSize ),
112
130
}
113
131
114
- go m .eventLoop ()
132
+ // Subscribe events
133
+ m .txSub = m .backend .SubscribeTxPreEvent (m .txCh )
134
+ m .logsSub = m .backend .SubscribeLogsEvent (m .logsCh )
135
+ m .rmLogsSub = m .backend .SubscribeRemovedLogsEvent (m .rmLogsCh )
136
+ m .chainSub = m .backend .SubscribeChainEvent (m .chainCh )
137
+ // TODO(rjl493456442): use feed to subscribe pending log event
138
+ m .pendingLogSub = m .mux .Subscribe (core.PendingLogsEvent {})
139
+
140
+ // Make sure none of the subscriptions are empty
141
+ if m .txSub == nil || m .logsSub == nil || m .rmLogsSub == nil || m .chainSub == nil ||
142
+ m .pendingLogSub .Closed () {
143
+ log .Crit ("Subscribe for event system failed" )
144
+ }
115
145
146
+ go m .eventLoop ()
116
147
return m
117
148
}
118
149
@@ -412,50 +443,35 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
412
443
413
444
// eventLoop (un)installs filters and processes mux events.
414
445
func (es * EventSystem ) eventLoop () {
415
- var (
416
- index = make (filterIndex )
417
- sub = es .mux .Subscribe (core.PendingLogsEvent {})
418
- // Subscribe TxPreEvent form txpool
419
- txCh = make (chan core.TxPreEvent , txChanSize )
420
- txSub = es .backend .SubscribeTxPreEvent (txCh )
421
- // Subscribe RemovedLogsEvent
422
- rmLogsCh = make (chan core.RemovedLogsEvent , rmLogsChanSize )
423
- rmLogsSub = es .backend .SubscribeRemovedLogsEvent (rmLogsCh )
424
- // Subscribe []*types.Log
425
- logsCh = make (chan []* types.Log , logsChanSize )
426
- logsSub = es .backend .SubscribeLogsEvent (logsCh )
427
- // Subscribe ChainEvent
428
- chainEvCh = make (chan core.ChainEvent , chainEvChanSize )
429
- chainEvSub = es .backend .SubscribeChainEvent (chainEvCh )
430
- )
431
-
432
- // Unsubscribe all events
433
- defer sub .Unsubscribe ()
434
- defer txSub .Unsubscribe ()
435
- defer rmLogsSub .Unsubscribe ()
436
- defer logsSub .Unsubscribe ()
437
- defer chainEvSub .Unsubscribe ()
438
-
446
+ // Ensure all subscriptions get cleaned up
447
+ defer func () {
448
+ es .pendingLogSub .Unsubscribe ()
449
+ es .txSub .Unsubscribe ()
450
+ es .logsSub .Unsubscribe ()
451
+ es .rmLogsSub .Unsubscribe ()
452
+ es .chainSub .Unsubscribe ()
453
+ }()
454
+
455
+ index := make (filterIndex )
439
456
for i := UnknownSubscription ; i < LastIndexSubscription ; i ++ {
440
457
index [i ] = make (map [rpc.ID ]* subscription )
441
458
}
442
459
443
460
for {
444
461
select {
445
- case ev , active := <- sub .Chan ():
446
- if ! active { // system stopped
447
- return
448
- }
449
- es .broadcast (index , ev )
450
-
451
462
// Handle subscribed events
452
- case ev := <- txCh :
463
+ case ev := <- es . txCh :
453
464
es .broadcast (index , ev )
454
- case ev := <- rmLogsCh :
465
+ case ev := <- es . logsCh :
455
466
es .broadcast (index , ev )
456
- case ev := <- logsCh :
467
+ case ev := <- es . rmLogsCh :
457
468
es .broadcast (index , ev )
458
- case ev := <- chainEvCh :
469
+ case ev := <- es .chainCh :
470
+ es .broadcast (index , ev )
471
+ case ev , active := <- es .pendingLogSub .Chan ():
472
+ if ! active { // system stopped
473
+ return
474
+ }
459
475
es .broadcast (index , ev )
460
476
461
477
case f := <- es .install :
@@ -467,6 +483,7 @@ func (es *EventSystem) eventLoop() {
467
483
index [f.typ ][f.id ] = f
468
484
}
469
485
close (f .installed )
486
+
470
487
case f := <- es .uninstall :
471
488
if f .typ == MinedAndPendingLogsSubscription {
472
489
// the type are logs and pending logs subscriptions
@@ -478,13 +495,13 @@ func (es *EventSystem) eventLoop() {
478
495
close (f .err )
479
496
480
497
// System stopped
481
- case <- txSub .Err ():
498
+ case <- es . txSub .Err ():
482
499
return
483
- case <- rmLogsSub .Err ():
500
+ case <- es . logsSub .Err ():
484
501
return
485
- case <- logsSub .Err ():
502
+ case <- es . rmLogsSub .Err ():
486
503
return
487
- case <- chainEvSub .Err ():
504
+ case <- es . chainSub .Err ():
488
505
return
489
506
}
490
507
}
0 commit comments