Skip to content

Commit 32a1881

Browse files
author
Gleez Technologies
committed
Crash service on certain errors, so kubernetes can restart. Several fixes
1 parent 1eaccf5 commit 32a1881

File tree

6 files changed

+46
-69
lines changed

6 files changed

+46
-69
lines changed

cmd/mysql-manticore/main.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -146,20 +146,21 @@ func run() (err error) {
146146
select {
147147
case n := <-sc:
148148
log.Infof("received signal %v, exiting", n)
149-
// case err = <-r.FatalErrC:
150-
// if errors.Cause(err) == river.ErrRebuildAndExitFlagSet {
151-
// log.Info(err.Error())
152-
// err = nil
153-
// }
149+
case err = <-r.FatalErrC:
150+
// if errors.Cause(err) == river.ErrRebuildAndExitFlagSet {
151+
// log.Info(err.Error())
152+
// err = nil
153+
// }
154+
log.Infof("received error [%v], exiting", err.Error())
154155
}
155156

156157
rootSup.Stop()
157158
return err
158159
}
159160

160-
// k8s run master loop will check for master once per 30 seconds.
161+
// k8s run master loop will check for master once per 20 seconds.
161162
func runMasterLoop() {
162-
ticker := time.NewTicker(30 * time.Second)
163+
ticker := time.NewTicker(10 * time.Second)
163164
for {
164165
select {
165166
case <-done:

river/river.go

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -181,39 +181,38 @@ func (r *River) run() error {
181181

182182
r.sup.ServeBackground()
183183

184-
r.sphinxService.RequestStartNotification()
184+
if r.sphinxToken == nil {
185+
r.sphinxService.RequestStartNotification()
185186

186-
t := r.sup.Add(r.sphinxService)
187-
r.sphinxToken = &t
187+
t := r.sup.Add(r.sphinxService)
188+
r.sphinxToken = &t
188189

189-
r.sphinxService.WaitUntilStarted()
190+
r.sphinxService.WaitUntilStarted()
191+
}
190192

191193
b := &backoff.Backoff{
192194
Min: 1 * time.Second,
193-
Max: 20 * time.Minute,
195+
Max: 10 * time.Minute,
194196
Factor: 2,
195197
Jitter: true,
196198
}
197199
defer b.Reset()
198200

199-
// get master state - wait until get state or timeout
200-
for {
201-
time.Sleep(b.Duration())
202-
203-
err = r.sphinxService.LoadSyncState(r.master.syncState())
204-
if err != nil {
205-
r.l.Errorf("one or more manticore backends are not up to date: %v", err)
206-
}
201+
// // get master state - wait until get state or timeout
202+
// for {
203+
// time.Sleep(b.Duration())
207204

208-
if err == nil {
209-
b.Reset()
210-
r.l.Infof("Connected to manticore backend")
211-
break
212-
}
213-
}
205+
// err = r.sphinxService.LoadSyncState(r.master.syncState())
206+
// if err == nil {
207+
// b.Reset()
208+
// r.l.Infof("Connected to manticore backend")
209+
// break
210+
// }
211+
// }
214212

213+
err = r.sphinxService.LoadSyncState(r.master.syncState())
215214
if err != nil {
216-
// r.l.Errorf("one or more manticore backends are not up to date: %v", err)
215+
r.l.Errorf("one or more manticore backends are not up to date: %v", err)
217216
return errors.Trace(err)
218217
}
219218

river/sphinx_service.go

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/juju/errors"
77
"github.com/sandeepone/mysql-manticore/sphinx"
88
"github.com/siddontang/go/sync2"
9+
910
"gopkg.in/birkirb/loggers.v1"
1011
"gopkg.in/birkirb/loggers.v1/log"
1112
)
@@ -125,7 +126,13 @@ func (s *SphinxService) SaveSyncState() (err error) {
125126
if s.sph == nil {
126127
return errors.Trace(errSphinxDisconnected)
127128
}
128-
return errors.Trace(sphinx.SaveSyncState(s.sph, master.syncState()))
129+
130+
err = sphinx.SaveSyncState(s.sph, master.syncState())
131+
if err != nil {
132+
s.riverInstance.FatalErrC <- errors.Annotatef(err, "could not connect to manticore")
133+
}
134+
135+
return errors.Trace(err)
129136
}
130137

131138
// IndexIsReady ...
@@ -138,22 +145,6 @@ func (s *SphinxService) IndexIsReady(index string, parts uint16) (bool, error) {
138145
return sphinx.IndexIsReady(s.sph, index, parts)
139146
}
140147

141-
// ReloadRtIndex ...
142-
// func (s *SphinxService) ReloadRtIndex(build indexGroupBuild) error {
143-
// s.sphm.Lock()
144-
// defer s.sphm.Unlock()
145-
146-
// if s.sph == nil {
147-
// return errors.Annotatef(errSphinxDisconnected, "error reloading after the build %s", build.id)
148-
// }
149-
150-
// if s.riverInstance.balancer != nil {
151-
// return errors.Trace(s.riverInstance.balancer.RollingReloadIndex(s.sph, build))
152-
// }
153-
154-
// return nil
155-
// }
156-
157148
// CheckIndexForOptimize ...
158149
func (s *SphinxService) CheckIndexForOptimize(index string, parts uint16) error {
159150
s.sphm.Lock()

river/status.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func (s *stat) run() (err error) {
214214

215215
getStatusInfo = s.getStatusInfo
216216

217-
// signal Kubernetes the server is healthy & ready to receive traffic
217+
// signal Kubernetes the server is healthy
218218
atomic.StoreInt32(&healthy, 1)
219219

220220
return s.srv.Serve(s.l)
@@ -252,8 +252,8 @@ func handleReadyz(s *stat) http.HandlerFunc {
252252
d := time.Now().Sub(s.startedAt)
253253

254254
// allows to take leadership or rolling update.
255-
// wait for 2 mins and inform kubernetes if unsuccessful
256-
if !s.r.isRunning && d.Seconds() < 65 {
255+
// wait for 1 min and inform kubernetes if unsuccessful
256+
if !s.r.isRunning && d.Seconds() < 45 {
257257
w.WriteHeader(http.StatusNoContent)
258258
return
259259
}
@@ -304,11 +304,12 @@ func handleStartSync(r *River) http.HandlerFunc {
304304

305305
err := r.sphinxService.LoadSyncState(r.master.syncState())
306306
if err != nil {
307-
r.l.Errorf("Status: one or more manticore backends are not up to date: %v", err)
308-
w.Write([]byte("manticore backend not up to date\n"))
307+
r.l.Errorf("Status: failed to reset GTID after successful restart: %s", errors.ErrorStack(err))
308+
w.Write([]byte("failed to reset GTID\n"))
309309
return
310310
}
311311

312+
r.l.Infof("reset GTID after successful restart to: %s", r.master.gtidSet())
312313
r.startSyncRoutine()
313314
w.WriteHeader(http.StatusNoContent)
314315
})

river/sync.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ type syncState struct {
4848
positionEvents []positionEvent
4949
flushC <-chan time.Time
5050
flushTimer *time.Timer
51-
// buildModeC <-chan buildModeMsg
52-
// buildModeEnabled bool
5351
}
5452

5553
type syncActionNeed struct {
@@ -176,7 +174,6 @@ func (s *SyncService) SyncLoop(ctx context.Context) {
176174
pos: r.master.position(),
177175
},
178176
},
179-
// buildModeC: s.switchC,
180177
}
181178

182179
defer r.SaveState()
@@ -224,7 +221,6 @@ func (s *SyncService) SyncLoop(ctx context.Context) {
224221
state.firstPositionEventID = syncedPositionEventID
225222
state.positionEvents = state.positionEvents[offset:]
226223

227-
// if !state.buildModeEnabled {
228224
bnow := time.Now()
229225
if bnow.Sub(lastSavedTime) > 3*time.Second {
230226
lastSavedTime = bnow
@@ -237,7 +233,6 @@ func (s *SyncService) SyncLoop(ctx context.Context) {
237233
}
238234
}
239235
}
240-
// }
241236
}
242237
}
243238
}

sphinx/sphinx.go

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"time"
1111

1212
set "github.com/deckarep/golang-set"
13-
"github.com/jpillora/backoff"
1413
"github.com/juju/errors"
1514

1615
"github.com/sandeepone/mysql-manticore/util"
@@ -131,7 +130,7 @@ func ConnectMany(addrList []string, settings ConnSettings, wg *sync.WaitGroup) (
131130
in: make(chan interface{}),
132131
out: make(chan SphResult),
133132
settings: settings,
134-
maxRetries: 10,
133+
maxRetries: 5,
135134
}
136135
if wg != nil {
137136
wg.Add(1)
@@ -229,23 +228,14 @@ func (c *SphConn) retry(pause time.Duration, stmt string) (*mysql.Result, error)
229228
return nil, errors.Trace(err)
230229
}
231230

232-
b := &backoff.Backoff{
233-
Min: 1 * time.Second,
234-
Max: 30 * time.Second,
235-
Factor: 2,
236-
Jitter: true,
237-
}
238-
defer b.Reset()
239-
240231
for {
241-
log.Infof("[sphinx-query@%s] waiting for %s before a retry", c.addr, b.Duration())
242-
time.Sleep(b.Duration())
232+
log.Infof("[sphinx-query@%s] waiting for %s before a retry", c.addr, pause)
233+
time.Sleep(pause)
243234
c.attempts = c.attempts + 1
244235

245236
err = c.reconnect()
246237
if err == nil {
247-
b.Reset()
248-
c.attempts = 0
238+
// c.attempts = 0
249239
break
250240
}
251241

@@ -254,7 +244,7 @@ func (c *SphConn) retry(pause time.Duration, stmt string) (*mysql.Result, error)
254244
}
255245
}
256246

257-
b.Reset()
247+
c.attempts = 0
258248
log.Infof("[sphinx-query@%s] [retry] %s", c.RemoteAddr().String(), stmt)
259249
return c.conn.Conn.Execute(stmt)
260250
}

0 commit comments

Comments
 (0)