Skip to content

Commit 02244c2

Browse files
authored
Improve reconnect and resubscribe stability (#108)
Several problems were found when testing library under unstable conditions – with frequent reconnections. Allows to avoid already subscribed subscription error, and bad request error (sending subscribe frame before connect frame). Also protects from running several simultaneous timers for reconnect/resubscribe.
1 parent 42be075 commit 02244c2

File tree

4 files changed

+40
-2
lines changed

4 files changed

+40
-2
lines changed

client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,13 @@ func (c *Client) moveToConnecting(code uint32, reason string) {
601601

602602
func (c *Client) scheduleReconnectLocked() {
603603
c.reconnectAttempts++
604+
if c.reconnectTimer != nil {
605+
if c.logLevelEnabled(LogLevelDebug) {
606+
c.log(LogLevelDebug, "stopping previous reconnect timer", nil)
607+
}
608+
c.reconnectTimer.Stop()
609+
c.reconnectTimer = nil
610+
}
604611
reconnectDelay := c.getReconnectDelay()
605612
if c.logLevelEnabled(LogLevelDebug) {
606613
c.log(LogLevelDebug, "reconnect with delay", map[string]string{

examples/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ require (
1818
github.com/planetscale/vtprotobuf v0.6.0 // indirect
1919
github.com/segmentio/asm v1.2.0 // indirect
2020
github.com/segmentio/encoding v0.4.0 // indirect
21+
github.com/shadowspore/fossil-delta v0.0.0-20241213113458-1d797d70cbe3 // indirect
2122
github.com/valyala/bytebufferpool v1.0.0 // indirect
2223
golang.org/x/sys v0.11.0 // indirect
2324
google.golang.org/protobuf v1.36.6 // indirect

examples/go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
2222
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
2323
github.com/segmentio/encoding v0.4.0 h1:MEBYvRqiUB2nfR2criEXWqwdY6HJOUrCn5hboVOVmy8=
2424
github.com/segmentio/encoding v0.4.0/go.mod h1:/d03Cd8PoaDeceuhUUUQWjU0KhWjrmYrWPgtJHYZSnI=
25-
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
26-
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
25+
github.com/shadowspore/fossil-delta v0.0.0-20241213113458-1d797d70cbe3 h1:/4/IJi5iyTdh6mqOUaASW148HQpujYiHl0Wl78dSOSc=
26+
github.com/shadowspore/fossil-delta v0.0.0-20241213113458-1d797d70cbe3/go.mod h1:aJIMhRsunltJR926EB2MUg8qHemFQDreSB33pyto2Ps=
27+
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
28+
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
2729
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
2830
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
2931
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=

subscription.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ type Subscription struct {
110110
deltaType DeltaType
111111
deltaNegotiated bool
112112
prevData []byte
113+
114+
inflight atomic.Bool
113115
}
114116

115117
func (s *Subscription) State() SubState {
@@ -572,6 +574,15 @@ func (s *Subscription) applyDeltaLocked(pub *protocol.Publication, event Publica
572574

573575
// Lock must be held outside.
574576
func (s *Subscription) scheduleResubscribe() {
577+
if s.resubscribeTimer != nil {
578+
if s.centrifuge.logLevelEnabled(LogLevelDebug) {
579+
s.centrifuge.log(LogLevelDebug, "stopping previous resubscribe timer", map[string]string{
580+
"channel": s.Channel,
581+
})
582+
}
583+
s.resubscribeTimer.Stop()
584+
s.resubscribeTimer = nil
585+
}
575586
delay := s.resubscribeStrategy.timeBeforeNextAttempt(s.resubscribeAttempts)
576587
s.resubscribeAttempts++
577588
s.resubscribeTimer = time.AfterFunc(delay, func() {
@@ -698,23 +709,36 @@ func (s *Subscription) resubscribe() {
698709
s.mu.Unlock()
699710
return
700711
}
712+
if s.inflight.Load() {
713+
s.mu.Unlock()
714+
if s.centrifuge.logLevelEnabled(LogLevelDebug) {
715+
s.centrifuge.log(LogLevelDebug, "avoid subscribe since inflight", map[string]string{
716+
"channel": s.Channel,
717+
})
718+
}
719+
return
720+
}
701721
token := s.token
722+
s.inflight.Store(true)
702723
s.mu.Unlock()
703724

704725
if token == "" && s.getToken != nil {
705726
var err error
706727
token, err = s.getSubscriptionToken(s.Channel)
707728
if err != nil {
708729
if errors.Is(err, ErrUnauthorized) {
730+
s.inflight.Store(false)
709731
s.unsubscribe(unsubscribedUnauthorized, "unauthorized", false)
710732
return
711733
}
734+
s.inflight.Store(false)
712735
s.subscribeError(err)
713736
return
714737
}
715738
s.mu.Lock()
716739
if token == "" {
717740
s.mu.Unlock()
741+
s.inflight.Store(false)
718742
s.unsubscribe(unsubscribedUnauthorized, "unauthorized", false)
719743
return
720744
}
@@ -725,6 +749,7 @@ func (s *Subscription) resubscribe() {
725749
s.mu.Lock()
726750
defer s.mu.Unlock()
727751
if s.state != SubStateSubscribing {
752+
s.inflight.Store(false)
728753
return
729754
}
730755

@@ -738,12 +763,15 @@ func (s *Subscription) resubscribe() {
738763

739764
err := s.centrifuge.sendSubscribe(s.Channel, s.data, isRecover, sp, token, s.positioned, s.recoverable, s.joinLeave, s.deltaType, func(res *protocol.SubscribeResult, err error) {
740765
if err != nil {
766+
s.inflight.Store(false)
741767
s.subscribeError(err)
742768
return
743769
}
770+
s.inflight.Store(false)
744771
s.moveToSubscribed(res)
745772
})
746773
if err != nil {
774+
s.inflight.Store(false)
747775
s.scheduleResubscribe()
748776
}
749777
}

0 commit comments

Comments
 (0)