Skip to content

Commit

Permalink
Merge branch 'refactor/unified-federated-type' into fix-e2e-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
limhawjia authored Aug 25, 2023
2 parents 6d0a0a6 + 1717536 commit 539dafd
Show file tree
Hide file tree
Showing 23 changed files with 432 additions and 55 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ require (
github.com/onsi/gomega v1.27.8
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.14.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2
golang.org/x/sync v0.2.0
golang.org/x/time v0.3.0
k8s.io/api v0.26.6
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,6 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g=
github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA=
github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUqzrY=
Expand All @@ -285,7 +283,6 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -316,8 +313,6 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 h1:Jvc7gsqn21cJHCmAWx0LiimpP18LZmUxkT5Mp7EZ1mI=
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down Expand Up @@ -433,7 +428,6 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
12 changes: 12 additions & 0 deletions pkg/apis/core/v1alpha1/types_schedulingprofile.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ const (
WebhookPlugin PluginType = "Webhook"
)

// DefaultSchedulerName is the name of the default scheduler.
const DefaultSchedulerName = "default-scheduler"

// PluginConfig specifies arguments that should be passed to a plugin at the time of initialization.
// A plugin that is invoked at multiple extension points is initialized once. Args can have arbitrary structure.
// It is up to the plugin to process these Args.
Expand All @@ -118,3 +123,10 @@ type PluginConfig struct {
// +optional
Args apiextensionsv1.JSON `json:"args"`
}

// ProfileName returns the name of the scheduling profile. It is safe to call
// on a nil receiver, in which case DefaultSchedulerName is returned.
func (s *SchedulingProfile) ProfileName() string {
	if s != nil {
		return s.Name
	}
	return DefaultSchedulerName
}
18 changes: 18 additions & 0 deletions pkg/controllers/federatedcluster/clusterjoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
"github.com/kubewharf/kubeadmiral/pkg/stats"
"github.com/kubewharf/kubeadmiral/pkg/util/logging"
)

Expand Down Expand Up @@ -72,6 +73,11 @@ const (
EventReasonClusterUnjoinable = "ClusterUnjoinable"
)

// Values used for the "result" tag of the cluster_joined_duration metric.
const (
	// joinFailure marks a join attempt that did not succeed.
	joinFailure = "failed"
	// joinSuccess marks a join attempt that completed successfully.
	joinSuccess = "success"
)

