Skip to content

Commit

Permalink
Add option to configure max-rate for pg_basebackup
Browse files Browse the repository at this point in the history
Signed-off-by: Aswin Karthik <[email protected]>
  • Loading branch information
aswinkarthik committed May 24, 2019
1 parent a27bcae commit 6ede9b8
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 0 deletions.
1 change: 1 addition & 0 deletions cmd/keeper/cmd/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {

// Generate hba auth from clusterData
pgm.SetHba(p.generateHBA(cd, db, p.waitSyncStandbysSynced))
pgm.SetPGBasebackupMaxRate(cd.Cluster.Spec.PGBasebackupMaxRate)

var pgParameters common.Parameters

Expand Down
13 changes: 13 additions & 0 deletions internal/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ type ClusterSpec struct {
PGHBA []string `json:"pgHBA"`
// Enable automatic pg restart when pg parameters that requires restart changes
AutomaticPgRestart *bool `json:"automaticPgRestart"`
// PGBasebackupMaxRate will be passed as --max-rate flag to pg_basebackup
PGBasebackupMaxRate string `json:"pgBasebackupMaxRate"`
}

type ClusterStatus struct {
Expand Down Expand Up @@ -442,6 +444,17 @@ func (os *ClusterSpec) Validate() error {
if s.InitMode == nil {
return fmt.Errorf("initMode undefined")
}
if s.PGBasebackupMaxRate != "" {
rate, err := common.ParseBytesize(s.PGBasebackupMaxRate)
if err != nil {
return fmt.Errorf("pgBasebackupMaxRate is invalid: %v", err)
}

if rate < 32*common.Kilobyte || rate > 1024*common.Megabyte {
return fmt.Errorf("pgBasebackupMaxRate should be >= 32k and <=1024M")
}
}

for _, replicationSlot := range s.AdditionalMasterReplicationSlots {
if err := validateReplicationSlot(replicationSlot); err != nil {
return err
Expand Down
38 changes: 38 additions & 0 deletions internal/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,41 @@ func TestValidateReplicationSlots(t *testing.T) {
}
}
}

func TestClusterSpec_Validate(t *testing.T) {
t.Run("should validate PGBasebackupMaxRate", func(t *testing.T) {
var initMode = ClusterInitMode("new")
var spec *ClusterSpec

tests := []struct {
rate string
shouldError bool
}{
{"10", true},
{"10k", true},
{"32k", false},
{"10M", false},
{"1024M", false},
{"1023M", false},
{"1025M", true},
{"fM", true},
{"", false},
}
for i, tt := range tests {
spec = &ClusterSpec{
InitMode: &initMode,
PGBasebackupMaxRate: tt.rate,
}

err := spec.Validate()

if tt.shouldError && (err == nil) {
t.Errorf("#%d: expected error for max-rate: %s, actual no error", i, tt.rate)
}

if !tt.shouldError && (err != nil) {
t.Errorf("#%d: unexpected error for max-rate: %s, error: %v", i, tt.rate, err)
}
}
})
}
58 changes: 58 additions & 0 deletions internal/common/bytesize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2016 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"fmt"
"regexp"
"strconv"
)

// Bytesize is useful to represent bytes and perform operations on it
type Bytesize int

// Common bytesizes than can be used
const (
Byte Bytesize = 1
Kilobyte = 1024 * Byte
Megabyte = 1024 * Kilobyte
)

const pattern = `([0-9]+)(k|M)`

// ParseBytesize allows to parse the given input into Bytesize
// Allowed formats is [:digit:] suffixed by k (kilobyte) or M (Megabyte)
func ParseBytesize(input string) (Bytesize, error) {
re := regexp.MustCompile(pattern)
allMatches := re.FindStringSubmatch(input)
if len(allMatches) != 3 {
return 0, fmt.Errorf("unable to parse %s: must be of the format %s", input, pattern)
}

size, err := strconv.Atoi(allMatches[1])
if err != nil {
return 0, fmt.Errorf("unable to parse as integer %s: %v", allMatches[1], err)
}

unit := allMatches[2]
switch unit {
case "k":
return Bytesize(size) * Kilobyte, nil
case "M":
return Bytesize(size) * Megabyte, nil
default:
return 0, fmt.Errorf("%s can either be suffixed with k or M", allMatches[2])
}
}
48 changes: 48 additions & 0 deletions internal/common/bytesize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2016 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import "testing"

func TestParseBytesize(t *testing.T) {
tests := []struct {
input string
expected Bytesize
shouldError bool
}{
{"10", 0, true},
{"10k", 10 * Kilobyte, false},
{"32k", 32 * Kilobyte, false},
{"10M", 10 * Megabyte, false},
{"fM", 0, true},
{"", 0, true},
}

for i, tt := range tests {
actual, err := ParseBytesize(tt.input)

if tt.shouldError && (err == nil) {
t.Errorf("#%d: expected error for: %s, actual no error", i, tt.input)
}

if !tt.shouldError && (err != nil) {
t.Errorf("#%d: unexpected error for: %s, error: %v", i, tt.input, err)
}

if actual != tt.expected {
t.Errorf("#%d: unexpected value. got: %d, expected: %d", i, actual, tt.expected)
}
}
}
9 changes: 9 additions & 0 deletions internal/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Manager struct {
replUsername string
replPassword string
requestTimeout time.Duration
pgBasebackupMaxRate string
}

type SystemData struct {
Expand Down Expand Up @@ -139,6 +140,11 @@ func (p *Manager) CurHba() []string {
return p.curHba
}

// SetPGBasebackupMaxRate is a setter for pgBasebackupMaxRate
func (p *Manager) SetPGBasebackupMaxRate(rate string) {
p.pgBasebackupMaxRate = rate
}

func (p *Manager) UpdateCurParameters() {
n, err := copystructure.Copy(p.parameters)
if err != nil {
Expand Down Expand Up @@ -827,6 +833,9 @@ func (p *Manager) SyncFromFollowed(followedConnParams ConnParams, replSlot strin
if replSlot != "" {
args = append(args, "--slot", replSlot)
}
if p.pgBasebackupMaxRate != "" {
args = append(args, "--max-rate", p.pgBasebackupMaxRate)
}
cmd := exec.Command(name, args...)

cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSFILE=%s", pgpass.Name()))
Expand Down

0 comments on commit 6ede9b8

Please sign in to comment.