Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/hooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ The following variables are available on all hooks:
- `GH_OST_ESTIMATED_ROWS` - estimated total rows in table
- `GH_OST_COPIED_ROWS` - number of rows copied by `gh-ost`
- `GH_OST_INSPECTED_LAG` - lag in seconds (floating point) of inspected server
- `GH_OST_HEARTBEAT_LAG` - lag in seconds (floating point) of heartbeat
- `GH_OST_PROGRESS` - progress pct ([0..100], floating point) of migration
- `GH_OST_MIGRATED_HOST`
- `GH_OST_INSPECTED_HOST`
Expand Down
5 changes: 5 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ type MigrationContext struct {
RenameTablesEndTime time.Time
pointOfInterestTime time.Time
pointOfInterestTimeMutex *sync.Mutex
CurrentHeartbeatLag int64
CurrentLag int64
currentProgress uint64
ThrottleHTTPStatusCode int64
Expand Down Expand Up @@ -454,6 +455,10 @@ func (this *MigrationContext) MarkRowCopyEndTime() {
this.RowCopyEndTime = time.Now()
}

func (this *MigrationContext) GetCurrentHeartbeatLagDuration() time.Duration {
return time.Duration(atomic.LoadInt64(&this.CurrentHeartbeatLag))
}

func (this *MigrationContext) GetCurrentLagDuration() time.Duration {
return time.Duration(atomic.LoadInt64(&this.CurrentLag))
}
Expand Down
1 change: 1 addition & 0 deletions go/logic/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_HOST=%s", this.migrationContext.GetInspectorHostname()))
env = append(env, fmt.Sprintf("GH_OST_EXECUTING_HOST=%s", this.migrationContext.Hostname))
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.GetCurrentHeartbeatLagDuration().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct()))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner))
Expand Down
45 changes: 40 additions & 5 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,20 @@ func (this *Migrator) canStopStreaming() bool {
return atomic.LoadInt64(&this.migrationContext.CutOverCompleteFlag) != 0
}

// onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted.
func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
// onChangelogEvent is called when a binlog event operation on the changelog table is intercepted.
func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
// Hey, I created the changelog table, I know the type of columns it has!
if hint := dmlEvent.NewColumnValues.StringColumn(2); hint != "state" {
switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint {
case "state":
return this.onChangelogStateEvent(dmlEvent)
case "heartbeat":
return this.onChangelogHeartbeatEvent(dmlEvent)
default:
return nil
}
}

func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
changelogStateString := dmlEvent.NewColumnValues.StringColumn(3)
changelogState := ReadChangelogState(changelogStateString)
this.migrationContext.Log.Infof("Intercepted changelog state %s", changelogState)
Expand Down Expand Up @@ -245,6 +253,16 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
return nil
}

func (this *Migrator) onChangelogHeartbeatEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
changelogHeartbeatString := dmlEvent.NewColumnValues.StringColumn(3)
if lag, err := parseChangelogHeartbeat(changelogHeartbeatString); err != nil {
return this.migrationContext.Log.Errore(err)
} else {
atomic.StoreInt64(&this.migrationContext.CurrentHeartbeatLag, int64(lag))
return nil
}
}

// listenOnPanicAbort aborts on abort request
func (this *Migrator) listenOnPanicAbort() {
err := <-this.migrationContext.PanicAbort
Expand Down Expand Up @@ -500,6 +518,22 @@ func (this *Migrator) cutOver() (err error) {
this.migrationContext.MarkPointOfInterest()
this.migrationContext.Log.Debugf("checking for cut-over postpone: complete")

this.migrationContext.Log.Infof("Waiting for heartbeat lag to be low enough to proceed")
this.sleepWhileTrue(
func() (bool, error) {
currentHeartbeatLag := atomic.LoadInt64(&this.migrationContext.CurrentHeartbeatLag)
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
if time.Duration(currentHeartbeatLag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
this.migrationContext.Log.Debugf("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", time.Duration(currentHeartbeatLag).Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds())
return true, nil
} else {
return false, nil
}
},
)
this.migrationContext.MarkPointOfInterest()
this.migrationContext.Log.Infof("Heartbeat lag is low enough, proceeding")

if this.migrationContext.TestOnReplica {
// With `--test-on-replica` we stop replication thread, and then proceed to use
// the same cut-over phase as the master would use. That means we take locks
Expand Down Expand Up @@ -962,13 +996,14 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {

currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates()

status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, State: %s; ETA: %s",
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s",
totalRowsCopied, rowsEstimate, progressPct,
atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied),
len(this.applyEventsQueue), cap(this.applyEventsQueue),
base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()),
currentBinlogCoordinates,
this.migrationContext.GetCurrentLagDuration().Seconds(),
this.migrationContext.GetCurrentHeartbeatLagDuration().Seconds(),
state,
eta,
)
Expand All @@ -995,7 +1030,7 @@ func (this *Migrator) initiateStreaming() error {
this.migrationContext.DatabaseName,
this.migrationContext.GetChangelogTableName(),
func(dmlEvent *binlog.BinlogDMLEvent) error {
return this.onChangelogStateEvent(dmlEvent)
return this.onChangelogEvent(dmlEvent)
},
)

Expand Down