Skip to content

Commit ee6b837

Browse files
authored
VReplication: Improve query buffering behavior during MoveTables traffic switching (#15701)
Signed-off-by: Matt Lord <[email protected]>
1 parent 1de3daa commit ee6b837

File tree

20 files changed

+428
-208
lines changed

20 files changed

+428
-208
lines changed

changelog/20.0/20.0.0/summary.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
- **[Flag changes](#flag-changes)**
2727
- [`pprof-http` default change](#pprof-http-default)
2828
- [New `healthcheck-dial-concurrency` flag](#healthcheck-dial-concurrency-flag)
29+
- [New minimum for `--buffer_min_time_between_failovers`](#buffer_min_time_between_failovers-flag)
2930
- [New `track-udfs` vtgate flag](#vtgate-track-udfs-flag)
3031
- **[Minor Changes](#minor-changes)**
3132
- **[New Stats](#new-stats)**
@@ -214,6 +215,10 @@ To continue enabling these endpoints, explicitly set `--pprof-http` when startin
214215

215216
The new `--healthcheck-dial-concurrency` flag defines the maximum number of healthcheck connections that can open concurrently. This limit is to avoid hitting Go runtime panics on deployments watching enough tablets [to hit the runtime's maximum thread limit of `10000`](https://pkg.go.dev/runtime/debug#SetMaxThreads) due to blocking network syscalls. This flag applies to `vtcombo`, `vtctld` and `vtgate` only and a value less than the runtime max thread limit _(`10000`)_ is recommended.
216217

218+
#### <a id="buffer_min_time_between_failovers-flag"/>New minimum for `--buffer_min_time_between_failovers`
219+
220+
The `--buffer_min_time_between_failovers` `vttablet` flag now has a minimum value of `1s`. This is because a value of 0 can cause issues with the buffering mechanics, resulting in unexpected and unnecessary query errors — in particular during `MoveTables SwitchTraffic` operations. If you are currently specifying a value of 0 for this flag, then you will need to update the config value to 1s *prior to upgrading to v20 or later*, as `vttablet` will report an error and terminate if you attempt to start it with a value of 0.
221+
217222
#### <a id="vtgate-track-udfs-flag"/>New `--track-udfs` vtgate flag
218223

219224
The new `--track-udfs` flag enables VTGate to track user defined functions for better planning.

go/test/endtoend/vreplication/cluster_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,9 @@ var (
5656
sidecarDBIdentifier = sqlparser.String(sqlparser.NewIdentifierCS(sidecarDBName))
5757
mainClusterConfig *ClusterConfig
5858
externalClusterConfig *ClusterConfig
59-
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDurationStr,
60-
"--buffer_size", "100000", "--buffer_min_time_between_failovers", "0s", "--buffer_max_failover_duration", loadTestBufferingWindowDurationStr}
59+
extraVTGateArgs = []string{"--tablet_refresh_interval", "10ms", "--enable_buffer", "--buffer_window", loadTestBufferingWindowDuration.String(),
60+
"--buffer_size", "250000", "--buffer_min_time_between_failovers", "1s", "--buffer_max_failover_duration", loadTestBufferingWindowDuration.String(),
61+
"--buffer_drain_concurrency", "10"}
6162
extraVtctldArgs = []string{"--remote_operation_timeout", "600s", "--topo_etcd_lease_ttl", "120"}
6263
// This variable can be used within specific tests to alter vttablet behavior
6364
extraVTTabletArgs = []string{}

go/test/endtoend/vreplication/helper_test.go

Lines changed: 74 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,18 @@ package vreplication
1818

1919
import (
2020
"context"
21-
"crypto/rand"
2221
"encoding/hex"
2322
"encoding/json"
2423
"fmt"
2524
"io"
25+
"math/rand"
2626
"net/http"
2727
"os"
2828
"os/exec"
2929
"regexp"
3030
"sort"
3131
"strings"
32+
"sync"
3233
"sync/atomic"
3334
"testing"
3435
"time"
@@ -121,9 +122,10 @@ func getConnectionNoError(t *testing.T, hostname string, port int) *mysql.Conn {
121122

122123
func getConnection(t *testing.T, hostname string, port int) *mysql.Conn {
123124
vtParams := mysql.ConnParams{
124-
Host: hostname,
125-
Port: port,
126-
Uname: "vt_dba",
125+
Host: hostname,
126+
Port: port,
127+
Uname: "vt_dba",
128+
ConnectTimeoutMs: 1000,
127129
}
128130
ctx := context.Background()
129131
conn, err := mysql.Connect(ctx, &vtParams)
@@ -803,92 +805,111 @@ func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) int {
803805
}
804806

805807
const (
806-
loadTestBufferingWindowDurationStr = "30s"
807-
loadTestPostBufferingInsertWindow = 60 * time.Second // should be greater than loadTestBufferingWindowDurationStr
808-
loadTestWaitForCancel = 30 * time.Second
809-
loadTestWaitBetweenQueries = 2 * time.Millisecond
808+
loadTestBufferingWindowDuration = 10 * time.Second
809+
loadTestAvgWaitBetweenQueries = 500 * time.Microsecond
810+
loadTestDefaultConnections = 100
810811
)
811812

812813
type loadGenerator struct {
813-
t *testing.T
814-
vc *VitessCluster
815-
ctx context.Context
816-
cancel context.CancelFunc
814+
t *testing.T
815+
vc *VitessCluster
816+
ctx context.Context
817+
cancel context.CancelFunc
818+
connections int
819+
wg sync.WaitGroup
817820
}
818821

819822
func newLoadGenerator(t *testing.T, vc *VitessCluster) *loadGenerator {
820823
return &loadGenerator{
821-
t: t,
822-
vc: vc,
824+
t: t,
825+
vc: vc,
826+
connections: loadTestDefaultConnections,
823827
}
824828
}
825829

826830
func (lg *loadGenerator) stop() {
827-
time.Sleep(loadTestPostBufferingInsertWindow) // wait for buffering to stop and additional records to be inserted by startLoad after traffic is switched
831+
// Wait for buffering to stop and additional records to be inserted by start
832+
// after traffic is switched.
833+
time.Sleep(loadTestBufferingWindowDuration * 2)
828834
log.Infof("Canceling load")
829835
lg.cancel()
830-
time.Sleep(loadTestWaitForCancel) // wait for cancel to take effect
836+
lg.wg.Wait()
831837
}
832838

833839
func (lg *loadGenerator) start() {
834840
t := lg.t
835841
lg.ctx, lg.cancel = context.WithCancel(context.Background())
842+
var connectionCount atomic.Int64
836843

837844
var id int64
838-
log.Infof("startLoad: starting")
845+
log.Infof("loadGenerator: starting")
839846
queryTemplate := "insert into loadtest(id, name) values (%d, 'name-%d')"
840847
var totalQueries, successfulQueries int64
841848
var deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors int64
849+
lg.wg.Add(1)
842850
defer func() {
843-
844-
log.Infof("startLoad: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
851+
defer lg.wg.Done()
852+
log.Infof("loadGenerator: totalQueries: %d, successfulQueries: %d, deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
845853
totalQueries, successfulQueries, deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
846854
}()
847-
logOnce := true
848855
for {
849856
select {
850857
case <-lg.ctx.Done():
851-
log.Infof("startLoad: context cancelled")
852-
log.Infof("startLoad: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
858+
log.Infof("loadGenerator: context cancelled")
859+
log.Infof("loadGenerator: deniedErrors: %d, ambiguousErrors: %d, reshardedErrors: %d, tableNotFoundErrors: %d, otherErrors: %d",
853860
deniedErrors, ambiguousErrors, reshardedErrors, tableNotFoundErrors, otherErrors)
854861
require.Equal(t, int64(0), deniedErrors)
855862
require.Equal(t, int64(0), otherErrors)
863+
require.Equal(t, int64(0), reshardedErrors)
856864
require.Equal(t, totalQueries, successfulQueries)
857865
return
858866
default:
859-
go func() {
860-
conn := vc.GetVTGateConn(t)
861-
defer conn.Close()
862-
atomic.AddInt64(&id, 1)
863-
query := fmt.Sprintf(queryTemplate, id, id)
864-
_, err := conn.ExecuteFetch(query, 1, false)
865-
atomic.AddInt64(&totalQueries, 1)
866-
if err != nil {
867-
sqlErr := err.(*sqlerror.SQLError)
868-
if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
869-
log.Infof("startLoad: denied tables error executing query: %d:%v", sqlErr.Number(), err)
870-
atomic.AddInt64(&deniedErrors, 1)
871-
} else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
872-
// this can happen when a second keyspace is setup with the same tables, but there are no routing rules
873-
// set yet by MoveTables. So we ignore these errors.
874-
atomic.AddInt64(&ambiguousErrors, 1)
875-
} else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
876-
atomic.AddInt64(&reshardedErrors, 1)
877-
} else if strings.Contains(strings.ToLower(err.Error()), "not found") {
878-
atomic.AddInt64(&tableNotFoundErrors, 1)
879-
} else {
880-
if logOnce {
881-
log.Infof("startLoad: error executing query: %d:%v", sqlErr.Number(), err)
882-
logOnce = false
867+
if int(connectionCount.Load()) < lg.connections {
868+
connectionCount.Add(1)
869+
lg.wg.Add(1)
870+
go func() {
871+
defer lg.wg.Done()
872+
defer connectionCount.Add(-1)
873+
conn := vc.GetVTGateConn(t)
874+
defer conn.Close()
875+
for {
876+
select {
877+
case <-lg.ctx.Done():
878+
return
879+
default:
883880
}
884-
atomic.AddInt64(&otherErrors, 1)
881+
newID := atomic.AddInt64(&id, 1)
882+
query := fmt.Sprintf(queryTemplate, newID, newID)
883+
_, err := conn.ExecuteFetch(query, 1, false)
884+
atomic.AddInt64(&totalQueries, 1)
885+
if err != nil {
886+
sqlErr := err.(*sqlerror.SQLError)
887+
if strings.Contains(strings.ToLower(err.Error()), "denied tables") {
888+
if debugMode {
889+
t.Logf("loadGenerator: denied tables error executing query: %d:%v", sqlErr.Number(), err)
890+
}
891+
atomic.AddInt64(&deniedErrors, 1)
892+
} else if strings.Contains(strings.ToLower(err.Error()), "ambiguous") {
893+
// This can happen when a second keyspace is setup with the same tables, but
894+
// there are no routing rules set yet by MoveTables. So we ignore these errors.
895+
atomic.AddInt64(&ambiguousErrors, 1)
896+
} else if strings.Contains(strings.ToLower(err.Error()), "current keyspace is being resharded") {
897+
atomic.AddInt64(&reshardedErrors, 1)
898+
} else if strings.Contains(strings.ToLower(err.Error()), "not found") {
899+
atomic.AddInt64(&tableNotFoundErrors, 1)
900+
} else {
901+
if debugMode {
902+
t.Logf("loadGenerator: error executing query: %d:%v", sqlErr.Number(), err)
903+
}
904+
atomic.AddInt64(&otherErrors, 1)
905+
}
906+
} else {
907+
atomic.AddInt64(&successfulQueries, 1)
908+
}
909+
time.Sleep(time.Duration(int64(float64(loadTestAvgWaitBetweenQueries.Microseconds()) * rand.Float64())))
885910
}
886-
time.Sleep(loadTestWaitBetweenQueries)
887-
} else {
888-
atomic.AddInt64(&successfulQueries, 1)
889-
}
890-
}()
891-
time.Sleep(loadTestWaitBetweenQueries)
911+
}()
912+
}
892913
}
893914
}
894915
}

go/test/endtoend/vreplication/movetables_buffering_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package vreplication
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/stretchr/testify/require"
78

@@ -33,8 +34,12 @@ func TestMoveTablesBuffering(t *testing.T) {
3334
catchup(t, targetTab2, workflowName, "MoveTables")
3435
vdiffSideBySide(t, ksWorkflow, "")
3536
waitForLowLag(t, "customer", workflowName)
36-
tstWorkflowSwitchReads(t, "", "")
37-
tstWorkflowSwitchWrites(t)
37+
for i := 0; i < 10; i++ {
38+
tstWorkflowSwitchReadsAndWrites(t)
39+
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
40+
tstWorkflowReverseReadsAndWrites(t)
41+
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
42+
}
3843
log.Infof("SwitchWrites done")
3944
lg.stop()
4045

go/test/endtoend/vreplication/partial_movetables_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"strings"
2222
"testing"
23+
"time"
2324

2425
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
2526

@@ -67,10 +68,12 @@ func testCancel(t *testing.T) {
6768
mt.SwitchReadsAndWrites()
6869
checkDenyList(targetKeyspace, false)
6970
checkDenyList(sourceKeyspace, true)
71+
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
7072

7173
mt.ReverseReadsAndWrites()
7274
checkDenyList(targetKeyspace, true)
7375
checkDenyList(sourceKeyspace, false)
76+
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
7477

7578
mt.Cancel()
7679
checkDenyList(targetKeyspace, false)
@@ -123,6 +126,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
123126
catchup(t, targetTab80Dash, workflowName, "MoveTables")
124127
vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil)
125128
mt.SwitchReadsAndWrites()
129+
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
126130
mt.Complete()
127131

128132
emptyGlobalRoutingRules := "{}\n"
@@ -246,6 +250,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
246250

247251
// Switch all traffic for the shard
248252
mt80Dash.SwitchReadsAndWrites()
253+
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
249254

250255
// Confirm global routing rules -- everything should still be routed
251256
// to the source side, customer, globally.
@@ -331,6 +336,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) {
331336
catchup(t, targetTabDash80, workflowName, "MoveTables")
332337
vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil)
333338
mtDash80.SwitchReadsAndWrites()
339+
time.Sleep(loadTestBufferingWindowDuration + 1*time.Second)
334340

335341
// Confirm global routing rules: everything should still be routed
336342
// to the source side, customer, globally.

0 commit comments

Comments
 (0)