Skip to content

Commit

Permalink
vtorc: improve handling of partial cell topo results (#17718)
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt authored Feb 14, 2025
1 parent 70114ad commit 1c06427
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 41 deletions.
70 changes: 56 additions & 14 deletions go/vt/topo/faketopo/faketopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,26 @@ func NewFakeTopoFactory() *FakeFactory {
mu: sync.Mutex{},
cells: map[string][]*FakeConn{},
}
factory.cells[topo.GlobalCell] = []*FakeConn{newFakeConnection()}
factory.cells[topo.GlobalCell] = []*FakeConn{NewFakeConnection()}
return factory
}

// AddCell is used to add a cell to the factory. It returns the fake connection created. This connection can then be used to configure get, list, and update errors.
func (f *FakeFactory) AddCell(cell string) *FakeConn {
conn := newFakeConnection()
f.mu.Lock()
defer f.mu.Unlock()
conn := NewFakeConnection()
f.cells[cell] = []*FakeConn{conn}
return conn
}

// SetCell installs fakeConn as the sole connection registered for cell,
// replacing whatever connections were previously registered under that name.
func (f *FakeFactory) SetCell(cell string, fakeConn *FakeConn) {
	conns := []*FakeConn{fakeConn}

	f.mu.Lock()
	defer f.mu.Unlock()
	f.cells[cell] = conns
}