// Processes a cluster that has not joined.
// If either condition or joinPerformed returned is non-nil, the caller should merge them into
// the cluster status and update the cluster.
Expand All @@ -93,6 +99,10 @@ func (c *FederatedClusterController) handleNotJoinedCluster(
time.Since(joinedCondition.LastTransitionTime.Time) > c.clusterJoinTimeout {
// Join timed out
logger.Error(nil, "Cluster join timed out")
c.metrics.Duration("cluster_joined_duration", cluster.CreationTimestamp.Time,
stats.Tag{Name: "cluster_name", Value: cluster.Name},
stats.Tag{Name: "result", Value: joinFailure},
stats.Tag{Name: "reason", Value: EventReasonJoinClusterTimeoutExceeded})
c.eventRecorder.Eventf(
cluster,
corev1.EventTypeWarning,
Expand Down Expand Up @@ -152,6 +162,10 @@ func (c *FederatedClusterController) handleNotJoinedCluster(
// Namespace exists and is not created by us - the cluster is managed by another control plane.
msg := "Cluster is unjoinable (check if cluster is already joined to another federation)"
logger.Error(nil, msg, "UID", memberFedNamespace.Annotations[FederatedClusterUID], "clusterUID", string(cluster.UID))
c.metrics.Duration("cluster_joined_duration", cluster.CreationTimestamp.Time,
stats.Tag{Name: "cluster_name", Value: cluster.Name},
stats.Tag{Name: "result", Value: joinFailure},
stats.Tag{Name: "reason", Value: EventReasonClusterUnjoinable})
c.eventRecorder.Eventf(
cluster,
corev1.EventTypeWarning,
Expand Down Expand Up @@ -223,6 +237,10 @@ func (c *FederatedClusterController) handleNotJoinedCluster(
// 5. Cluster is joined, update condition

logger.V(2).Info("Cluster joined successfully")
c.metrics.Duration("cluster_joined_duration", cluster.CreationTimestamp.Time,
stats.Tag{Name: "cluster_name", Value: cluster.Name},
stats.Tag{Name: "result", Value: joinSuccess},
stats.Tag{Name: "reason", Value: EventReasonJoinClusterSuccess})
c.eventRecorder.Eventf(
cluster,
corev1.EventTypeNormal,
Expand Down
60 changes: 60 additions & 0 deletions pkg/controllers/federatedcluster/clusterstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"k8s.io/klog/v2"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/stats"
clusterutil "github.com/kubewharf/kubeadmiral/pkg/util/cluster"
)

const (
Expand All @@ -63,6 +65,10 @@ func (c *FederatedClusterController) collectIndividualClusterStatus(
ctx context.Context,
cluster *fedcorev1a1.FederatedCluster,
) (retryAfter time.Duration, err error) {
startTime := time.Now()
defer func() {
c.recordClusterStatus(cluster, startTime)
}()
logger := klog.FromContext(ctx)

clusterKubeClient, exists := c.federatedInformerManager.GetClusterKubeClient(cluster.Name)
Expand Down Expand Up @@ -308,3 +314,57 @@ func shouldCollectClusterStatus(cluster *fedcorev1a1.FederatedCluster, collectIn
nextCollectTime := readyCond.LastProbeTime.Time.Add(collectInterval)
return time.Now().After(nextCollectTime)
}

// recordClusterStatus publishes the cluster's current status as metrics, each
// tagged with the cluster name.
//
// It stores the ready, offline and joined conditions as 1 (true) or 0 (false),
// passes startTime to the cluster_sync_status_duration metric, and, for each
// resource field that is set, stores the allocatable and available memory/CPU
// and the number of schedulable nodes.
func (c *FederatedClusterController) recordClusterStatus(cluster *fedcorev1a1.FederatedCluster, startTime time.Time) {
	nameTag := stats.Tag{Name: "cluster_name", Value: cluster.Name}
	status := &cluster.Status

	// storeFlag publishes a boolean condition as a 1/0 value.
	storeFlag := func(metric string, on bool) {
		value := 0
		if on {
			value = 1
		}
		c.metrics.Store(metric, value, nameTag)
	}

	storeFlag("cluster_ready_state", clusterutil.IsClusterReady(status))
	storeFlag("cluster_offline_state", clusterutil.IsClusterOffline(status))
	storeFlag("cluster_joined_state", clusterutil.IsClusterJoined(status))

	c.metrics.Duration("cluster_sync_status_duration", startTime, nameTag)

	// Resource figures are optional; only report the ones that are populated.
	resources := cluster.Status.Resources
	if allocatable := resources.Allocatable; allocatable != nil {
		c.metrics.Store("cluster_memory_allocatable_bytes", allocatable.Memory().AsApproximateFloat64(), nameTag)
		c.metrics.Store("cluster_cpu_allocatable_number", allocatable.Cpu().AsApproximateFloat64(), nameTag)
	}
	if available := resources.Available; available != nil {
		c.metrics.Store("cluster_memory_available_bytes", available.Memory().AsApproximateFloat64(), nameTag)
		c.metrics.Store("cluster_cpu_available_number", available.Cpu().AsApproximateFloat64(), nameTag)
	}
	if nodes := resources.SchedulableNodes; nodes != nil {
		c.metrics.Store("cluster_schedulable_nodes_total", *nodes, nameTag)
	}
}
12 changes: 12 additions & 0 deletions pkg/controllers/federatedcluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ func (c *FederatedClusterController) reconcile(
cluster = cluster.DeepCopy()

if cluster.GetDeletionTimestamp() != nil {
c.metrics.Store("cluster_deletion_state", 1,
stats.Tag{Name: "cluster_name", Value: cluster.Name},
stats.Tag{Name: "status", Value: "deleting"})
c.metrics.Store("cluster_deletion_state", 0,
stats.Tag{Name: "cluster_name", Value: cluster.Name},
stats.Tag{Name: "status", Value: "deleted"})
logger.V(2).Info("Handle terminating cluster")
if err := c.handleTerminatingCluster(ctx, cluster); err != nil {
if apierrors.IsConflict(err) {
Expand Down Expand Up @@ -446,6 +452,12 @@ func (c *FederatedClusterController) handleTerminatingCluster(
return fmt.Errorf("failed to update cluster for finalizer removal: %w", err)
}

c.metrics.Store("cluster_deletion_state", 0,
stats.Tag{Name: "cluster_name", Value: cluster.Name},
stats.Tag{Name: "status", Value: "deleting"})
c.metrics.Store("cluster_deletion_state", 1,
stats.Tag{Name: "cluster_name", Value: cluster.Name},
stats.Tag{Name: "status", Value: "deleted"})
return nil
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/controllers/follower/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ func (c *Controller) reconcileLeader(
}
return worker.StatusError
}

c.metrics.Store("followers_total", len(desiredFollowers),
stats.Tag{Name: "namespace", Value: key.namespace},
stats.Tag{Name: "name", Value: key.sourceName},
stats.Tag{Name: "group", Value: key.sourceGK.Group},
stats.Tag{Name: "kind", Value: key.sourceGK.Kind})
}

c.cacheObservedFromLeaders.update(leader, desiredFollowers)
Expand Down Expand Up @@ -458,6 +464,11 @@ func (c *Controller) reconcileFollower(
)
return worker.StatusError
} else if updated {
c.metrics.Store("leaders_total", len(desiredLeaders),
stats.Tag{Name: "namespace", Value: key.namespace},
stats.Tag{Name: "name", Value: key.sourceName},
stats.Tag{Name: "group", Value: key.sourceGK.Group},
stats.Tag{Name: "kind", Value: key.sourceGK.Kind})
keyedLogger.V(1).Info("Updated follower to sync with leaders")
}

Expand Down
16 changes: 16 additions & 0 deletions pkg/controllers/nsautoprop/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ func (c *Controller) reconcile(ctx context.Context, qualifiedName common.Qualifi
return worker.StatusAllOK
}

c.recordNamespacePropagationFailedMetric(fedNamespace)

needsUpdate := false

// Set placement to propagate to all clusters
Expand Down Expand Up @@ -395,3 +397,17 @@ func (c *Controller) HasSynced() bool {
c.namespaceInformer.Informer().HasSynced() &&
c.informerManager.HasSynced()
}

// recordNamespacePropagationFailedMetric counts the clusters in the federated
// namespace's status whose propagation status is neither ClusterPropagationOK
// nor WaitingForRemoval, and stores that count as the
// namespace_propagate_failed_total metric, tagged with the namespace name.
//
// Nothing is stored when the count is zero.
// NOTE(review): this means a previously stored non-zero value is not reset
// here once all clusters recover — confirm that this is intended.
func (c *Controller) recordNamespacePropagationFailedMetric(fedNamespace *fedcorev1a1.ClusterFederatedObject) {
	failed := 0
	for _, cs := range fedNamespace.Status.Clusters {
		switch cs.Status {
		case fedcorev1a1.ClusterPropagationOK, fedcorev1a1.WaitingForRemoval:
			// Not counted as a failure.
		default:
			failed++
		}
	}

	if failed == 0 {
		return
	}
	c.metrics.Store("namespace_propagate_failed_total", failed, stats.Tag{Name: "namespace", Value: fedNamespace.Name})
}
9 changes: 7 additions & 2 deletions pkg/controllers/policyrc/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewPolicyRCController(
func(ctx context.Context, qualifiedName common.QualifiedName) worker.Result {
return c.reconcilePersist(
ctx,
"propagation-policy",
"propagation_policy_reference_count",
qualifiedName,
c.propagationPolicyInformer.Informer().GetStore(),
c.clusterPropagationPolicyInformer.Informer().GetStore(),
Expand All @@ -112,7 +112,7 @@ func NewPolicyRCController(
func(ctx context.Context, qualifiedName common.QualifiedName) worker.Result {
return c.reconcilePersist(
ctx,
"override-policy",
"override_policy_reference_count",
qualifiedName,
c.overridePolicyInformer.Informer().GetStore(),
c.clusterOverridePolicyInformer.Informer().GetStore(),
Expand Down Expand Up @@ -317,5 +317,10 @@ func (c *Controller) reconcilePersist(
}
}

c.metrics.Store(metricName, newRefCount, []stats.Tag{
{Name: "name", Value: qualifiedName.Name},
{Name: "namespace", Value: qualifiedName.Namespace},
}...)

return worker.StatusAllOK
}
11 changes: 11 additions & 0 deletions pkg/controllers/scheduler/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,14 @@ const (
SchedulingTriggersAnnotation = common.DefaultPrefix + "scheduling-triggers"
SchedulingDeferredReasonsAnnotation = common.DefaultPrefix + "scheduling-deferred-reasons"
)

// Event names identifying which kind of object changed.
const (
	// ClusterChanged is the event for a change to a cluster.
	ClusterChanged = "ClusterChanged"
	// FedObjChanged is the event for a change to a FederatedObject or
	// ClusterFederatedObject.
	FedObjChanged = "FedObjChanged"
	// FTCChanged is the event for a change to an FTC.
	FTCChanged = "FTCChanged"
	// PolicyChanged is the event for a change to a PropagationPolicy or
	// ClusterPropagationPolicy.
	PolicyChanged = "PolicyChanged"
)
4 changes: 3 additions & 1 deletion pkg/controllers/scheduler/core/generic_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/runtime"
"github.com/kubewharf/kubeadmiral/pkg/stats"
)

type naiveReplicasPlugin struct{}
Expand Down Expand Up @@ -57,7 +58,8 @@ func getFramework() framework.Framework {
DefaultRegistry := runtime.Registry{
"NaiveReplicas": newNaiveReplicas,
}
f, _ := runtime.NewFramework(DefaultRegistry, nil, &fedcore.EnabledPlugins{ReplicasPlugins: []string{"NaiveReplicas"}})
metrics := stats.NewMock("test", "kubeadmiral_controller_manager", false)
f, _ := runtime.NewFramework(DefaultRegistry, nil, &fedcore.EnabledPlugins{ReplicasPlugins: []string{"NaiveReplicas"}}, "", metrics)
return f
}

Expand Down
Loading

0 comments on commit 539dafd

Please sign in to comment.