fix(rsp): current pod resource usage should be included in resource calculation #21

Open
wants to merge 10 commits into base: main
1 change: 1 addition & 0 deletions cmd/controller-manager/app/core.go
@@ -151,6 +151,7 @@ func startGlobalScheduler(
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedClusters(),
controllerCtx.FedInformerFactory.Core().V1alpha1().SchedulingProfiles(),
controllerCtx.FedInformerFactory.Core().V1alpha1().SchedulerPluginWebhookConfigurations(),
controllerCtx.FederatedClientFactory,
controllerCtx.Metrics,
controllerCtx.WorkerCount,
)
2 changes: 1 addition & 1 deletion pkg/controllers/federatedcluster/clusterstatus.go
@@ -184,7 +184,7 @@ func updateClusterResources(
}
}

allocatable, available := aggregateResources(nodes, pods)
allocatable, available := AggregateResources(nodes, pods)
clusterStatus.Resources = fedcorev1a1.Resources{
SchedulableNodes: &schedulableNodes,
Allocatable: allocatable,
30 changes: 21 additions & 9 deletions pkg/controllers/federatedcluster/util.go
@@ -188,10 +188,10 @@ func getPodResourceRequests(pod *corev1.Pod) corev1.ResourceList {
return reqs
}

// aggregateResources returns
// AggregateResources returns
// - allocatable resources from the nodes and,
// - available resources after considering allocations to the given pods.
func aggregateResources(
func AggregateResources(
nodes []*corev1.Node,
pods []*corev1.Pod,
) (corev1.ResourceList, corev1.ResourceList) {
@@ -207,24 +207,36 @@ func aggregateResources(
// Don't consider pod resource for now
delete(allocatable, corev1.ResourcePods)

available := make(corev1.ResourceList)
for name, quantity := range allocatable {
available[name] = quantity.DeepCopy()
available := allocatable.DeepCopy()
usage := AggregatePodUsage(pods, func(pod *corev1.Pod) *corev1.Pod { return pod })

for name, quantity := range available {
// `quantity` is a copy here; pointer methods do not mutate `available[name]`
quantity.Sub(usage[name])
available[name] = quantity
}

return allocatable, available
}

func AggregatePodUsage[T any](pods []T, podFunc func(T) *corev1.Pod) corev1.ResourceList {
list := make(corev1.ResourceList)

for _, pod := range pods {
pod := podFunc(pod)

if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
continue
}

podRequests := getPodResourceRequests(pod)
for name, requestedQuantity := range podRequests {
if availableQuantity, ok := available[name]; ok {
availableQuantity.Sub(requestedQuantity)
available[name] = availableQuantity
if q, exists := list[name]; exists {
requestedQuantity.Add(q)
}
list[name] = requestedQuantity
}
}

return allocatable, available
return list
}
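For context, a minimal, hypothetical sketch of calling the new exported helper directly; the pods, their request values, and the makePod helper are invented for illustration, and the import path matches the one used in schedulingunit.go further below:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"

    "github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster"
)

// makePod builds a pod with a single container requesting the given cpu and memory.
func makePod(cpu, mem string) *corev1.Pod {
    return &corev1.Pod{
        Spec: corev1.PodSpec{
            Containers: []corev1.Container{
                {
                    Resources: corev1.ResourceRequirements{
                        Requests: corev1.ResourceList{
                            corev1.ResourceCPU:    resource.MustParse(cpu),
                            corev1.ResourceMemory: resource.MustParse(mem),
                        },
                    },
                },
            },
        },
    }
}

func main() {
    // Two running pods with made-up requests; succeeded or failed pods would be skipped.
    pods := []*corev1.Pod{makePod("500m", "256Mi"), makePod("250m", "128Mi")}

    // The identity adapter mirrors the []*corev1.Pod call site in AggregateResources;
    // schedulingunit.go instead passes a []corev1.Pod with an adapter over the value type.
    usage := federatedcluster.AggregatePodUsage(pods, func(p *corev1.Pod) *corev1.Pod { return p })

    cpu := usage[corev1.ResourceCPU]
    mem := usage[corev1.ResourceMemory]
    fmt.Printf("total requests: cpu=%s memory=%s\n", cpu.String(), mem.String()) // expected: cpu=750m, memory=384Mi
}
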
20 changes: 16 additions & 4 deletions pkg/controllers/federatedcluster/util_test.go
@@ -260,13 +260,21 @@ func Test_aggregateResources(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
allocatable, available := aggregateResources(tc.nodes, tc.pods)
allocatable, available := AggregateResources(tc.nodes, tc.pods)
if len(allocatable) != len(tc.expectedAllocatable) {
t.Fatalf("expected allocatable %s differs from actual allocatable %s", spew.Sdump(tc.expectedAllocatable), spew.Sdump(allocatable))
t.Fatalf(
"expected allocatable %s differs from actual allocatable %s",
spew.Sdump(tc.expectedAllocatable),
spew.Sdump(allocatable),
)
}
for name, actualQuantity := range allocatable {
if expectedQuantity, ok := tc.expectedAllocatable[name]; !ok || !actualQuantity.Equal(expectedQuantity) {
t.Fatalf("expected allocatable %s differs from actual allocatable %s", spew.Sdump(tc.expectedAllocatable), spew.Sdump(allocatable))
t.Fatalf(
"expected allocatable %s differs from actual allocatable %s",
spew.Sdump(tc.expectedAllocatable),
spew.Sdump(allocatable),
)
}
}

@@ -275,7 +283,11 @@ }
}
for name, actualQuantity := range available {
if expectedQuantity, ok := tc.expectedAvailable[name]; !ok || !actualQuantity.Equal(expectedQuantity) {
t.Fatalf("expected available %s differs from actual available %s", spew.Sdump(tc.expectedAvailable), spew.Sdump(available))
t.Fatalf(
"expected available %s differs from actual available %s",
spew.Sdump(tc.expectedAvailable),
spew.Sdump(available),
)
}
}
})
28 changes: 11 additions & 17 deletions pkg/controllers/scheduler/framework/plugins/rsp/rsp.go
@@ -51,9 +51,7 @@ const (
allocatableResource string = "allocatable"
)

var (
ErrNoCPUResource = errors.New("no cpu resource")
)
var ErrNoCPUResource = errors.New("no cpu resource")

type ClusterCapacityWeight struct{}

Expand All @@ -77,7 +75,7 @@ func (pl *ClusterCapacityWeight) ReplicaScheduling(

var schedulingWeights map[string]int64
if dynamicSchedulingEnabled {
clusterAvailables := QueryClusterResource(clusters, availableResource)
clusterAvailables := QueryAvailable(clusters, su.CurrentUsage)
if len(clusters) != len(clusterAvailables) {
return clusterReplicasList, framework.NewResult(framework.Error)
}
@@ -185,7 +183,7 @@ func CalcWeightLimit(
clusters []*fedcorev1a1.FederatedCluster,
supplyLimitRatio float64,
) (weightLimit map[string]int64, err error) {
allocatables := QueryClusterResource(clusters, allocatableResource)
allocatables := QueryAllocatable(clusters)
if len(allocatables) != len(clusters) {
err = fmt.Errorf("allocatables are incomplete: %v", allocatables)
return
@@ -272,19 +270,8 @@ func AvailableToPercentage(
return
}

// QueryClusterResource aggregate cluster resources, accept available and allocatable.
func QueryClusterResource(clusters []*fedcorev1a1.FederatedCluster, resource string) map[string]corev1.ResourceList {
switch resource {
case availableResource:
return QueryAvailable(clusters)
case allocatableResource:
return QueryAllocatable(clusters)
}
return nil
}

// QueryAvailable aggregates each cluster's available resources.
func QueryAvailable(clusters []*fedcorev1a1.FederatedCluster) map[string]corev1.ResourceList {
func QueryAvailable(clusters []*fedcorev1a1.FederatedCluster, currentUsage map[string]framework.Resource) map[string]corev1.ResourceList {
ret := make(map[string]corev1.ResourceList)
for _, cluster := range clusters {
available := make(corev1.ResourceList)
Expand All @@ -299,6 +286,13 @@ func QueryAvailable(clusters []*fedcorev1a1.FederatedCluster) map[string]corev1.
available[resourceName] = cluster.Status.Resources.Available[resourceName]
}
}

usageTmp := currentUsage[cluster.Name]
usage := *usageTmp.Clone()
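// Note: the cluster's reported Available (see AggregateResources) already has the
// requests of all running pods deducted, including this workload's own pods.
// Adding the workload's current usage back avoids penalizing clusters that already
// run its replicas, since that capacity is effectively still usable by the workload.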

usage.Add(available)
Review comment (Member): Can we add a comment here to explain why this is necessary?

available = usage.ResourceList()

ret[cluster.GetName()] = available
}
return ret
@@ -168,7 +168,7 @@ func TestAvailableToPercentage(t *testing.T) {
}
makeArgs := func(clusters ...*fedcorev1a1.FederatedCluster) args {
return args{
clusterAvailables: QueryAvailable(clusters),
clusterAvailables: QueryAvailable(clusters, nil),
weightLimit: func() map[string]int64 {
weightLimit, _ := CalcWeightLimit(clusters, 1.0)
return weightLimit
1 change: 1 addition & 0 deletions pkg/controllers/scheduler/framework/types.go
@@ -51,6 +51,7 @@ type SchedulingUnit struct {
// Describes the current scheduling state
CurrentClusters map[string]*int64
AutoMigration *AutoMigrationSpec
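// CurrentUsage holds the aggregated resource requests of this scheduling unit's
// existing pods in each member cluster, keyed by cluster name.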
CurrentUsage map[string]Resource

// Controls the scheduling behavior
SchedulingMode fedcorev1a1.SchedulingMode
19 changes: 12 additions & 7 deletions pkg/controllers/scheduler/scheduler.go
@@ -50,6 +50,7 @@ import (
annotationutil "github.com/kubewharf/kubeadmiral/pkg/controllers/util/annotation"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/delayingdeliver"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/eventsink"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/federatedclient"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/pendingcontrollers"
schemautil "github.com/kubewharf/kubeadmiral/pkg/controllers/util/schema"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/worker"
@@ -86,6 +87,8 @@ type Scheduler struct {
webhookConfigurationSynced cache.InformerSynced
webhookPlugins sync.Map

federatedClient federatedclient.FederatedClientFactory

worker worker.ReconcileWorker
eventRecorder record.EventRecorder

@@ -106,18 +109,20 @@ func NewScheduler(
clusterInformer fedcorev1a1informers.FederatedClusterInformer,
schedulingProfileInformer fedcorev1a1informers.SchedulingProfileInformer,
webhookConfigurationInformer fedcorev1a1informers.SchedulerPluginWebhookConfigurationInformer,
federatedClient federatedclient.FederatedClientFactory,
metrics stats.Metrics,
workerCount int,
) (*Scheduler, error) {
schedulerName := fmt.Sprintf("%s-scheduler", typeConfig.GetFederatedType().Name)

s := &Scheduler{
typeConfig: typeConfig,
name: schedulerName,
fedClient: fedClient,
dynamicClient: dynamicClient,
metrics: metrics,
logger: klog.LoggerWithName(klog.Background(), schedulerName),
typeConfig: typeConfig,
name: schedulerName,
fedClient: fedClient,
dynamicClient: dynamicClient,
federatedClient: federatedClient,
metrics: metrics,
logger: klog.LoggerWithName(klog.Background(), schedulerName),
}

s.worker = worker.NewReconcileWorker(
@@ -353,7 +358,7 @@ func (s *Scheduler) reconcile(qualifiedName common.QualifiedName) (status worker
policyKey.String(),
)

schedulingUnit, err := s.schedulingUnitForFedObject(fedObject, policy)
schedulingUnit, err := s.schedulingUnitForFedObject(context.TODO(), fedObject, policy)
if err != nil {
keyedLogger.Error(err, "Failed to get scheduling unit")
s.eventRecorder.Eventf(
5 changes: 3 additions & 2 deletions pkg/controllers/scheduler/scheduler_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package scheduler

import (
"context"
"testing"

"github.com/onsi/gomega"
@@ -318,7 +319,7 @@ func TestGetSchedulingUnitWithAnnotationOverrides(t *testing.T) {
err = unstructured.SetNestedMap(obj.Object, templateObjectMetaUns, common.TemplatePath...)
g.Expect(err).NotTo(gomega.HaveOccurred())

su, err := scheduler.schedulingUnitForFedObject(obj, test.policy)
su, err := scheduler.schedulingUnitForFedObject(context.Background(), obj, test.policy)
g.Expect(err).NotTo(gomega.HaveOccurred())

// override fields we don't want to test
@@ -403,7 +404,7 @@ func TestSchedulingMode(t *testing.T) {
g.Expect(err).NotTo(gomega.HaveOccurred())
err = unstructured.SetNestedMap(obj.Object, templateObjectMetaUns, common.TemplatePath...)
g.Expect(err).NotTo(gomega.HaveOccurred())
su, err := scheduler.schedulingUnitForFedObject(obj, test.policy)
su, err := scheduler.schedulingUnitForFedObject(context.Background(), obj, test.policy)
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(su.SchedulingMode).To(gomega.Equal(test.expectedResult))
})
72 changes: 72 additions & 0 deletions pkg/controllers/scheduler/schedulingunit.go
@@ -17,25 +17,29 @@ limitations under the License.
package scheduler

import (
"context"
"encoding/json"
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
"github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util"
utilunstructured "github.com/kubewharf/kubeadmiral/pkg/controllers/util/unstructured"
)

func (s *Scheduler) schedulingUnitForFedObject(
ctx context.Context,
fedObject *unstructured.Unstructured,
policy fedcorev1a1.GenericPropagationPolicy,
) (*framework.SchedulingUnit, error) {
@@ -75,6 +79,15 @@ func (s *Scheduler) schedulingUnitForFedObject(
return nil, err
}

var currentUsage map[string]framework.Resource
selectorPath := s.typeConfig.Spec.PathDefinition.LabelSelector
if selectorPath != "" {
currentUsage, err = s.getPodUsage(ctx, fedObject, selectorPath)
if err != nil {
return nil, fmt.Errorf("get pod resource usage: %w", err)
}
}

schedulingUnit := &framework.SchedulingUnit{
GroupVersion: schema.GroupVersion{Group: targetType.Group, Version: targetType.Version},
Kind: targetType.Kind,
Expand All @@ -85,6 +98,7 @@ func (s *Scheduler) schedulingUnitForFedObject(
Annotations: objectMeta.GetAnnotations(),
DesiredReplicas: desiredReplicasOption,
CurrentClusters: currentReplicas,
CurrentUsage: currentUsage,
AvoidDisruption: true,
}

@@ -162,6 +176,64 @@ func (s *Scheduler) schedulingUnitForFedObject(
return schedulingUnit, nil
}

func (s *Scheduler) getPodUsage(
ctx context.Context,
fedObject *unstructured.Unstructured,
selectorPath string,
) (map[string]framework.Resource, error) {
clusters, err := s.clusterLister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to get clusters from store: %w", err)
}

selector, err := utilunstructured.GetLabelSelectorFromPath(fedObject, selectorPath, common.TemplatePath)
if err != nil {
return nil, fmt.Errorf("invalid label selector: %w", err)
}

currentUsage := make(map[string]framework.Resource, len(clusters))

// this loop is intentionally not parallelized to reduce memory overhead.
for _, cluster := range clusters {
if !util.IsClusterJoined(&cluster.Status) {
continue
}

currentUsage[cluster.Name], err = s.getClusterPodUsage(ctx, cluster, fedObject, selector)
if err != nil {
return nil, fmt.Errorf("failed to get pod resource usage in cluster %q: %w", cluster.Name, err)
}
}

return currentUsage, nil
}

func (s *Scheduler) getClusterPodUsage(
ctx context.Context,
cluster *fedcorev1a1.FederatedCluster,
fedObject *unstructured.Unstructured,
selector *metav1.LabelSelector,
) (res framework.Resource, err error) {
client, exists, err := s.federatedClient.KubeClientsetForCluster(cluster.Name)
if err != nil {
return res, fmt.Errorf("get clientset: %w", err)
}
if !exists {
return res, fmt.Errorf("clientset does not exist yet") // wait for the clientset to get created
}

pods, err := client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
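// ResourceVersion "0" lets the apiserver serve this list from its watch cache,
// trading strict read freshness for a cheaper request.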
ResourceVersion: "0",
LabelSelector: metav1.FormatLabelSelector(selector),
})
if err != nil {
return res, fmt.Errorf("cannot list pods: %w", err)
}

usage := federatedcluster.AggregatePodUsage(pods.Items, func(pod corev1.Pod) *corev1.Pod { return &pod })
return *framework.NewResource(usage), nil
}

func getTemplateObjectMeta(fedObject *unstructured.Unstructured) (*metav1.ObjectMeta, error) {
templateContent, exists, err := unstructured.NestedMap(fedObject.Object, common.TemplatePath...)
if err != nil {
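As a closing illustration, here is a small, hypothetical sketch of how the workload's CurrentUsage is folded back into a cluster's reported Available, mirroring the QueryAvailable change in rsp.go above; the quantities are invented, and only methods already visible in this diff (NewResource, Clone, Add, ResourceList) are used:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"

    "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework"
)

func main() {
    // Cluster-reported Available: allocatable minus the requests of all running pods.
    reported := corev1.ResourceList{
        corev1.ResourceCPU:    resource.MustParse("6"),
        corev1.ResourceMemory: resource.MustParse("24Gi"),
    }

    // Requests of this workload's pods already running in that cluster (CurrentUsage).
    current := framework.NewResource(corev1.ResourceList{
        corev1.ResourceCPU:    resource.MustParse("2"),
        corev1.ResourceMemory: resource.MustParse("8Gi"),
    })

    // Same combination as QueryAvailable: clone the usage, then add the reported Available.
    effective := current.Clone()
    effective.Add(reported)

    effectiveList := effective.ResourceList()
    cpu := effectiveList[corev1.ResourceCPU]
    mem := effectiveList[corev1.ResourceMemory]
    // Roughly cpu=8, memory=32Gi: the capacity this workload could occupy in that cluster.
    fmt.Printf("effective capacity: cpu=%s memory=%s\n", cpu.String(), mem.String())
}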