Commit

Merge pull request #31 from github/fix-out-of-order-dml-apply
initial, simple solution to out-of-order applying of DML events
Shlomi Noach committed May 16, 2016
2 parents 619be65 + 1e10f1f commit 92d09db
Showing 5 changed files with 156 additions and 72 deletions.
6 changes: 3 additions & 3 deletions go/base/context.go
@@ -121,17 +121,17 @@ func GetMigrationContext() *MigrationContext {

// GetGhostTableName generates the name of ghost table, based on original table name
func (this *MigrationContext) GetGhostTableName() string {
return fmt.Sprintf("_%s_New", this.OriginalTableName)
return fmt.Sprintf("_%s_gst", this.OriginalTableName)
}

// GetOldTableName generates the name of the "old" table, into which the original table is renamed.
func (this *MigrationContext) GetOldTableName() string {
return fmt.Sprintf("_%s_Old", this.OriginalTableName)
return fmt.Sprintf("_%s_old", this.OriginalTableName)
}

// GetChangelogTableName generates the name of changelog table, based on original table name
func (this *MigrationContext) GetChangelogTableName() string {
return fmt.Sprintf("_%s_OSC", this.OriginalTableName)
return fmt.Sprintf("_%s_osc", this.OriginalTableName)
}

// GetVoluntaryLockName returns a name of a voluntary lock to be used throughout
33 changes: 33 additions & 0 deletions go/logic/applier.go
@@ -548,6 +548,39 @@ func (this *Applier) StopSlaveIOThread() error {
return nil
}

// MasterPosWait is applicable with --test-on-replica
func (this *Applier) MasterPosWait(binlogCoordinates *mysql.BinlogCoordinates) error {
var appliedRows int64
if err := this.db.QueryRow(`select master_pos_wait(?, ?, ?)`, binlogCoordinates.LogFile, binlogCoordinates.LogPos, 1).Scan(&appliedRows); err != nil {
return err
}
if appliedRows < 0 {
return fmt.Errorf("Timeout waiting on master_pos_wait()")
}
return nil
}

func (this *Applier) StopSlaveNicely() error {
if err := this.StopSlaveIOThread(); err != nil {
return err
}
binlogCoordinates, err := mysql.GetReadBinlogCoordinates(this.db)
if err != nil {
return err
}
log.Debugf("Replication stopped at %+v. Will wait for SQL thread to apply", *binlogCoordinates)
if err := this.MasterPosWait(binlogCoordinates); err != nil {
return err
}
log.Debugf("Replication SQL thread applied all events")
if selfBinlogCoordinates, err := mysql.GetSelfBinlogCoordinates(this.db); err != nil {
return err
} else {
log.Debugf("Self binlog coordinates: %+v", *selfBinlogCoordinates)
}
return nil
}

// GrabVoluntaryLock gets a named lock (`GET_LOCK`) and listens
// on an okToRelease in order to release it
func (this *Applier) GrabVoluntaryLock(lockGrabbed chan<- error, okToRelease <-chan bool) error {
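
Aside on the MasterPosWait helper added above: MySQL's MASTER_POS_WAIT() blocks until the replica's SQL thread has applied events up to the given binlog coordinates, returning the number of events waited for, -1 on timeout, or NULL when the SQL thread is not running. A minimal standalone sketch of the same call; the driver import, DSN, coordinates and timeout are illustrative assumptions, not values taken from gh-osc:

// Minimal sketch only: wait for the replica SQL thread to reach the given
// binlog coordinates. DSN, file name, position and timeout are assumptions.
package main

import (
	gosql "database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql"
)

func masterPosWait(db *gosql.DB, logFile string, logPos int64, timeoutSeconds int) error {
	// MASTER_POS_WAIT returns the number of events waited for, -1 on
	// timeout, or NULL when the SQL thread is not running.
	var waited gosql.NullInt64
	if err := db.QueryRow(`select master_pos_wait(?, ?, ?)`, logFile, logPos, timeoutSeconds).Scan(&waited); err != nil {
		return err
	}
	if !waited.Valid {
		return fmt.Errorf("master_pos_wait() returned NULL; is the SQL thread running?")
	}
	if waited.Int64 < 0 {
		return fmt.Errorf("timeout waiting on master_pos_wait()")
	}
	return nil
}

func main() {
	db, err := gosql.Open("mysql", "user:password@tcp(replica-host:3306)/")
	if err != nil {
		panic(err)
	}
	defer db.Close()
	if err := masterPosWait(db, "mysql-bin.000123", 4, 10); err != nil {
		panic(err)
	}
	fmt.Println("replica SQL thread has caught up")
}
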
157 changes: 88 additions & 69 deletions go/logic/migrator.go
@@ -48,6 +48,7 @@ type Migrator struct {
voluntaryLockAcquired chan bool
panicAbort chan error

allEventsUpToLockProcessedFlag int64
// copyRowsQueue should not be buffered; if buffered some non-damaging but
// excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
copyRowsQueue chan tableWriteFunc
Expand All @@ -65,6 +66,8 @@ func NewMigrator() *Migrator {
voluntaryLockAcquired: make(chan bool, 1),
panicAbort: make(chan error),

allEventsUpToLockProcessedFlag: 0,

copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
handledChangelogStates: make(map[string]bool),
@@ -106,7 +109,7 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
}
if this.migrationContext.TestOnReplica {
if this.migrationContext.TestOnReplica && (atomic.LoadInt64(&this.allEventsUpToLockProcessedFlag) == 0) {
replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.ThrottleControlReplicaKeys, this.migrationContext.ReplictionLagQuery)
if err != nil {
return true, err.Error()
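
One reading of the new allEventsUpToLockProcessedFlag check above: in --test-on-replica mode replication is deliberately stopped at cut-over, so once all events up to the lock have been processed, throttling on replica lag would block forever and is therefore skipped. A small self-contained sketch of that flag pattern; the type and method names are illustrative, not gh-osc's API:

// Illustrative sketch: replica-lag throttling is skipped once the
// "all events up to lock processed" flag has been raised, because
// replication is stopped on purpose at that point and lag would only grow.
package main

import (
	"fmt"
	"sync/atomic"
)

type migrator struct {
	allEventsUpToLockProcessedFlag int64
}

func (m *migrator) shouldThrottleOnReplicaLag(lagSeconds, maxLagSeconds float64) (bool, string) {
	if atomic.LoadInt64(&m.allEventsUpToLockProcessedFlag) != 0 {
		return false, "" // replication intentionally stopped; lag check no longer meaningful
	}
	if lagSeconds > maxLagSeconds {
		return true, fmt.Sprintf("lag=%.1fs", lagSeconds)
	}
	return false, ""
}

func main() {
	m := &migrator{}
	fmt.Println(m.shouldThrottleOnReplicaLag(12.0, 1.5)) // true lag=12.0s
	atomic.StoreInt64(&m.allEventsUpToLockProcessedFlag, 1)
	fmt.Println(m.shouldThrottleOnReplicaLag(12.0, 1.5)) // false
}
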
@@ -198,6 +201,16 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err
return nil
}

// consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then
// consumes and drops any further incoming events that may be left hanging.
func (this *Migrator) consumeRowCopyComplete() {
<-this.rowCopyComplete
go func() {
for <-this.rowCopyComplete {
}
}()
}

func (this *Migrator) canStopStreaming() bool {
return false
}
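
The consumeRowCopyComplete helper above is a small channel idiom: take the first completion signal, then keep a background receiver alive so any later, redundant sends never block their sender. A self-contained sketch of the idiom; channel and function names are illustrative:

package main

import "fmt"

// consumeOnce waits for the first completion signal, then drains any further
// signals in the background so late senders never block.
func consumeOnce(done chan bool) {
	<-done
	go func() {
		for range done {
			// drop redundant completion signals
		}
	}()
}

func main() {
	done := make(chan bool)
	go func() { done <- true }()
	consumeOnce(done)

	// A second, redundant signal is absorbed by the drain goroutine instead
	// of blocking this sender forever.
	done <- true
	fmt.Println("late completion signal dropped without blocking")
}
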
@@ -215,33 +228,18 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
}
case AllEventsUpToLockProcessed:
{
this.allEventsUpToLockProcessed <- true
}
default:
{
return fmt.Errorf("Unknown changelog state: %+v", changelogState)
}
}
log.Debugf("Received state %+v", changelogState)
return nil
}

func (this *Migrator) onChangelogState(stateValue string) (err error) {
log.Fatalf("I shouldn't be here")
if this.handledChangelogStates[stateValue] {
return nil
}
this.handledChangelogStates[stateValue] = true

changelogState := ChangelogState(stateValue)
switch changelogState {
case TablesInPlace:
{
this.tablesInPlace <- true
}
case AllEventsUpToLockProcessed:
{
this.allEventsUpToLockProcessed <- true
applyEventFunc := func() error {
this.allEventsUpToLockProcessed <- true
return nil
}
// at this point we know all events up to lock have been read from the streamer,
// because the streamer works sequentially. So those events are either already handled,
// or have event functions in applyEventsQueue.
// So as not to create a potential deadlock, we write this func to applyEventsQueue
// asynchronously, understanding it doesn't really matter.
go func() {
this.applyEventsQueue <- applyEventFunc
}()
}
default:
{
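
The comment in the AllEventsUpToLockProcessed case above carries the ordering argument: the streamer reads the binlog sequentially, so when the state hint is seen, every earlier DML event is either already applied or already sitting in applyEventsQueue; enqueueing the notification function behind them (from a goroutine, so the handler cannot block on a full queue) means it fires only after those events are applied. A minimal sketch of that ordering, with illustrative names and a toy payload:

package main

import "fmt"

func main() {
	applyEventsQueue := make(chan func() error, 100)
	allEventsUpToLockProcessed := make(chan bool, 1)

	// Two DML events are already waiting in the queue, in binlog order.
	applyEventsQueue <- func() error { fmt.Println("apply dml 1"); return nil }
	applyEventsQueue <- func() error { fmt.Println("apply dml 2"); return nil }

	// The state-hint handler enqueues its notification *behind* the pending
	// events, and does so from a goroutine so it can never block on a full
	// queue.
	go func() {
		applyEventsQueue <- func() error {
			allEventsUpToLockProcessed <- true
			return nil
		}
	}()

	// A single consumer drains the queue strictly in order.
	go func() {
		for f := range applyEventsQueue {
			f() // errors ignored in this sketch
		}
	}()

	<-allEventsUpToLockProcessed // unblocks only after dml 1 and dml 2 were applied
	fmt.Println("all events up to lock processed")
}
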
@@ -295,6 +293,9 @@ func (this *Migrator) Migrate() (err error) {
if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
return err
}
if err := this.addDMLEventsListener(); err != nil {
return err
}
go this.initiateHeartbeatListener()

if err := this.applier.ReadMigrationRangeValues(); err != nil {
@@ -307,7 +308,7 @@ func (this *Migrator) Migrate() (err error) {
go this.initiateStatus()

log.Debugf("Operating until row copy is complete")
<-this.rowCopyComplete
this.consumeRowCopyComplete()
log.Debugf("Row copy complete")
this.printStatus()

@@ -336,18 +337,20 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
if this.migrationContext.QuickAndBumpySwapTables {
return this.stopWritesAndCompleteMigrationOnMasterQuickAndBumpy()
}
// Lock-based solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal
if err := this.retryOperation(
func() error {
return this.executeAndThrottleOnError(this.stopWritesAndCompleteMigrationOnMasterViaLock)
}); err != nil {
return err
}
if err := this.dropOldTableIfRequired(); err != nil {
return err
}

{
// Lock-based solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal
if err := this.retryOperation(
func() error {
return this.executeAndThrottleOnError(this.stopWritesAndCompleteMigrationOnMasterViaLock)
}); err != nil {
return err
}
if err := this.dropOldTableIfRequired(); err != nil {
return err
}
}
return
}

@@ -364,6 +367,21 @@ func (this *Migrator) dropOldTableIfRequired() (err error) {
return nil
}

// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
// make sure the queue is drained.
func (this *Migrator) waitForEventsUpToLock() (err error) {
if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil {
return err
}
log.Debugf("Waiting for events up to lock")
<-this.allEventsUpToLockProcessed
atomic.StoreInt64(&this.allEventsUpToLockProcessedFlag, 1)
log.Debugf("Done waiting for events up to lock")
this.printStatus()

return nil
}

// stopWritesAndCompleteMigrationOnMasterQuickAndBumpy will lock down the original table, execute
// what's left of last DML entries, and **non-atomically** swap original->old, then new->original.
// There is a point in time where the "original" table does not exist and queries are non-blocked
@@ -373,11 +391,9 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err
return err
}

this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed))
log.Debugf("Waiting for events up to lock")
<-this.allEventsUpToLockProcessed
log.Debugf("Done waiting for events up to lock")

if err := this.retryOperation(this.waitForEventsUpToLock); err != nil {
return err
}
if err := this.retryOperation(this.applier.SwapTablesQuickAndBumpy); err != nil {
return err
}
@@ -438,10 +454,7 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error
log.Infof("Found RENAME to be executing")

// OK, at this time we know any newly incoming DML on original table is blocked.
this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed))
log.Debugf("Waiting for events up to lock")
<-this.allEventsUpToLockProcessed
log.Debugf("Done waiting for events up to lock")
this.waitForEventsUpToLock()

okToReleaseLock <- true
// BAM: voluntary lock is released, blocking query is released, rename is released.
@@ -466,14 +479,11 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error
// in sync. There is no table swap.
func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) {
log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE")
if err := this.retryOperation(this.applier.StopSlaveIOThread); err != nil {
if err := this.retryOperation(this.applier.StopSlaveNicely); err != nil {
return err
}

this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed))
log.Debugf("Waiting for events up to lock")
<-this.allEventsUpToLockProcessed
log.Debugf("Done waiting for events up to lock")
this.waitForEventsUpToLock()

log.Info("Table duplicated with new schema. Am not touching the original table. Replication is stopped. You may now compare the two tables to gain trust into this tool's operation")
return nil
@@ -612,8 +622,18 @@ func (this *Migrator) initiateStreaming() error {
return this.onChangelogStateEvent(dmlEvent)
},
)
this.eventsStreamer.AddListener(
true,

go func() {
log.Debugf("Beginning streaming")
this.eventsStreamer.StreamEvents(func() bool { return this.canStopStreaming() })
}()
return nil
}

// addDMLEventsListener
func (this *Migrator) addDMLEventsListener() error {
err := this.eventsStreamer.AddListener(
false,
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
func(dmlEvent *binlog.BinlogDMLEvent) error {
@@ -624,12 +644,7 @@ func (this *Migrator) initiateStreaming() error {
return nil
},
)

go func() {
log.Debugf("Beginning streaming")
this.eventsStreamer.StreamEvents(func() bool { return this.canStopStreaming() })
}()
return nil
return err
}

func (this *Migrator) initiateApplier() error {
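
The DML listener registration above moves out of initiateStreaming into its own addDMLEventsListener step and, notably, switches the listener from async (true) to synchronous (false). One reading, consistent with the PR title: asynchronous listeners hand each event to its own goroutine, so two changes to the same row can be applied in the wrong order, whereas a synchronous listener pushes events onto the buffered applyEventsQueue in exactly the order the binlog produced them. A toy sketch of the difference, with illustrative event strings:

package main

import (
	"fmt"
	"sync"
)

func main() {
	events := []string{"insert id=7", "update id=7 set x=1", "update id=7 set x=2"}

	// Async dispatch: each event handled on its own goroutine; the apply
	// order is unpredictable, which is exactly the out-of-order problem.
	var wg sync.WaitGroup
	for _, ev := range events {
		wg.Add(1)
		go func(ev string) {
			defer wg.Done()
			fmt.Println("async apply:", ev) // may interleave in any order
		}(ev)
	}
	wg.Wait()

	// Sync dispatch into a FIFO queue consumed by a single applier: events
	// are applied in binlog order.
	queue := make(chan string, len(events))
	for _, ev := range events {
		queue <- ev
	}
	close(queue)
	for ev := range queue {
		fmt.Println("ordered apply:", ev)
	}
}
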
@@ -680,13 +695,16 @@ func (this *Migrator) iterateChunks() error {
if !hasFurtherRange {
return terminateRowIteration(nil)
}
_, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery()
if err != nil {
return terminateRowIteration(err)
applyCopyRowsFunc := func() error {
_, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery()
if err != nil {
return terminateRowIteration(err)
}
atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected)
atomic.AddInt64(&this.migrationContext.Iteration, 1)
return nil
}
atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected)
atomic.AddInt64(&this.migrationContext.Iteration, 1)
return nil
return this.retryOperation(applyCopyRowsFunc)
}
this.copyRowsQueue <- copyRowsFunc
}
@@ -714,7 +732,8 @@ func (this *Migrator) executeWriteFuncs() error {
select {
case copyRowsFunc := <-this.copyRowsQueue:
{
if err := this.retryOperation(copyRowsFunc); err != nil {
// Retries are handled within the copyRowsFunc
if err := copyRowsFunc(); err != nil {
return log.Errore(err)
}
}
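
In the iterateChunks/executeWriteFuncs changes above, the retry moves from wrapping the whole copyRowsFunc to wrapping only applyCopyRowsFunc, i.e. only the chunk insert and its counters; one reading is that calculating the next iteration range stays outside the retry so a failed attempt is repeated on the same chunk rather than advancing past it. A hedged sketch of that scoping; retryOperation here is a stand-in, not gh-osc's implementation:

package main

import (
	"errors"
	"fmt"
)

// retryOperation is an illustrative stand-in for a bounded retry helper.
func retryOperation(operation func() error, maxAttempts int) (err error) {
	for i := 0; i < maxAttempts; i++ {
		if err = operation(); err == nil {
			return nil
		}
		fmt.Println("attempt failed, retrying:", err)
	}
	return err
}

func main() {
	// The range for this chunk was chosen once, outside the retry.
	attempts := 0
	applyCopyRowsFunc := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient insert error")
		}
		fmt.Println("chunk copied on attempt", attempts)
		return nil
	}
	// Only the insert of the already-chosen chunk is retried.
	if err := retryOperation(applyCopyRowsFunc, 5); err != nil {
		panic(err)
	}
}
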
10 changes: 10 additions & 0 deletions go/logic/streamer.go
@@ -9,6 +9,7 @@ import (
gosql "database/sql"
"fmt"
"strings"
"sync"

"github.com/github/gh-osc/go/base"
"github.com/github/gh-osc/go/binlog"
@@ -37,6 +38,7 @@ type EventsStreamer struct {
migrationContext *base.MigrationContext
nextBinlogCoordinates *mysql.BinlogCoordinates
listeners [](*BinlogEventListener)
listenersMutex *sync.Mutex
eventsChannel chan *binlog.BinlogEntry
binlogReader binlog.BinlogReader
}
@@ -46,12 +48,17 @@ func NewEventsStreamer() *EventsStreamer {
connectionConfig: base.GetMigrationContext().InspectorConnectionConfig,
migrationContext: base.GetMigrationContext(),
listeners: [](*BinlogEventListener){},
listenersMutex: &sync.Mutex{},
eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
}
}

func (this *EventsStreamer) AddListener(
async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) {

this.listenersMutex.Lock()
defer this.listenersMutex.Unlock()

if databaseName == "" {
return fmt.Errorf("Empty database name in AddListener")
}
@@ -69,6 +76,9 @@ func (this *EventsStreamer) AddListener(
}

func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) {
this.listenersMutex.Lock()
defer this.listenersMutex.Unlock()

for _, listener := range this.listeners {
if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) {
continue
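
The streamer changes above guard the listeners slice with listenersMutex, since listeners may now be registered (changelog listener in initiateStreaming, DML listener in addDMLEventsListener) while notifyListeners could already be iterating on the streaming goroutine. A minimal sketch of the pattern with illustrative names:

package main

import (
	"fmt"
	"sync"
)

type listener func(event string) error

type streamer struct {
	listenersMutex sync.Mutex
	listeners      []listener
}

// addListener may be called while notify is iterating on another goroutine;
// the mutex keeps the slice append and the iteration from racing.
func (s *streamer) addListener(l listener) {
	s.listenersMutex.Lock()
	defer s.listenersMutex.Unlock()
	s.listeners = append(s.listeners, l)
}

func (s *streamer) notify(event string) {
	s.listenersMutex.Lock()
	defer s.listenersMutex.Unlock()
	for _, l := range s.listeners {
		l(event) // errors ignored in this sketch
	}
}

func main() {
	s := &streamer{}
	s.addListener(func(event string) error {
		fmt.Println("listener got:", event)
		return nil
	})
	s.notify("dml on example table")
}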