// HasGlobalReadOnlyCell implements the Factory interface
func (f *FakeFactory) HasGlobalReadOnlyCell(serverAddr, root string) bool {
return false
Expand All @@ -70,7 +79,7 @@ func (f *FakeFactory) Create(cell, serverAddr, root string) (topo.Conn, error) {
if !ok || len(connections) == 0 {
return nil, topo.NewError(topo.NoNode, cell)
}
// pick the first connection and remove it from the list
// pick the first connection and remove it from the list.
conn := connections[0]
f.cells[cell] = connections[1:]

Expand All @@ -84,15 +93,19 @@ type FakeConn struct {
cell string
serverAddr string

// mutex to protect all the operations
// mutex to protect all the operations.
mu sync.Mutex

// getResultMap is a map storing the results for each filepath
// getResultMap is a map storing the results for each filepath.
getResultMap map[string]result
// updateErrors stores whether update function call should error or not
// listResultMap is a map storing the results for each filepath prefix.
listResultMap map[string][]topo.KVInfo
// updateErrors stores whether update function call should error or not.
updateErrors []updateError
// getErrors stores whether the get function call should error or not
// getErrors stores whether the get function call should error or not.
getErrors []bool
// listErrors stores whether the list function call should error or not.
listErrors []bool

// watches is a map of all watches for this connection to the cell keyed by the filepath.
watches map[string][]chan *topo.WatchData
Expand All @@ -105,13 +118,15 @@ type updateError struct {
writePersists bool
}

// newFakeConnection creates a new fake connection
func newFakeConnection() *FakeConn {
// NewFakeConnection creates a new fake connection
func NewFakeConnection() *FakeConn {
return &FakeConn{
getResultMap: map[string]result{},
watches: map[string][]chan *topo.WatchData{},
getErrors: []bool{},
updateErrors: []updateError{},
getResultMap: map[string]result{},
listResultMap: map[string][]topo.KVInfo{},
watches: map[string][]chan *topo.WatchData{},
getErrors: []bool{},
listErrors: []bool{},
updateErrors: []updateError{},
}
}

Expand All @@ -122,6 +137,20 @@ func (f *FakeConn) AddGetError(shouldErr bool) {
f.getErrors = append(f.getErrors, shouldErr)
}

// AddListError queues the outcome for a future List call on this fake
// connection. Entries are appended in call order; List consumes the oldest
// entry on each call and fails when that entry is true.
func (f *FakeConn) AddListError(shouldErr bool) {
	f.mu.Lock()
	defer f.mu.Unlock()

	queued := append(f.listErrors, shouldErr)
	f.listErrors = queued
}

// AddListResult registers result as the entries that List returns for exactly
// filePathPrefix, overwriting any result stored earlier for the same prefix.
func (f *FakeConn) AddListResult(filePathPrefix string, result []topo.KVInfo) {
	f.mu.Lock()
	defer f.mu.Unlock()

	f.listResultMap[filePathPrefix] = result
}

// AddUpdateError is used to add an update error to the fake connection
func (f *FakeConn) AddUpdateError(shouldErr bool, writePersists bool) {
f.mu.Lock()
Expand Down Expand Up @@ -261,7 +290,20 @@ func (f *FakeConn) GetVersion(ctx context.Context, filePath string, version int6

// List is part of the topo.Conn interface.
func (f *FakeConn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
return nil, topo.NewError(topo.NoImplementation, "List not supported in fake topo")
f.mu.Lock()
defer f.mu.Unlock()
if len(f.listErrors) > 0 {
shouldErr := f.listErrors[0]
f.listErrors = f.listErrors[1:]
if shouldErr {
return nil, topo.NewError(topo.Timeout, filePathPrefix)
}
}
kvInfos, isPresent := f.listResultMap[filePathPrefix]
if !isPresent {
return nil, topo.NewError(topo.NoNode, filePathPrefix)
}
return kvInfos, nil
}

// Delete implements the Conn interface
Expand Down
67 changes: 40 additions & 27 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,26 +150,30 @@ func OpenTabletDiscovery() <-chan time.Time {
return time.Tick(config.GetTopoInformationRefreshDuration()) //nolint SA1015: using time.Tick leaks the underlying ticker
}

// getAllTablets gets all tablets from all cells using a goroutine per cell.
func getAllTablets(ctx context.Context, cells []string) []*topo.TabletInfo {
var tabletsMu sync.Mutex
tablets := make([]*topo.TabletInfo, 0)
// getAllTablets gets all tablets from all cells using a goroutine per cell. It returns a map of
// cells (string) to slices of tablets (as topo.TabletInfo) and a slice of cells (string) that
// failed to return a result.
func getAllTablets(ctx context.Context, cells []string) (tabletsByCell map[string][]*topo.TabletInfo, failedCells []string) {
var mu sync.Mutex
failedCells = make([]string, 0, len(cells))
tabletsByCell = make(map[string][]*topo.TabletInfo, len(cells))
eg, ctx := errgroup.WithContext(ctx)
for _, cell := range cells {
eg.Go(func() error {
t, err := ts.GetTabletsByCell(ctx, cell, nil)
tablets, err := ts.GetTabletsByCell(ctx, cell, nil)
mu.Lock()
defer mu.Unlock()
if err != nil {
log.Errorf("Failed to load tablets from cell %s: %+v", cell, err)
return nil
failedCells = append(failedCells, cell)
} else {
tabletsByCell[cell] = tablets
}
tabletsMu.Lock()
defer tabletsMu.Unlock()
tablets = append(tablets, t...)
return nil
})
}
_ = eg.Wait() // always nil
return tablets
return tabletsByCell, failedCells
}

// refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while
Expand All @@ -182,35 +186,44 @@ func refreshAllTablets(ctx context.Context) error {
// refreshTabletsUsing refreshes tablets using a provided loader.
func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), forceRefresh bool) error {
// Get all cells.
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
cells, err := ts.GetKnownCells(ctx)
cellsCtx, cellsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cellsCancel()
cells, err := ts.GetKnownCells(cellsCtx)
if err != nil {
return err
}

// Get all tablets from all cells.
getTabletsCtx, getTabletsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer getTabletsCancel()
tablets := getAllTablets(getTabletsCtx, cells)
if len(tablets) == 0 {
log.Error("Found no tablets")
tabletsByCell, failedCells := getAllTablets(getTabletsCtx, cells)
if len(tabletsByCell) == 0 {
log.Error("Found no cells with tablets")
return nil
}
if len(failedCells) > 0 {
log.Errorf("Got partial topo result. Failed cells: %s", strings.Join(failedCells, ", "))
}

// Filter tablets that should not be watched using shardsToWatch map.
matchedTablets := make([]*topo.TabletInfo, 0, len(tablets))
func() {
for _, t := range tablets {
if shouldWatchTablet(t.Tablet) {
matchedTablets = append(matchedTablets, t)
// Update each cell that provided a response. This ensures only cells that provided a
// response are updated in the backend and are considered for forgetting stale tablets.
for cell, tablets := range tabletsByCell {
// Filter tablets that should not be watched using func shouldWatchTablet.
matchedTablets := make([]*topo.TabletInfo, 0, len(tablets))
func() {
for _, t := range tablets {
if shouldWatchTablet(t.Tablet) {
matchedTablets = append(matchedTablets, t)
}
}
}
}()
}()

// Refresh the filtered tablets and forget stale tablets.
query := "select alias from vitess_tablet where cell = ?"
args := sqlutils.Args(cell)
refreshTablets(matchedTablets, query, args, loader, forceRefresh, nil)
}

// Refresh the filtered tablets.
query := "select alias from vitess_tablet"
refreshTablets(matchedTablets, query, nil, loader, forceRefresh, nil)
return nil
}

Expand Down
60 changes: 60 additions & 0 deletions go/vt/vtorc/logic/tablet_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package logic
import (
"context"
"fmt"
"slices"
"strings"
"sync/atomic"
"testing"
Expand All @@ -34,6 +35,7 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vttime"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/faketopo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtctl/grpcvtctldserver/testutil"
Expand Down Expand Up @@ -840,3 +842,61 @@ func TestSetReplicationSource(t *testing.T) {
})
}
}

// TestGetAllTablets verifies that getAllTablets returns tablets keyed by cell
// for cells whose topo List call succeeds, and names the cells whose List call
// fails. It uses a fake topo with two healthy cells (zone1, zone2) and two
// failing cells (zone3, zone4).
func TestGetAllTablets(t *testing.T) {
	tablet := &topodatapb.Tablet{
		Hostname: t.Name(),
	}
	// A marshalling failure would make every later assertion meaningless, so
	// fail fast instead of discarding the error.
	tabletProto, err := tablet.MarshalVT()
	require.NoError(t, err)

	factory := faketopo.NewFakeTopoFactory()

	// zone1 (success)
	goodCell1 := faketopo.NewFakeConnection()
	goodCell1.AddListResult("tablets", []topo.KVInfo{
		{
			Key:   []byte("zone1-00000001"),
			Value: tabletProto,
		},
	})
	factory.SetCell("zone1", goodCell1)

	// zone2 (success)
	goodCell2 := faketopo.NewFakeConnection()
	goodCell2.AddListResult("tablets", []topo.KVInfo{
		{
			Key:   []byte("zone2-00000002"),
			Value: tabletProto,
		},
	})
	factory.SetCell("zone2", goodCell2)

	// zone3 (fail)
	badCell1 := faketopo.NewFakeConnection()
	badCell1.AddListError(true)
	factory.SetCell("zone3", badCell1)

	// zone4 (fail)
	badCell2 := faketopo.NewFakeConnection()
	badCell2.AddListError(true)
	factory.SetCell("zone4", badCell2)

	// Swap the package-level topo server for the fake and restore it when the
	// test finishes so other tests are unaffected.
	oldTs := ts
	defer func() {
		ts = oldTs
	}()
	ctx := context.Background()
	ts = faketopo.NewFakeTopoServer(ctx, factory)

	// confirm zone1 + zone2 succeeded and zone3 + zone4 failed
	tabletsByCell, failedCells := getAllTablets(ctx, []string{"zone1", "zone2", "zone3", "zone4"})
	require.Len(t, tabletsByCell, 2)
	// Checking the length alone would also pass if the wrong cells were
	// returned, so pin the expected keys explicitly.
	require.Contains(t, tabletsByCell, "zone1")
	require.Contains(t, tabletsByCell, "zone2")

	// Cells are queried concurrently, so the failure order is not
	// deterministic; sort before comparing.
	slices.Sort(failedCells)
	require.Equal(t, []string{"zone3", "zone4"}, failedCells)

	for cell, tablets := range tabletsByCell {
		require.Len(t, tablets, 1, "unexpected tablet count for cell %s", cell)
		for _, tabletInfo := range tablets {
			require.Equal(t, t.Name(), tabletInfo.Tablet.GetHostname())
		}
	}
}

0 comments on commit 1c06427

Please sign in to comment.