diff --git a/cmd/keeper/cmd/keeper.go b/cmd/keeper/cmd/keeper.go index b5f815018..a4638d95c 100644 --- a/cmd/keeper/cmd/keeper.go +++ b/cmd/keeper/cmd/keeper.go @@ -99,6 +99,8 @@ type config struct { uid string dataDir string debug bool + priority int + prioritySpecified bool // true iff explicitly set by user pgListenAddress string pgAdvertiseAddress string pgPort string @@ -139,6 +141,7 @@ func init() { CmdKeeper.PersistentFlags().StringVar(&cfg.pgSUPassword, "pg-su-password", "", "postgres superuser password. Only one of --pg-su-password or --pg-su-passwordfile must be provided. Must be the same for all keepers.") CmdKeeper.PersistentFlags().StringVar(&cfg.pgSUPasswordFile, "pg-su-passwordfile", "", "postgres superuser password file. Only one of --pg-su-password or --pg-su-passwordfile must be provided. Must be the same for all keepers)") CmdKeeper.PersistentFlags().BoolVar(&cfg.debug, "debug", false, "enable debug logging") + CmdKeeper.PersistentFlags().IntVar(&cfg.priority, "priority", 0, "keeper priority, integer. Stolon will promote an available keeper with a higher priority than the current master whenever possible; a healthy keeper with a higher priority is elected even if the current master is online. If not specified, the priority is set to "+strconv.Itoa(cluster.DefaultPriority)+" on the first keeper invocation; on subsequent invocations the last value (which can also be set with 'stolonctl setkeeperpriority') is reused.") CmdKeeper.PersistentFlags().BoolVar(&cfg.canBeMaster, "can-be-master", true, "prevent keeper from being elected as master") CmdKeeper.PersistentFlags().BoolVar(&cfg.canBeSynchronousReplica, "can-be-synchronous-replica", true, "prevent keeper from being chosen as synchronous replica") @@ -451,6 +454,8 @@ type PostgresKeeper struct { pgSUUsername string pgSUPassword string + priority *int // nil means not specified + sleepInterval time.Duration requestTimeout time.Duration @@ -484,6 +489,10 @@ func NewPostgresKeeper(cfg *config, end chan error) (*PostgresKeeper, error) { return nil, fmt.Errorf("cannot get absolute datadir path for %q: %v", cfg.dataDir, err) } + var priority *int + if cfg.prioritySpecified { + priority = &cfg.priority + } p := &PostgresKeeper{ cfg: cfg, @@ -503,6 +512,8 @@ func NewPostgresKeeper(cfg *config, end chan error) (*PostgresKeeper, error) { pgSUUsername: cfg.pgSUUsername, pgSUPassword: cfg.pgSUPassword, + priority: priority, + sleepInterval: cluster.DefaultSleepInterval, requestTimeout: cluster.DefaultRequestTimeout, @@ -578,6 +589,7 @@ func (p *PostgresKeeper) updateKeeperInfo() error { Maj: maj, Min: min, }, + Priority: p.priority, PostgresState: p.getLastPGState(), CanBeMaster: p.canBeMaster, @@ -2029,6 +2041,10 @@ func keeper(c *cobra.Command, args []string) { } } + // if --priority wasn't specified explicitly, the last value is reused, so + // remember whether it was set here + cfg.prioritySpecified = c.Flags().Changed("priority") + // Open (and create if needed) the lock file. // There is no need to clean up this file since we don't use the file as an actual lock. We get a lock // on the file. So the lock get released when our process stops (or log.Fatalfs). 
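Reviewer note (not part of the patch): a minimal, self-contained sketch of the tri-state flag handling above, assuming only cobra; the command name and messages are illustrative. cobra's Flags().Changed() is what lets the keeper distinguish "--priority 0 given explicitly" from "--priority not given at all", which is then encoded as a non-nil vs nil pointer reported to the sentinel.

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	var priority int
	cmd := &cobra.Command{
		Use: "demo",
		Run: func(c *cobra.Command, args []string) {
			// nil means "not specified": the sentinel then keeps the value already
			// stored in the keeper spec instead of overwriting it with the default.
			var reported *int
			if c.Flags().Changed("priority") {
				reported = &priority
			}
			if reported == nil {
				fmt.Println("--priority not given; the last stored value would be reused")
			} else {
				fmt.Printf("--priority explicitly set to %d\n", *reported)
			}
		},
	}
	cmd.Flags().IntVar(&priority, "priority", 0, "keeper priority")
	_ = cmd.Execute()
}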
diff --git a/cmd/sentinel/cmd/sentinel.go b/cmd/sentinel/cmd/sentinel.go index 96a44c59a..319e26dc0 100644 --- a/cmd/sentinel/cmd/sentinel.go +++ b/cmd/sentinel/cmd/sentinel.go @@ -243,6 +243,11 @@ func (s *Sentinel) updateKeepersStatus(cd *cluster.ClusterData, keepersInfo clus } else { s.CleanKeeperError(keeperUID) // Update keeper status infos + // If the keeper was restarted with an explicitly specified priority, update it + if ki.Priority != nil && + k.Status.BootUUID != ki.BootUUID { + k.Spec.Priority = *ki.Priority + } k.Status.BootUUID = ki.BootUUID k.Status.PostgresBinaryVersion.Maj = ki.PostgresBinaryVersion.Maj k.Status.PostgresBinaryVersion.Min = ki.PostgresBinaryVersion.Min @@ -699,12 +704,17 @@ func (s *Sentinel) validStandbysByStatus(cd *cluster.ClusterData) (map[string]*c return goodStandbys, failedStandbys, convergingStandbys } -// dbSlice implements sort interface to sort by XLogPos -type dbSlice []*cluster.DB - -func (p dbSlice) Len() int { return len(p) } -func (p dbSlice) Less(i, j int) bool { return p[i].Status.XLogPos < p[j].Status.XLogPos } -func (p dbSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +// sortDBs sorts dbs by XLogPos, using the keeper's priority as a tie-breaker +func sortDBs(cd *cluster.ClusterData, dbs []*cluster.DB) { + sort.Slice(dbs, func(i, j int) bool { + if dbs[i].Status.XLogPos != dbs[j].Status.XLogPos { + return dbs[i].Status.XLogPos < dbs[j].Status.XLogPos + } + pi := cd.Keepers[dbs[i].Spec.KeeperUID].Spec.Priority + pj := cd.Keepers[dbs[j].Spec.KeeperUID].Spec.Priority + return pi < pj + }) +} func (s *Sentinel) findBestStandbys(cd *cluster.ClusterData, masterDB *cluster.DB) []*cluster.DB { goodStandbys, _, _ := s.validStandbysByStatus(cd) @@ -726,7 +736,7 @@ func (s *Sentinel) findBestStandbys(cd *cluster.ClusterData, masterDB *cluster.D bestDBs = append(bestDBs, db) } // Sort by XLogPos - sort.Sort(dbSlice(bestDBs)) + sortDBs(cd, bestDBs) return bestDBs } @@ -773,11 +783,54 @@ func (s *Sentinel) findBestNewMasters(cd *cluster.ClusterData, masterDB *cluster.D } // Sort by XLogPos - sort.Sort(dbSlice(bestNewMasters)) + sortDBs(cd, bestNewMasters) log.Debugf("bestNewMasters: %s", spew.Sdump(bestNewMasters)) return bestNewMasters } +// findBestNewMaster returns the DB that can become the new master. This function mostly takes care of +// sync mode; in the async case the new master is simply the first element of findBestNewMasters. +func (s *Sentinel) findBestNewMaster(cd *cluster.ClusterData, curMasterDB *cluster.DB, logErrors bool) *cluster.DB { + bestNewMasters := s.findBestNewMasters(cd, curMasterDB) + if len(bestNewMasters) == 0 { + if logErrors { + log.Errorw("no eligible masters") + } + return nil + } + + // if synchronous replication is enabled, only choose the new master among the synchronous replication standbys. + var bestNewMasterDB *cluster.DB + if curMasterDB.Spec.SynchronousReplication { + commonSyncStandbys := util.CommonElements(curMasterDB.Status.SynchronousStandbys, curMasterDB.Spec.SynchronousStandbys) + if len(commonSyncStandbys) == 0 { + if logErrors { + log.Warnw("cannot choose synchronous standby since there are no common elements between the latest master reported synchronous standbys and the db spec ones", "reported", curMasterDB.Status.SynchronousStandbys, "spec", curMasterDB.Spec.SynchronousStandbys) + } + return nil + } + // In synchronous mode there is no need to choose the DB with the + // highest LSN; all found dbs must be in sync, so pick the one + // with the highest priority. 
+ var newMasterPriority int + for _, nm := range bestNewMasters { + if util.StringInSlice(commonSyncStandbys, nm.UID) { + nmPriority := cd.Keepers[nm.Spec.KeeperUID].Spec.Priority + if (bestNewMasterDB == nil) || (nmPriority > newMasterPriority) { + bestNewMasterDB = nm + newMasterPriority = nmPriority + } + } + } + if bestNewMasterDB == nil && logErrors { + log.Warnw("cannot choose synchronous standby since there's no match between the possible masters and the usable synchronousStandbys", "reported", curMasterDB.Status.SynchronousStandbys, "spec", curMasterDB.Spec.SynchronousStandbys, "common", commonSyncStandbys, "possibleMasters", bestNewMasters) + } + } else { + bestNewMasterDB = bestNewMasters[0] + } + return bestNewMasterDB +} + func (s *Sentinel) updateCluster(cd *cluster.ClusterData, pis cluster.ProxiesInfo) (*cluster.ClusterData, error) { // take a cd deepCopy to check that the code isn't changing it (it'll be a bug) origcd := cd.DeepCopy() @@ -1002,37 +1055,20 @@ func (s *Sentinel) updateCluster(cd *cluster.ClusterData, pis cluster.ProxiesInf masterOK = false } - if !masterOK { - log.Infow("trying to find a new master to replace failed master") - bestNewMasters := s.findBestNewMasters(newcd, curMasterDB) - if len(bestNewMasters) == 0 { - log.Errorw("no eligible masters") + bestNewMasterDB := s.findBestNewMaster(newcd, curMasterDB, !masterOK) + if bestNewMasterDB != nil { + if !masterOK { + log.Infow("electing db as the new master", "db", bestNewMasterDB.UID, "keeper", bestNewMasterDB.Spec.KeeperUID) + wantedMasterDBUID = bestNewMasterDB.UID } else { - // if synchronous replication is enabled, only choose new master in the synchronous replication standbys. - var bestNewMasterDB *cluster.DB - if curMasterDB.Spec.SynchronousReplication { - commonSyncStandbys := util.CommonElements(curMasterDB.Status.SynchronousStandbys, curMasterDB.Spec.SynchronousStandbys) - if len(commonSyncStandbys) == 0 { - log.Warnw("cannot choose synchronous standby since there are no common elements between the latest master reported synchronous standbys and the db spec ones", "reported", curMasterDB.Status.SynchronousStandbys, "spec", curMasterDB.Spec.SynchronousStandbys) - } else { - for _, nm := range bestNewMasters { - if util.StringInSlice(commonSyncStandbys, nm.UID) { - bestNewMasterDB = nm - break - } - } - if bestNewMasterDB == nil { - log.Warnw("cannot choose synchronous standby since there's not match between the possible masters and the usable synchronousStandbys", "reported", curMasterDB.Status.SynchronousStandbys, "spec", curMasterDB.Spec.SynchronousStandbys, "common", commonSyncStandbys, "possibleMasters", bestNewMasters) - } - } - } else { - bestNewMasterDB = bestNewMasters[0] - } - if bestNewMasterDB != nil { - log.Infow("electing db as the new master", "db", bestNewMasterDB.UID, "keeper", bestNewMasterDB.Spec.KeeperUID) + // Even if the current master is ok, we may still + // want to change it if there is a ready DB with a higher + // keeper priority. 
+ curMasterPriority := cd.Keepers[curMasterDB.Spec.KeeperUID].Spec.Priority + newMasterPriority := cd.Keepers[bestNewMasterDB.Spec.KeeperUID].Spec.Priority + if newMasterPriority > curMasterPriority { + log.Infow("electing db as the new master because it has higher priority", "db", bestNewMasterDB.UID, "keeper", bestNewMasterDB.Spec.KeeperUID, "currPriority", curMasterPriority, "newPriority", newMasterPriority) wantedMasterDBUID = bestNewMasterDB.UID - } else { - log.Errorw("no eligible masters") } } } diff --git a/cmd/sentinel/cmd/sentinel_test.go b/cmd/sentinel/cmd/sentinel_test.go index 67851ce24..2070ee0ce 100644 --- a/cmd/sentinel/cmd/sentinel_test.go +++ b/cmd/sentinel/cmd/sentinel_test.go @@ -5349,6 +5349,192 @@ func TestUpdateCluster(t *testing.T) { }, }, }, + // #28 Test keeper's priority. One master and one healthy + // standby. Master is ok, but standy has higher priority and + // gets elected. + { + cd: &cluster.ClusterData{ + Cluster: &cluster.Cluster{ + UID: "cluster1", + Generation: 1, + Spec: &cluster.ClusterSpec{ + ConvergenceTimeout: &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout}, + InitTimeout: &cluster.Duration{Duration: cluster.DefaultInitTimeout}, + SyncTimeout: &cluster.Duration{Duration: cluster.DefaultSyncTimeout}, + MaxStandbysPerSender: cluster.Uint16P(cluster.DefaultMaxStandbysPerSender), + }, + Status: cluster.ClusterStatus{ + CurrentGeneration: 1, + Phase: cluster.ClusterPhaseNormal, + Master: "db1", + }, + }, + Keepers: cluster.Keepers{ + "keeper1": &cluster.Keeper{ + UID: "keeper1", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + "keeper2": &cluster.Keeper{ + UID: "keeper2", + Spec: &cluster.KeeperSpec{ + Priority: 1, + }, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + }, + DBs: cluster.DBs{ + "db1": &cluster.DB{ + UID: "db1", + Generation: 1, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper1", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + InitMode: cluster.DBInitModeNone, + SynchronousReplication: false, + Role: common.RoleMaster, + Followers: []string{"db2"}, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + }, + }, + "db2": &cluster.DB{ + UID: "db2", + Generation: 1, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper2", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + InitMode: cluster.DBInitModeNone, + SynchronousReplication: false, + Role: common.RoleStandby, + Followers: []string{}, + FollowConfig: &cluster.FollowConfig{ + Type: cluster.FollowTypeInternal, + DBUID: "db1", + }, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + }, + }, + }, + Proxy: &cluster.Proxy{ + Generation: 1, + Spec: cluster.ProxySpec{ + MasterDBUID: "db1", + EnabledProxies: []string{}, + }, + }, + }, + outcd: &cluster.ClusterData{ + Cluster: &cluster.Cluster{ + UID: "cluster1", + Generation: 1, + Spec: &cluster.ClusterSpec{ + ConvergenceTimeout: &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout}, + InitTimeout: &cluster.Duration{Duration: 
cluster.DefaultInitTimeout}, + SyncTimeout: &cluster.Duration{Duration: cluster.DefaultSyncTimeout}, + MaxStandbysPerSender: cluster.Uint16P(cluster.DefaultMaxStandbysPerSender), + }, + Status: cluster.ClusterStatus{ + CurrentGeneration: 1, + Phase: cluster.ClusterPhaseNormal, + Master: "db2", + }, + }, + Keepers: cluster.Keepers{ + "keeper1": &cluster.Keeper{ + UID: "keeper1", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + "keeper2": &cluster.Keeper{ + UID: "keeper2", + Spec: &cluster.KeeperSpec{ + Priority: 1, + }, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + }, + DBs: cluster.DBs{ + "db1": &cluster.DB{ + UID: "db1", + Generation: 2, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper1", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + InitMode: cluster.DBInitModeNone, + SynchronousReplication: false, + Role: common.RoleMaster, + Followers: []string{}, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + }, + }, + "db2": &cluster.DB{ + UID: "db2", + Generation: 2, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper2", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + InitMode: cluster.DBInitModeNone, + SynchronousReplication: false, + Role: common.RoleMaster, + Followers: []string{}, + FollowConfig: nil, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + }, + }, + }, + Proxy: &cluster.Proxy{ + Generation: 2, + Spec: cluster.ProxySpec{ + MasterDBUID: "", + EnabledProxies: []string{}, + }, + }, + }, + }, } for i, tt := range tests { diff --git a/cmd/stolonctl/cmd/setkeeperpriority.go b/cmd/stolonctl/cmd/setkeeperpriority.go new file mode 100644 index 000000000..68aa898bc --- /dev/null +++ b/cmd/stolonctl/cmd/setkeeperpriority.go @@ -0,0 +1,74 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "context" + "strconv" + + cmdcommon "github.com/sorintlab/stolon/cmd" + "github.com/spf13/cobra" +) + +var setKeeperPriorityCmd = &cobra.Command{ + Use: "setkeeperpriority [keeper uid] [priority]", + Short: `Set priority of keeper.`, + Long: `Stolon will promote available keeper with higher priority than current master, +if this is possible. 
This is the same as the --priority keeper option, but allows changing the priority +without restarting the keeper (and the underlying Postgres instance).`, + Run: setKeeperPriority, + Args: cobra.ExactArgs(2), // exactly 2 positional args +} + +func init() { + CmdStolonCtl.AddCommand(setKeeperPriorityCmd) +} + +func setKeeperPriority(cmd *cobra.Command, args []string) { + keeperID := args[0] + priority, err := strconv.Atoi(args[1]) + if err != nil { + die("priority must be an integer: %v", err) + } + + store, err := cmdcommon.NewStore(&cfg.CommonConfig) + if err != nil { + die("%v", err) + } + + cd, pair, err := getClusterData(store) + if err != nil { + die("cannot get cluster data: %v", err) + } + if cd.Cluster == nil { + die("no cluster spec available") + } + if cd.Cluster.Spec == nil { + die("no cluster spec available") + } + + newCd := cd.DeepCopy() + keeper := newCd.Keepers[keeperID] + if keeper == nil { + die("keeper %q doesn't exist", keeperID) + } + + keeper.Spec.Priority = priority + + _, err = store.AtomicPutClusterData(context.TODO(), newCd, pair) + if err != nil { + die("cannot update cluster data: %v", err) + } +} diff --git a/internal/cluster/cluster.go b/internal/cluster/cluster.go index 9578c82bc..e4a8c2d94 100644 --- a/internal/cluster/cluster.go +++ b/internal/cluster/cluster.go @@ -567,7 +567,13 @@ func NewCluster(uid string, cs *ClusterSpec) *Cluster { return c } -type KeeperSpec struct{} +const ( + DefaultPriority = 0 +) + +type KeeperSpec struct { + Priority int `json:"priority,omitempty"` +} type KeeperStatus struct { Healthy bool `json:"healthy,omitempty"` @@ -595,11 +601,17 @@ type Keeper struct { } func NewKeeperFromKeeperInfo(ki *KeeperInfo) *Keeper { + priority := DefaultPriority + if ki.Priority != nil { + priority = *ki.Priority + } return &Keeper{ UID: ki.UID, Generation: InitialGeneration, ChangeTime: time.Time{}, - Spec: &KeeperSpec{}, + Spec: &KeeperSpec{ + Priority: priority, + }, Status: KeeperStatus{ Healthy: true, LastHealthyTime: time.Now(), diff --git a/internal/cluster/member.go b/internal/cluster/member.go index e6e2b793e..99292fff4 100644 --- a/internal/cluster/member.go +++ b/internal/cluster/member.go @@ -50,6 +50,9 @@ type KeeperInfo struct { PostgresBinaryVersion PostgresBinaryVersion `json:"postgresBinaryVersion,omitempty"` + // nil means not specified + Priority *int `json:"priority,omitempty"` + PostgresState *PostgresState `json:"postgresState,omitempty"` CanBeMaster *bool `json:"canBeMaster,omitempty"` diff --git a/tests/integration/ha_test.go b/tests/integration/ha_test.go index a32f51d4f..b2ea9f260 100644 --- a/tests/integration/ha_test.go +++ b/tests/integration/ha_test.go @@ -1955,6 +1955,97 @@ func TestForceFailSyncReplStandbyCluster(t *testing.T) { testForceFail(t, false, true) } +func testKeeperPriority(t *testing.T, syncRepl bool, standbyCluster bool) { + dir, err := ioutil.TempDir("", "stolon") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + // ptk is the external primary in standbyCluster mode, nil otherwise + var ptk *TestKeeper + if standbyCluster { + primaryClusterName := uuid.NewV4().String() + ptks, ptss, ptp, ptstore := setupServers(t, primaryClusterName, dir, 1, 1, false, false, nil) + defer shutdown(ptks, ptss, ptp, ptstore) + for _, ptk = range ptks { + break + } + } + + // spin up the cluster with a single keeper... 
+ clusterName := uuid.NewV4().String() + tks, tss, tp, tstore := setupServers(t, clusterName, dir, 1, 1, syncRepl, false, ptk) + defer shutdown(tks, tss, tp, tstore) + + // wait until the cluster is up and running + storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) + storePath := filepath.Join(common.StorePrefix, clusterName) + sm := store.NewKVBackedStore(tstore.store, storePath) + + keeper1, _ := waitMasterStandbysReady(t, sm, tks) + + // now add another keeper with higher priority + keeper2, err := NewTestKeeperWithPriority(t, dir, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, tstore.storeBackend, storeEndpoints, 1) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + tks[keeper2.uid] = keeper2 + if err := keeper2.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // it must become the master + if err := keeper2.WaitDBRole(common.RoleMaster, ptk, 30*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // take it down... + t.Logf("Stopping current master keeper: %s", keeper2.uid) + keeper2.Stop() + // now keeper 1 will be master again + if err := keeper1.WaitDBRole(common.RoleMaster, ptk, 30*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // but if we bring keeper 2 back up, it will be promoted again + if err := keeper2.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := keeper2.WaitDBRole(common.RoleMaster, ptk, 30*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // and if we increase keeper 1's priority online, it should become master again + err = StolonCtl(t, clusterName, tstore.storeBackend, storeEndpoints, "setkeeperpriority", keeper1.Process.uid, "2") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := keeper1.WaitDBRole(common.RoleMaster, ptk, 30*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } +} + +func TestKeeperPriority(t *testing.T) { + t.Parallel() + testKeeperPriority(t, false, false) +} + +func TestKeeperPrioritySyncRepl(t *testing.T) { + t.Parallel() + testKeeperPriority(t, true, false) +} + +func TestKeeperPriorityStandbyCluster(t *testing.T) { + t.Parallel() + testKeeperPriority(t, false, true) +} + +func TestKeeperPrioritySyncReplStandbyCluster(t *testing.T) { + t.Parallel() + testKeeperPriority(t, false, true) +} + // TestSyncStandbyNotInSync tests that, when using synchronous replication, a // normal user cannot connect to primary db after it has restarted until all // defined synchronous standbys are in sync. 
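Reviewer note (not part of the patch): a self-contained sketch of the ordering these tests indirectly exercise — the comparison applied by the new sortDBs helper in the sentinel, ascending XLogPos with the keeper priority as a tie-breaker — using stand-in types so it runs without the stolon packages.

package main

import (
	"fmt"
	"sort"
)

// candidate is a stand-in for cluster.DB plus the keeper spec lookup,
// just enough to show the ordering.
type candidate struct {
	uid      string
	xLogPos  uint64
	priority int
}

func main() {
	dbs := []candidate{
		{uid: "db1", xLogPos: 100, priority: 0},
		{uid: "db2", xLogPos: 100, priority: 1},
		{uid: "db3", xLogPos: 90, priority: 5},
	}
	// same comparison as sortDBs: ascending XLogPos, with the keeper
	// priority deciding the order when positions are equal
	sort.Slice(dbs, func(i, j int) bool {
		if dbs[i].xLogPos != dbs[j].xLogPos {
			return dbs[i].xLogPos < dbs[j].xLogPos
		}
		return dbs[i].priority < dbs[j].priority
	})
	fmt.Println(dbs) // [{db3 90 5} {db1 100 0} {db2 100 1}]
}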
diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 58d446cc8..6ab87407d 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -337,6 +337,10 @@ type TestKeeper struct { } func NewTestKeeperWithID(t *testing.T, dir, uid, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword string, storeBackend store.Backend, storeEndpoints string, a ...string) (*TestKeeper, error) { + return NewTestKeeperWithIDWithPriority(t, dir, uid, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, storeBackend, storeEndpoints, 0) +} + +func NewTestKeeperWithIDWithPriority(t *testing.T, dir, uid, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword string, storeBackend store.Backend, storeEndpoints string, priority int, a ...string) (*TestKeeper, error) { args := []string{} dataDir := filepath.Join(dir, fmt.Sprintf("st%s", uid)) @@ -359,6 +363,7 @@ func NewTestKeeperWithID(t *testing.T, dir, uid, clusterName, pgSUUsername, pgSU } args = append(args, fmt.Sprintf("--pg-repl-username=%s", pgReplUsername)) args = append(args, fmt.Sprintf("--pg-repl-password=%s", pgReplPassword)) + args = append(args, fmt.Sprintf("--priority=%d", priority)) if os.Getenv("DEBUG") != "" { args = append(args, "--debug") } @@ -422,10 +427,14 @@ func NewTestKeeperWithID(t *testing.T, dir, uid, clusterName, pgSUUsername, pgSU } func NewTestKeeper(t *testing.T, dir, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword string, storeBackend store.Backend, storeEndpoints string, a ...string) (*TestKeeper, error) { + return NewTestKeeperWithPriority(t, dir, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, storeBackend, storeEndpoints, 0, a...) +} + +func NewTestKeeperWithPriority(t *testing.T, dir, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword string, storeBackend store.Backend, storeEndpoints string, priority int, a ...string) (*TestKeeper, error) { u := uuid.NewV4() uid := fmt.Sprintf("%x", u[:4]) - return NewTestKeeperWithID(t, dir, uid, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, storeBackend, storeEndpoints, a...) + return NewTestKeeperWithIDWithPriority(t, dir, uid, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, storeBackend, storeEndpoints, priority, a...) } func (tk *TestKeeper) PGDataVersion() (int, int, error) {
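Reviewer note (not part of the patch): a sketch of the priority merge rule the whole change relies on, with stand-in types instead of the real cluster structs. A keeper that restarts without --priority reports a nil priority, so the value stored in the keeper spec (possibly set earlier via stolonctl setkeeperpriority) is kept; a keeper restarted with --priority overwrites it once its BootUUID changes.

package main

import "fmt"

// stand-ins for cluster.KeeperInfo / cluster.KeeperSpec
type keeperInfo struct {
	bootUUID string
	priority *int // nil means not specified on the command line
}

type keeperSpec struct {
	priority int
}

// mergePriority mirrors the sentinel's updateKeepersStatus change above:
// only an explicitly reported priority on a fresh boot updates the spec.
func mergePriority(spec *keeperSpec, lastBootUUID string, ki keeperInfo) {
	if ki.priority != nil && ki.bootUUID != lastBootUUID {
		spec.priority = *ki.priority
	}
}

func main() {
	spec := keeperSpec{priority: 2} // e.g. set earlier via stolonctl setkeeperpriority
	mergePriority(&spec, "boot-1", keeperInfo{bootUUID: "boot-2", priority: nil})
	fmt.Println(spec.priority) // 2: keeper restarted without --priority, stored value kept

	p := 5
	mergePriority(&spec, "boot-2", keeperInfo{bootUUID: "boot-3", priority: &p})
	fmt.Println(spec.priority) // 5: keeper restarted with --priority=5
}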