From 4a8339768b3e55de127d07b1fd68062e453a4933 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 11 Mar 2019 16:02:40 +0300 Subject: [PATCH] Add --priority keeper option. Sentinel will promote keeper with higher priority than the current one if this is possible. In async mode this is a bit non-deterministic because we always elect node with highest LSN, and under heavy load prioritized node might never report LSN higher than its stronger competitors. However, if nodes are equal this should happen at some moment. In sync mode, we can just elect any of synchronous standbies. Priority can be set during keeper start (--priority) or later with new command 'stolonctl set keeperpriority'. The latter allows to update priority without restarting the keeper (and its Postgres instance), which can be used for controlled failover. Implements #492 --- cmd/keeper/cmd/keeper.go | 16 +++ cmd/sentinel/cmd/sentinel.go | 110 ++++++++++----- cmd/sentinel/cmd/sentinel_test.go | 186 +++++++++++++++++++++++++ cmd/stolonctl/cmd/setkeeperpriority.go | 74 ++++++++++ doc/commands/stolon-keeper.md | 3 +- internal/cluster/cluster.go | 19 ++- internal/cluster/member.go | 3 + tests/integration/ha_test.go | 91 ++++++++++++ tests/integration/utils.go | 11 +- 9 files changed, 472 insertions(+), 41 deletions(-) create mode 100644 cmd/stolonctl/cmd/setkeeperpriority.go diff --git a/cmd/keeper/cmd/keeper.go b/cmd/keeper/cmd/keeper.go index 225d842a6..e4f9ff43f 100644 --- a/cmd/keeper/cmd/keeper.go +++ b/cmd/keeper/cmd/keeper.go @@ -99,6 +99,8 @@ type config struct { uid string dataDir string debug bool + priority int + prioritySpecified bool // true iff explicitly set by user pgListenAddress string pgAdvertiseAddress string pgPort string @@ -143,6 +145,7 @@ func init() { CmdKeeper.PersistentFlags().StringVar(&cfg.pgSUPassword, "pg-su-password", "", "postgres superuser password. Only one of --pg-su-password or --pg-su-passwordfile must be provided. Must be the same for all keepers.") CmdKeeper.PersistentFlags().StringVar(&cfg.pgSUPasswordFile, "pg-su-passwordfile", "", "postgres superuser password file. Only one of --pg-su-password or --pg-su-passwordfile must be provided. Must be the same for all keepers)") CmdKeeper.PersistentFlags().BoolVar(&cfg.debug, "debug", false, "enable debug logging") + CmdKeeper.PersistentFlags().IntVar(&cfg.priority, "priority", 0, "keeper priority, integer. Stolon will promote available keeper with higher priority than current master, if this is possible. Healthy keeper with higher priority will be elected even if current master is online. If not specified, priority is set to 0 on first keeper invocation; on subsequent invocations, last value (which could be also set with 'stolonctl setkeeperpriority') is reused.") CmdKeeper.PersistentFlags().MarkDeprecated("id", "please use --uid") CmdKeeper.PersistentFlags().MarkDeprecated("debug", "use --log-level=debug instead") @@ -440,6 +443,8 @@ type PostgresKeeper struct { pgSUPassword string pgInitialSUUsername string + priority *int // nil means not specified + sleepInterval time.Duration requestTimeout time.Duration @@ -470,6 +475,10 @@ func NewPostgresKeeper(cfg *config, end chan error) (*PostgresKeeper, error) { return nil, fmt.Errorf("cannot get absolute datadir path for %q: %v", cfg.dataDir, err) } + var priority *int = nil + if cfg.prioritySpecified { + priority = &cfg.priority + } p := &PostgresKeeper{ cfg: cfg, @@ -490,6 +499,8 @@ func NewPostgresKeeper(cfg *config, end chan error) (*PostgresKeeper, error) { pgSUPassword: cfg.pgSUPassword, pgInitialSUUsername: cfg.pgInitialSUUsername, + priority: priority, + sleepInterval: cluster.DefaultSleepInterval, requestTimeout: cluster.DefaultRequestTimeout, @@ -562,6 +573,7 @@ func (p *PostgresKeeper) updateKeeperInfo() error { Maj: maj, Min: min, }, + Priority: p.priority, PostgresState: p.getLastPGState(), } @@ -1982,6 +1994,10 @@ func keeper(c *cobra.Command, args []string) { } } + // if --priority wasn't specified explictily, last value is reused, so + // remember it + cfg.prioritySpecified = c.Flags().Changed("priority") + // Open (and create if needed) the lock file. // There is no need to clean up this file since we don't use the file as an actual lock. We get a lock // on the file. So the lock get released when our process stops (or log.Fatalfs). diff --git a/cmd/sentinel/cmd/sentinel.go b/cmd/sentinel/cmd/sentinel.go index 100e381bb..6ab8ee5be 100644 --- a/cmd/sentinel/cmd/sentinel.go +++ b/cmd/sentinel/cmd/sentinel.go @@ -231,6 +231,11 @@ func (s *Sentinel) updateKeepersStatus(cd *cluster.ClusterData, keepersInfo clus } else { s.CleanKeeperError(keeperUID) // Update keeper status infos + // If keeper restarted with specified priority, update it + if ki.Priority != nil && + k.Status.BootUUID != ki.BootUUID { + k.Spec.Priority = *ki.Priority + } k.Status.BootUUID = ki.BootUUID k.Status.PostgresBinaryVersion.Maj = ki.PostgresBinaryVersion.Maj k.Status.PostgresBinaryVersion.Min = ki.PostgresBinaryVersion.Min @@ -687,12 +692,17 @@ func (s *Sentinel) validStandbysByStatus(cd *cluster.ClusterData) (map[string]*c return goodStandbys, failedStandbys, convergingStandbys } -// dbSlice implements sort interface to sort by XLogPos -type dbSlice []*cluster.DB - -func (p dbSlice) Len() int { return len(p) } -func (p dbSlice) Less(i, j int) bool { return p[i].Status.XLogPos < p[j].Status.XLogPos } -func (p dbSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +// sort dbs by XLogPos and keeper's priority +func sortDBs(cd *cluster.ClusterData, dbs []*cluster.DB) { + sort.Slice(dbs, func(i, j int) bool { + if dbs[i].Status.XLogPos != dbs[j].Status.XLogPos { + return dbs[i].Status.XLogPos < dbs[j].Status.XLogPos + } + pi := cd.Keepers[dbs[i].Spec.KeeperUID].Spec.Priority + pj := cd.Keepers[dbs[j].Spec.KeeperUID].Spec.Priority + return pi < pj + }) +} func (s *Sentinel) findBestStandbys(cd *cluster.ClusterData, masterDB *cluster.DB) []*cluster.DB { goodStandbys, _, _ := s.validStandbysByStatus(cd) @@ -714,7 +724,7 @@ func (s *Sentinel) findBestStandbys(cd *cluster.ClusterData, masterDB *cluster.D bestDBs = append(bestDBs, db) } // Sort by XLogPos - sort.Sort(dbSlice(bestDBs)) + sortDBs(cd, bestDBs) return bestDBs } @@ -744,11 +754,54 @@ func (s *Sentinel) findBestNewMasters(cd *cluster.ClusterData, masterDB *cluster bestNewMasters = append(bestNewMasters, db) } // Sort by XLogPos - sort.Sort(dbSlice(bestNewMasters)) + sortDBs(cd, bestNewMasters) log.Debugf("bestNewMasters: %s", spew.Sdump(bestNewMasters)) return bestNewMasters } +// Return DB who can be new master. This function mostly takes care of +// sync mode; in async case, new master is just first element of findBestNewMasters. +func (s *Sentinel) findBestNewMaster(cd *cluster.ClusterData, curMasterDB *cluster.DB, logErrors bool) *cluster.DB { + bestNewMasters := s.findBestNewMasters(cd, curMasterDB) + if len(bestNewMasters) == 0 { + if logErrors { + log.Errorw("no eligible masters") + } + return nil + } + + // if synchronous replication is enabled, only choose new master in the synchronous replication standbys. + var bestNewMasterDB *cluster.DB = nil + if curMasterDB.Spec.SynchronousReplication == true { + commonSyncStandbys := util.CommonElements(curMasterDB.Status.SynchronousStandbys, curMasterDB.Spec.SynchronousStandbys) + if len(commonSyncStandbys) == 0 { + if logErrors { + log.Warnw("cannot choose synchronous standby since there are no common elements between the latest master reported synchronous standbys and the db spec ones", "reported", curMasterDB.Status.SynchronousStandbys, "spec", curMasterDB.Spec.SynchronousStandbys) + } + return nil + } + // In synchronous mode there is no need to choose DB with + // highest LSN; all found dbs must be in sync, so pick the one + // with highest priority. + var newMasterPriority int + for _, nm := range bestNewMasters { + if util.StringInSlice(commonSyncStandbys, nm.UID) { + nmPriority := cd.Keepers[nm.Spec.KeeperUID].Spec.Priority + if (bestNewMasterDB == nil) || (nmPriority > newMasterPriority) { + bestNewMasterDB = nm + newMasterPriority = nmPriority + } + } + } + if bestNewMasterDB == nil && logErrors { + log.Warnw("cannot choose synchronous standby since there's not match between the possible masters and the usable synchronousStandbys", "reported", curMasterDB.Status.SynchronousStandbys, "spec", curMasterDB.Spec.SynchronousStandbys, "common", commonSyncStandbys, "possibleMasters", bestNewMasters) + } + } else { + bestNewMasterDB = bestNewMasters[0] + } + return bestNewMasterDB +} + func (s *Sentinel) updateCluster(cd *cluster.ClusterData, pis cluster.ProxiesInfo) (*cluster.ClusterData, error) { // take a cd deepCopy to check that the code isn't changing it (it'll be a bug) origcd := cd.DeepCopy() @@ -981,37 +1034,20 @@ func (s *Sentinel) updateCluster(cd *cluster.ClusterData, pis cluster.ProxiesInf masterOK = false } - if !masterOK { - log.Infow("trying to find a new master to replace failed master") - bestNewMasters := s.findBestNewMasters(newcd, curMasterDB) - if len(bestNewMasters) == 0 { - log.Errorw("no eligible masters") + bestNewMasterDB := s.findBestNewMaster(newcd, curMasterDB, !masterOK) + if bestNewMasterDB != nil { + if !masterOK { + log.Infow("electing db as the new master", "db", bestNewMasterDB.UID, "keeper", bestNewMasterDB.Spec.KeeperUID) + wantedMasterDBUID = bestNewMasterDB.UID } else { - // if synchronous replication is enabled, only choose new master in the synchronous replication standbys. - var bestNewMasterDB *cluster.DB - if curMasterDB.Spec.SynchronousReplication == true { - commonSyncStandbys := util.CommonElements(curMasterDB.Status.SynchronousStandbys, curMasterDB.Spec.SynchronousStandbys) - if len(commonSyncStandbys) == 0 { - log.Warnw("cannot choose synchronous standby since there are no common elements between the latest master reported synchronous standbys and the db spec ones", "reported", curMasterDB.Status.SynchronousStandbys, "spec", curMasterDB.Spec.SynchronousStandbys) - } else { - for _, nm := range bestNewMasters { - if util.StringInSlice(commonSyncStandbys, nm.UID) { - bestNewMasterDB = nm - break - } - } - if bestNewMasterDB == nil { - log.Warnw("cannot choose synchronous standby since there's not match between the possible masters and the usable synchronousStandbys", "reported", curMasterDB.Status.SynchronousStandbys, "spec", curMasterDB.Spec.SynchronousStandbys, "common", commonSyncStandbys, "possibleMasters", bestNewMasters) - } - } - } else { - bestNewMasterDB = bestNewMasters[0] - } - if bestNewMasterDB != nil { - log.Infow("electing db as the new master", "db", bestNewMasterDB.UID, "keeper", bestNewMasterDB.Spec.KeeperUID) + // Even if current master is ok, we probably still + // want to change it if there is ready DB with higher + // keeper priority. + curMasterPriority := cd.Keepers[curMasterDB.Spec.KeeperUID].Spec.Priority + newMasterPriority := cd.Keepers[bestNewMasterDB.Spec.KeeperUID].Spec.Priority + if newMasterPriority > curMasterPriority { + log.Infow("electing db as the new master because it has higher priority", "db", bestNewMasterDB.UID, "keeper", bestNewMasterDB.Spec.KeeperUID, "currPriority", curMasterPriority, "newPriority", newMasterPriority) wantedMasterDBUID = bestNewMasterDB.UID - } else { - log.Errorw("no eligible masters") } } } diff --git a/cmd/sentinel/cmd/sentinel_test.go b/cmd/sentinel/cmd/sentinel_test.go index ed576ee1e..836951071 100644 --- a/cmd/sentinel/cmd/sentinel_test.go +++ b/cmd/sentinel/cmd/sentinel_test.go @@ -4961,6 +4961,192 @@ func TestUpdateCluster(t *testing.T) { }, }, }, + // #26 Test keeper's priority. One master and one healthy + // standby. Master is ok, but standy has higher priority and + // gets elected. + { + cd: &cluster.ClusterData{ + Cluster: &cluster.Cluster{ + UID: "cluster1", + Generation: 1, + Spec: &cluster.ClusterSpec{ + ConvergenceTimeout: &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout}, + InitTimeout: &cluster.Duration{Duration: cluster.DefaultInitTimeout}, + SyncTimeout: &cluster.Duration{Duration: cluster.DefaultSyncTimeout}, + MaxStandbysPerSender: cluster.Uint16P(cluster.DefaultMaxStandbysPerSender), + }, + Status: cluster.ClusterStatus{ + CurrentGeneration: 1, + Phase: cluster.ClusterPhaseNormal, + Master: "db1", + }, + }, + Keepers: cluster.Keepers{ + "keeper1": &cluster.Keeper{ + UID: "keeper1", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + "keeper2": &cluster.Keeper{ + UID: "keeper2", + Spec: &cluster.KeeperSpec{ + Priority: 1, + }, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + }, + DBs: cluster.DBs{ + "db1": &cluster.DB{ + UID: "db1", + Generation: 1, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper1", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + InitMode: cluster.DBInitModeNone, + SynchronousReplication: false, + Role: common.RoleMaster, + Followers: []string{"db2"}, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + }, + }, + "db2": &cluster.DB{ + UID: "db2", + Generation: 1, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper2", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + InitMode: cluster.DBInitModeNone, + SynchronousReplication: false, + Role: common.RoleStandby, + Followers: []string{}, + FollowConfig: &cluster.FollowConfig{ + Type: cluster.FollowTypeInternal, + DBUID: "db1", + }, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + }, + }, + }, + Proxy: &cluster.Proxy{ + Generation: 1, + Spec: cluster.ProxySpec{ + MasterDBUID: "db1", + EnabledProxies: []string{}, + }, + }, + }, + outcd: &cluster.ClusterData{ + Cluster: &cluster.Cluster{ + UID: "cluster1", + Generation: 1, + Spec: &cluster.ClusterSpec{ + ConvergenceTimeout: &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout}, + InitTimeout: &cluster.Duration{Duration: cluster.DefaultInitTimeout}, + SyncTimeout: &cluster.Duration{Duration: cluster.DefaultSyncTimeout}, + MaxStandbysPerSender: cluster.Uint16P(cluster.DefaultMaxStandbysPerSender), + }, + Status: cluster.ClusterStatus{ + CurrentGeneration: 1, + Phase: cluster.ClusterPhaseNormal, + Master: "db2", + }, + }, + Keepers: cluster.Keepers{ + "keeper1": &cluster.Keeper{ + UID: "keeper1", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + "keeper2": &cluster.Keeper{ + UID: "keeper2", + Spec: &cluster.KeeperSpec{ + Priority: 1, + }, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + }, + DBs: cluster.DBs{ + "db1": &cluster.DB{ + UID: "db1", + Generation: 2, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper1", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + InitMode: cluster.DBInitModeNone, + SynchronousReplication: false, + Role: common.RoleMaster, + Followers: []string{}, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + }, + }, + "db2": &cluster.DB{ + UID: "db2", + Generation: 2, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper2", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + InitMode: cluster.DBInitModeNone, + SynchronousReplication: false, + Role: common.RoleMaster, + Followers: []string{}, + FollowConfig: nil, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + }, + }, + }, + Proxy: &cluster.Proxy{ + Generation: 2, + Spec: cluster.ProxySpec{ + MasterDBUID: "", + EnabledProxies: []string{}, + }, + }, + }, + }, } for i, tt := range tests { diff --git a/cmd/stolonctl/cmd/setkeeperpriority.go b/cmd/stolonctl/cmd/setkeeperpriority.go new file mode 100644 index 000000000..68aa898bc --- /dev/null +++ b/cmd/stolonctl/cmd/setkeeperpriority.go @@ -0,0 +1,74 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "context" + "strconv" + + cmdcommon "github.com/sorintlab/stolon/cmd" + "github.com/spf13/cobra" +) + +var setKeeperPriorityCmd = &cobra.Command{ + Use: "setkeeperpriority [keeper uid] [priority]", + Short: `Set priority of keeper.`, + Long: `Stolon will promote available keeper with higher priority than current master, +if this is possible. This is the same as --priority keeper option, but allows to +change priority without restarting the keeper (along with underlying Postgres).`, + Run: setKeeperPriority, + Args: cobra.ExactArgs(2), // exactly 2 positional args +} + +func init() { + CmdStolonCtl.AddCommand(setKeeperPriorityCmd) +} + +func setKeeperPriority(cmd *cobra.Command, args []string) { + keeperID := args[0] + priority, err := strconv.Atoi(args[1]) + if err != nil { + die("priority must be integer: %v", err) + } + + store, err := cmdcommon.NewStore(&cfg.CommonConfig) + if err != nil { + die("%v", err) + } + + cd, pair, err := getClusterData(store) + if err != nil { + die("cannot get cluster data: %v", err) + } + if cd.Cluster == nil { + die("no cluster spec available") + } + if cd.Cluster.Spec == nil { + die("no cluster spec available") + } + + newCd := cd.DeepCopy() + keeper := newCd.Keepers[keeperID] + if keeper == nil { + die("keeper doesn't exist") + } + + keeper.Spec.Priority = priority + + _, err = store.AtomicPutClusterData(context.TODO(), newCd, pair) + if err != nil { + die("cannot update cluster data: %v", err) + } +} diff --git a/doc/commands/stolon-keeper.md b/doc/commands/stolon-keeper.md index 768ce5c57..e73597337 100644 --- a/doc/commands/stolon-keeper.md +++ b/doc/commands/stolon-keeper.md @@ -31,6 +31,7 @@ stolon-keeper [flags] --pg-su-password string postgres superuser password. Only one of --pg-su-password or --pg-su-passwordfile must be provided. Must be the same for all keepers. --pg-su-passwordfile string postgres superuser password file. Only one of --pg-su-password or --pg-su-passwordfile must be provided. Must be the same for all keepers) --pg-su-username string postgres superuser user name. Used for keeper managed instance access and pg_rewind based synchronization. It'll be created on db initialization. Defaults to the name of the effective user running stolon-keeper. Must be the same for all keepers. (default "motaboy") + --priority int keeper priority, integer. Stolon will promote available keeper with higher priority than current master, if this is possible. Default is 0. --store-backend string store backend type (etcdv2/etcd, etcdv3, consul or kubernetes) --store-ca-file string verify certificates of HTTPS-enabled store servers using this CA bundle --store-cert-file string certificate file for client identification to the store @@ -41,4 +42,4 @@ stolon-keeper [flags] --uid string keeper uid (must be unique in the cluster and can contain only lower-case letters, numbers and the underscore character). If not provided a random uid will be generated. ``` -###### Auto generated by spf13/cobra on 21-Aug-2018 +###### Auto generated by spf13/cobra on 11-Mar-2019 diff --git a/internal/cluster/cluster.go b/internal/cluster/cluster.go index deda5439b..874b6fa93 100644 --- a/internal/cluster/cluster.go +++ b/internal/cluster/cluster.go @@ -17,6 +17,7 @@ package cluster import ( "encoding/json" "fmt" + "math" "reflect" "sort" "strings" @@ -549,7 +550,15 @@ func NewCluster(uid string, cs *ClusterSpec) *Cluster { return c } -type KeeperSpec struct{} +const ( + // special value meaning user hadn't specified priority on keeper start + NotSpecifiedPrioriity = math.MaxInt32 + DefaultPriority = 0 +) + +type KeeperSpec struct { + Priority int `json:"priority,omitempty"` +} type KeeperStatus struct { Healthy bool `json:"healthy,omitempty"` @@ -574,11 +583,17 @@ type Keeper struct { } func NewKeeperFromKeeperInfo(ki *KeeperInfo) *Keeper { + priority := 0 // default value + if ki.Priority != nil { + priority = *ki.Priority + } return &Keeper{ UID: ki.UID, Generation: InitialGeneration, ChangeTime: time.Time{}, - Spec: &KeeperSpec{}, + Spec: &KeeperSpec{ + Priority: priority, + }, Status: KeeperStatus{ Healthy: true, LastHealthyTime: time.Now(), diff --git a/internal/cluster/member.go b/internal/cluster/member.go index 7a8b06227..8319cf3e0 100644 --- a/internal/cluster/member.go +++ b/internal/cluster/member.go @@ -49,6 +49,9 @@ type KeeperInfo struct { PostgresBinaryVersion PostgresBinaryVersion `json:"postgresBinaryVersion,omitempty"` + // nil means not specified + Priority *int `json:"priority,omitempty"` + PostgresState *PostgresState `json:"postgresState,omitempty"` } diff --git a/tests/integration/ha_test.go b/tests/integration/ha_test.go index 8d59281e5..d23e441ab 100644 --- a/tests/integration/ha_test.go +++ b/tests/integration/ha_test.go @@ -1933,6 +1933,97 @@ func TestForceFailSyncReplStandbyCluster(t *testing.T) { testForceFail(t, false, true) } +func testKeeperPriority(t *testing.T, syncRepl bool, standbyCluster bool) { + dir, err := ioutil.TempDir("", "stolon") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + // external primary in standbyCluster mode, nil otherwise + var ptk *TestKeeper + if standbyCluster { + primaryClusterName := uuid.NewV4().String() + ptks, ptss, ptp, ptstore := setupServers(t, primaryClusterName, dir, 1, 1, false, false, nil) + defer shutdown(ptks, ptss, ptp, ptstore) + for _, ptk = range ptks { + break + } + } + + // spin up cluster from single keeper... + clusterName := uuid.NewV4().String() + tks, tss, tp, tstore := setupServers(t, clusterName, dir, 1, 1, syncRepl, false, ptk) + defer shutdown(tks, tss, tp, tstore) + + // wait till it is up and running + storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) + storePath := filepath.Join(common.StorePrefix, clusterName) + sm := store.NewKVBackedStore(tstore.store, storePath) + + keeper1, _ := waitMasterStandbysReady(t, sm, tks) + + // now add another keeper with higher priority + keeper2, err := NewTestKeeperWithPriority(t, dir, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, tstore.storeBackend, storeEndpoints, 1) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + tks[keeper2.uid] = keeper2 + if err := keeper2.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // it must become master + if err := keeper2.WaitDBRole(common.RoleMaster, ptk, 30*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // take it down... + t.Logf("Stopping current standby keeper: %s", keeper2.uid) + keeper2.Stop() + // now keeper 1 will be master again + if err := keeper1.WaitDBRole(common.RoleMaster, ptk, 30*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // but if we take keeper 2 up, it will be promoted + if err := keeper2.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := keeper2.WaitDBRole(common.RoleMaster, ptk, 30*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // and if increase keeper 1 priority online, it should become master back again + err = StolonCtl(t, clusterName, tstore.storeBackend, storeEndpoints, "setkeeperpriority", keeper1.Process.uid, "2") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := keeper1.WaitDBRole(common.RoleMaster, ptk, 30*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } +} + +func TestKeeperPriority(t *testing.T) { + t.Parallel() + testKeeperPriority(t, false, false) +} + +func TestKeeperPrioritySyncRepl(t *testing.T) { + t.Parallel() + testKeeperPriority(t, true, false) +} + +func TestKeeperPriorityStandbyCluster(t *testing.T) { + t.Parallel() + testKeeperPriority(t, false, true) +} + +func TestKeeperPrioritySyncReplStandbyCluster(t *testing.T) { + t.Parallel() + testKeeperPriority(t, false, true) +} + // TestSyncStandbyNotInSync tests that, when using synchronous replication, a // normal user cannot connect to primary db after it has restarted until all // defined synchronous standbys are in sync. diff --git a/tests/integration/utils.go b/tests/integration/utils.go index c067f1cc4..d16ee55b5 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -331,6 +331,10 @@ type TestKeeper struct { } func NewTestKeeperWithID(t *testing.T, dir, uid, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword string, storeBackend store.Backend, storeEndpoints string, a ...string) (*TestKeeper, error) { + return NewTestKeeperWithIDWithPriority(t, dir, uid, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, storeBackend, storeEndpoints, 0) +} + +func NewTestKeeperWithIDWithPriority(t *testing.T, dir, uid, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword string, storeBackend store.Backend, storeEndpoints string, priority int, a ...string) (*TestKeeper, error) { args := []string{} dataDir := filepath.Join(dir, fmt.Sprintf("st%s", uid)) @@ -353,6 +357,7 @@ func NewTestKeeperWithID(t *testing.T, dir, uid, clusterName, pgSUUsername, pgSU } args = append(args, fmt.Sprintf("--pg-repl-username=%s", pgReplUsername)) args = append(args, fmt.Sprintf("--pg-repl-password=%s", pgReplPassword)) + args = append(args, fmt.Sprintf("--priority=%d", priority)) if os.Getenv("DEBUG") != "" { args = append(args, "--debug") } @@ -416,10 +421,14 @@ func NewTestKeeperWithID(t *testing.T, dir, uid, clusterName, pgSUUsername, pgSU } func NewTestKeeper(t *testing.T, dir, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword string, storeBackend store.Backend, storeEndpoints string, a ...string) (*TestKeeper, error) { + return NewTestKeeperWithPriority(t, dir, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, storeBackend, storeEndpoints, 0, a...) +} + +func NewTestKeeperWithPriority(t *testing.T, dir, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword string, storeBackend store.Backend, storeEndpoints string, priority int, a ...string) (*TestKeeper, error) { u := uuid.NewV4() uid := fmt.Sprintf("%x", u[:4]) - return NewTestKeeperWithID(t, dir, uid, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, storeBackend, storeEndpoints, a...) + return NewTestKeeperWithIDWithPriority(t, dir, uid, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, storeBackend, storeEndpoints, priority, a...) } func (tk *TestKeeper) PGDataVersion() (int, int, error) {