Skip to content

Commit 96a79b9

Browse files
committed
eth/filters, accounts, rpc: abort system if subscribe failed
1 parent cf133fe commit 96a79b9

File tree

2 files changed

+66
-48
lines changed

2 files changed

+66
-48
lines changed

eth/filters/filter_system.go

Lines changed: 62 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,21 @@ type EventSystem struct {
9191
backend Backend
9292
lightMode bool
9393
lastHead *types.Header
94-
install chan *subscription // install filter for event notification
95-
uninstall chan *subscription // remove filter for event notification
94+
95+
// Subscriptions
96+
txSub event.Subscription // Subscription for new transaction event
97+
logsSub event.Subscription // Subscription for new log event
98+
rmLogsSub event.Subscription // Subscription for removed log event
99+
chainSub event.Subscription // Subscription for new chain event
100+
pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
101+
102+
// Channels
103+
install chan *subscription // install filter for event notification
104+
uninstall chan *subscription // remove filter for event notification
105+
txCh chan core.TxPreEvent // Channel to receive new transaction event
106+
logsCh chan []*types.Log // Channel to receive new log event
107+
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
108+
chainCh chan core.ChainEvent // Channel to receive new chain event
96109
}
97110

98111
// NewEventSystem creates a new manager that listens for event on the given mux,
@@ -108,10 +121,36 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
108121
lightMode: lightMode,
109122
install: make(chan *subscription),
110123
uninstall: make(chan *subscription),
124+
txCh: make(chan core.TxPreEvent, txChanSize),
125+
logsCh: make(chan []*types.Log, logsChanSize),
126+
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
127+
chainCh: make(chan core.ChainEvent, chainEvChanSize),
111128
}
112129

113-
go m.eventLoop()
130+
// Subscribe events
131+
m.txSub = m.backend.SubscribeTxPreEvent(m.txCh)
132+
if m.txSub == nil {
133+
return nil
134+
}
135+
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
136+
if m.logsSub == nil {
137+
return nil
138+
}
139+
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
140+
if m.rmLogsSub == nil {
141+
return nil
142+
}
143+
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
144+
if m.chainSub == nil {
145+
return nil
146+
}
147+
// TODO(rjl493456442): use feed to subscribe pending log event
148+
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
149+
if m.pendingLogSub.Closed() {
150+
return nil
151+
}
114152

153+
go m.eventLoop()
115154
return m
116155
}
117156

@@ -411,40 +450,15 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
411450

412451
// eventLoop (un)installs filters and processes mux events.
413452
func (es *EventSystem) eventLoop() {
414-
var (
415-
index = make(filterIndex)
416-
sub = es.mux.Subscribe(core.PendingLogsEvent{})
417-
// Subscribe TxPreEvent form txpool
418-
txCh = make(chan core.TxPreEvent, txChanSize)
419-
txSub = es.backend.SubscribeTxPreEvent(txCh)
420-
// Subscribe RemovedLogsEvent
421-
rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize)
422-
rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh)
423-
// Subscribe []*types.Log
424-
logsCh = make(chan []*types.Log, logsChanSize)
425-
logsSub = es.backend.SubscribeLogsEvent(logsCh)
426-
// Subscribe ChainEvent
427-
chainEvCh = make(chan core.ChainEvent, chainEvChanSize)
428-
chainEvSub = es.backend.SubscribeChainEvent(chainEvCh)
429-
)
453+
var index = make(filterIndex)
430454

431455
defer func() {
432456
// Unsubscribe all events
433-
if sub != nil {
434-
sub.Unsubscribe()
435-
}
436-
if txSub != nil {
437-
txSub.Unsubscribe()
438-
}
439-
if rmLogsSub != nil {
440-
rmLogsSub.Unsubscribe()
441-
}
442-
if logsSub != nil {
443-
logsSub.Unsubscribe()
444-
}
445-
if chainEvSub != nil {
446-
chainEvSub.Unsubscribe()
447-
}
457+
es.pendingLogSub.Unsubscribe()
458+
es.txSub.Unsubscribe()
459+
es.logsSub.Unsubscribe()
460+
es.rmLogsSub.Unsubscribe()
461+
es.chainSub.Unsubscribe()
448462
}()
449463

450464
for i := UnknownSubscription; i < LastIndexSubscription; i++ {
@@ -453,20 +467,19 @@ func (es *EventSystem) eventLoop() {
453467

454468
for {
455469
select {
456-
case ev, active := <-sub.Chan():
457-
if !active { // system stopped
458-
return
459-
}
460-
es.broadcast(index, ev)
461-
462470
// Handle subscribed events
463-
case ev := <-txCh:
471+
case ev := <-es.txCh:
464472
es.broadcast(index, ev)
465-
case ev := <-rmLogsCh:
473+
case ev := <-es.logsCh:
466474
es.broadcast(index, ev)
467-
case ev := <-logsCh:
475+
case ev := <-es.rmLogsCh:
468476
es.broadcast(index, ev)
469-
case ev := <-chainEvCh:
477+
case ev := <-es.chainCh:
478+
es.broadcast(index, ev)
479+
case ev, active := <-es.pendingLogSub.Chan():
480+
if !active { // system stopped
481+
return
482+
}
470483
es.broadcast(index, ev)
471484

472485
case f := <-es.install:
@@ -478,6 +491,7 @@ func (es *EventSystem) eventLoop() {
478491
index[f.typ][f.id] = f
479492
}
480493
close(f.installed)
494+
481495
case f := <-es.uninstall:
482496
if f.typ == MinedAndPendingLogsSubscription {
483497
// the type are logs and pending logs subscriptions
@@ -489,13 +503,13 @@ func (es *EventSystem) eventLoop() {
489503
close(f.err)
490504

491505
// System stopped
492-
case <-txSub.Err():
506+
case <-es.txSub.Err():
493507
return
494-
case <-rmLogsSub.Err():
508+
case <-es.logsSub.Err():
495509
return
496-
case <-logsSub.Err():
510+
case <-es.rmLogsSub.Err():
497511
return
498-
case <-chainEvSub.Err():
512+
case <-es.chainSub.Err():
499513
return
500514
}
501515
}

event/event.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@ func (s *TypeMuxSubscription) Unsubscribe() {
180180
s.closewait()
181181
}
182182

183+
func (s *TypeMuxSubscription) Closed() bool {
184+
return s.closed
185+
}
186+
183187
func (s *TypeMuxSubscription) closewait() {
184188
s.closeMu.Lock()
185189
defer s.closeMu.Unlock()

0 commit comments

Comments
 (0)