Skip to content

Commit

Permalink
add kind
Browse files Browse the repository at this point in the history
Signed-off-by: Oleg Vasilev <[email protected]>
  • Loading branch information
Omrigan committed Jul 4, 2024
1 parent c795a99 commit 7331087
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 28 deletions.
47 changes: 31 additions & 16 deletions pkg/agent/core/logiclock/logiclock.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,45 @@ import (
vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
)

type Kind string

const (
KindUpscale Kind = "upscale"
KindDownscale Kind = "downscale"
)

type measurement struct {
createdAt time.Time
kind Kind
}

type Clock struct {
cb func(time.Duration)
times []time.Time
offset int64
cb func(time.Duration, Kind)
measurements []measurement
offset int64
}

func NewClock(cb func(time.Duration)) *Clock {
func NewClock(cb func(time.Duration, Kind)) *Clock {
return &Clock{
cb: cb,
times: nil,
offset: 0,
cb: cb,
measurements: nil,
offset: 0,
}
}

func (c *Clock) NextValue() int64 {
return c.offset + int64(len(c.times))
return c.offset + int64(len(c.measurements))
}

func (c *Clock) Next(now time.Time) *vmv1.LogicalTime {
func (c *Clock) Next(now time.Time, kind Kind) *vmv1.LogicalTime {
ret := vmv1.LogicalTime{
Value: c.NextValue(),
UpdatedAt: v1.NewTime(now),
}
c.times = append(c.times, ret.UpdatedAt.Time)
c.measurements = append(c.measurements, measurement{
createdAt: ret.UpdatedAt.Time,
kind: kind,
})
return &ret
}

Expand All @@ -45,23 +60,23 @@ func (c *Clock) Observe(logicalTime *vmv1.LogicalTime) error {
}

idx := logicalTime.Value - c.offset
if idx > int64(len(c.times)) {
if idx > int64(len(c.measurements)) {
return errors.New("logicalTime value is in the future")
}

diff := logicalTime.UpdatedAt.Time.Sub(c.times[idx])
diff := logicalTime.UpdatedAt.Time.Sub(c.measurements[idx].createdAt)

if c.cb != nil {
c.cb(diff)
c.cb(diff, c.measurements[idx].kind)
}

c.offset = logicalTime.Value + 1
c.times = c.times[idx+1:]
c.measurements = c.measurements[idx+1:]

return nil
}

type NilClock struct{}

func (c *NilClock) Next(now time.Time) *vmv1.LogicalTime { return nil }
func (c *NilClock) Observe(_ *vmv1.LogicalTime) error { return nil }
func (c *NilClock) Next(now time.Time, _ Kind) *vmv1.LogicalTime { return nil }
func (c *NilClock) Observe(_ *vmv1.LogicalTime) error { return nil }
12 changes: 7 additions & 5 deletions pkg/agent/core/logiclock/logiclock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (

type testClockMetric struct {
*logiclock.Clock
t *testing.T
now v1.Time
result *time.Duration
t *testing.T
now v1.Time
result *time.Duration
resultKind logiclock.Kind
}

func (tcm *testClockMetric) advance(d time.Duration) {
Expand All @@ -31,7 +32,7 @@ func (tcm *testClockMetric) assertResult(d time.Duration) {
}

func (tcm *testClockMetric) nextNow() *vmv1.LogicalTime {
return tcm.Next(tcm.now.Time)
return tcm.Next(tcm.now.Time, logiclock.KindUpscale)
}

func newTestClockMetric(t *testing.T) *testClockMetric {
Expand All @@ -42,8 +43,9 @@ func newTestClockMetric(t *testing.T) *testClockMetric {
result: nil,
}

cb := func(d time.Duration) {
cb := func(d time.Duration, kind logiclock.Kind) {
tcm.result = &d
tcm.resultKind = kind
}
tcm.Clock = logiclock.NewClock(cb)

Expand Down
12 changes: 10 additions & 2 deletions pkg/agent/core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package core
import (
"errors"
"fmt"
"github.com/neondatabase/autoscaling/pkg/agent/core/logiclock"

Check failure on line 26 in pkg/agent/core/state.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gci`-ed with --skip-generated -s standard -s default -s Prefix(k8s.io) -s Prefix(github.com/neondatabase/autoscaling) --custom-order (gci)
"math"
"strings"
"time"
Expand Down Expand Up @@ -204,7 +205,7 @@ func (ns *neonvmState) ongoingRequest() bool {
}

type LogicClock interface {
Next(ts time.Time) *vmv1.LogicalTime
Next(ts time.Time, kind logiclock.Kind) *vmv1.LogicalTime
Observe(logicalTime *vmv1.LogicalTime) error
}

Expand Down Expand Up @@ -273,7 +274,14 @@ func (s *state) nextActions(now time.Time) ActionSet {
// our handling later on is easier if we can assume it's non-nil
calcDesiredResourcesWait = func(ActionSet) *time.Duration { return nil }
}
desiredLogicalTime := s.ClockSource.Next(now)
var kind logiclock.Kind
if desiredResources.HasFieldLessThan(s.VM.Using()) {
kind = logiclock.KindDownscale
} else {
kind = logiclock.KindUpscale
}

desiredLogicalTime := s.ClockSource.Next(now, kind)

fmt.Printf("new desired time: %v\n", desiredLogicalTime)

Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type GlobalMetrics struct {
runnerRestarts prometheus.Counter
runnerNextActions prometheus.Counter

scalingLatency prometheus.Histogram
scalingLatency prometheus.HistogramVec
}

type resourceChangePair struct {
Expand Down Expand Up @@ -220,11 +220,11 @@ func makeGlobalMetrics() (GlobalMetrics, *prometheus.Registry) {
},
)),

scalingLatency: util.RegisterMetric(reg, prometheus.NewHistogram(
scalingLatency: *util.RegisterMetric(reg, prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "autoscaling_agent_scaling_latency_seconds",
Help: "End-to-end scaling latency",
},
}, []string{"kind"},
)),
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/agent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,9 @@ func (r *Runner) Run(ctx context.Context, logger *zap.Logger, vmInfoUpdated util
pluginRequestJitter := util.NewTimeRange(time.Millisecond, 0, 100).Random()

coreExecLogger := execLogger.Named("core")
clock := logiclock.NewClock(func(duration time.Duration) {
r.global.metrics.scalingLatency.Observe(duration.Seconds())
clock := logiclock.NewClock(func(duration time.Duration, kind logiclock.Kind) {
labels := []string{string(kind)}
r.global.metrics.scalingLatency.WithLabelValues(labels...).Observe(duration.Seconds())
})
executorCore := executor.NewExecutorCore(coreExecLogger, getVmInfo(), executor.Config{
OnNextActions: r.global.metrics.runnerNextActions.Inc,
Expand Down

0 comments on commit 7331087

Please sign in to comment.