Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KEP-2170: Adding validation webhook for v2 trainjob #2307

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

akshaychitneni
Copy link
Contributor

@akshaychitneni akshaychitneni commented Oct 24, 2024

Adds validation webhook for v2 trainjob.
Relates to #2209

What this PR does / why we need it:

Which issue(s) this PR fixes (optional, in Fixes #<issue number>, #<issue number>, ... format, will close the issue(s) when PR gets merged):
Fixes # #2209

Checklist:

  • Docs included if any changes are user facing

@akshaychitneni
Copy link
Contributor Author

cc @tenzen-y @andreyvelich

@akshaychitneni akshaychitneni force-pushed the webhookv2 branch 5 times, most recently from 892a40b to f1a06c4 Compare October 25, 2024 16:36
@google-oss-prow google-oss-prow bot added size/L and removed size/XL labels Oct 25, 2024
@akshaychitneni akshaychitneni force-pushed the webhookv2 branch 3 times, most recently from ce983eb to 736a759 Compare October 25, 2024 17:09
@coveralls
Copy link

coveralls commented Oct 25, 2024

Pull Request Test Coverage Report for Build 11784298214

Details

  • 6 of 6 (100.0%) changed or added relevant lines in 1 file are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage remained the same at 100.0%

Totals Coverage Status
Change from base Build 11758410179: 0.0%
Covered Lines: 78
Relevant Lines: 78

💛 - Coveralls

Copy link
Member

@tenzen-y tenzen-y left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for taking this, and moving this forward.
And Sorry for the delay.

Comment on lines 69 to 76
Namespace: new.Namespace,
Name: new.Spec.RuntimeRef.Name,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you ever seen the isseus when we use the old object names?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we get new object here and not old ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I am validating updated object instead of the existing one

@@ -140,3 +143,115 @@ func (j *JobSet) ReconcilerBuilders() []runtime.ReconcilerBuilder {
},
}
}

func (j *JobSet) Validate(oldObj, newObj *kubeflowv2.TrainJob, runtimeInfo *runtime.Info) (admission.Warnings, field.ErrorList) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that there are some conflicts between @andreyvelich PR and this.
@akshaychitneni Could you consult with @andreyvelich, then which PRs should we merge into the main, first.

Copy link
Contributor Author

@akshaychitneni akshaychitneni Nov 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rebased with @andreyvelich's changes

@@ -31,7 +31,7 @@ func Setup(mgr ctrl.Manager, runtimes map[string]runtime.Runtime) (string, error
return kubeflowv2.TrainingRuntimeKind, err
}
if err := setupWebhookForTrainJob(mgr, runtimes); err != nil {
return "TrainJob", err
return kubeflowv2.TrainJobKind, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool! This is what I imagined architechture in my KubeflowJobPipeline framework design phase.

failedCtrlName, err := controllerv2.SetupControllers(mgr, runtimes)
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred(), "controller", failedCtrlName)
gomega.ExpectWithOffset(1, failedCtrlName).To(gomega.BeEmpty())
if startControllers {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you ever seen any issues like null pointer when we start the controllers for webhook testing, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I have seen but we might not need to start the controllers just to validate create/update requests and leave to reconciler tests to cover reconciliation

@akshaychitneni akshaychitneni force-pushed the webhookv2 branch 4 times, most recently from f4d1430 to a93ffb7 Compare November 6, 2024 18:08
Copy link
Member

@andreyvelich andreyvelich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this effort @akshaychitneni!
I left initial comments.

Comment on lines 29 to 31
// JobExporter is the Job name for the exporter.
JobExporter string = "exporter"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please can we implement the validation for exporter in the future once we design it as part of: #2245 ?
We should discuss whether we want to use sidecar container or another ReplicatedJob for model checkpointing.
cc @saileshd1402 @akshaychitneni @tenzen-y

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. Makes sense

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akshaychitneni Please can you remove the values from your PR that we will not use for now (e.g. JobExporter).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

return r.framework.RunComponentBuilderPlugins(ctx, jobSetTemplate.DeepCopy(), info, trainJob)
}

func (r *TrainingRuntime) runtimeInfo(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be part of Runtime interface: https://github.com/kubeflow/training-operator/blob/a93ffb7125c3899519058ba43fa1d5419b498e85/pkg/runtime.v2/interface.go#L32
And should we name this API more explicit (e.g. getRuntimeInfo() or initializeRuntimeInfo()) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be part of trainingRuntime as it depends on config from trainJob/trainingRuntume resources

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but the Info object will be used for every runtime that we register with our manager.
What is the main motivation to create this helper function to construct the Info object for the TrainingRuntime ?

Comment on lines 69 to 76
Namespace: new.Namespace,
Name: new.Spec.RuntimeRef.Name,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we get new object here and not old ?

numProcPerNodePath := specPath.Child("trainer").Child("numProcPerNode")
if runtimeInfo.MLPolicy.MPI != nil {
if _, err := strconv.Atoi(*newJobObj.Spec.Trainer.NumProcPerNode); err != nil {
allErrs = append(allErrs, field.Invalid(numProcPerNodePath, newJobObj.Spec.Trainer.NumProcPerNode, "should have an int value"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, is this value compatible with the k8s API conventions: https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md ?

numProcPerNodePath := specPath.Child("trainer").Child("numProcPerNode")
if runtimeInfo.RuntimePolicy.MLPolicy.Torch != nil && newObj.Spec.Trainer.NumProcPerNode != nil {
allowedStringValList := []string{"auto", "cpu", "gpu"}
numProcPerNode := *newObj.Spec.Trainer.NumProcPerNode
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akshaychitneni @tenzen-y Can't we use CEL for that validation since we just validate values for .nProcPerNode parameter ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have included CEL validation for this path in trainingRuntimes #2313 but CEL can't be added here for trainJob config as it is requires referenced trainingRuntime config to validate

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I see. Do we mean that in TrainJob NumProcPerNode can be different depends on the runtimeRef ?
E.g. for MPI we accept only int values, but for Torch we accept auto, cpu, gpu, and int values.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this already reloved, right? In that case, let's rely on the CEL validation.

return nil, nil
}

if newObj.Spec.ModelConfig != nil && newObj.Spec.ModelConfig.Input != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, for now we should check the initContainers in JobSet, as I mentioned here: https://github.com/kubeflow/training-operator/blob/master/pkg/runtime.v2/framework/plugins/jobset/builder.go#L87-L89

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am checking the initContainers here https://github.com/kubeflow/training-operator/pull/2307/files#diff-935da6e0f990201db2f6ddf15c768526f70993d5a2408814013e96e3fedd5ebfR165. The condition here is only to check presence to initializer job if input modelconfig or dataset config is present in the trainJob

gomega.Expect(k8sClient.DeleteAllOf(ctx, &kubeflowv2.TrainJob{}, client.InNamespace(ns.Name))).To(gomega.Succeed())
})

ginkgo.When("Creating TrainJob", func() {
Copy link
Member

@andreyvelich andreyvelich Nov 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tenzen-y @akshaychitneni What is right way to test our validations with integration or unit tests ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think integration tests might be helpful in this case as functioning of trainjob webhook relies on dependent objects like trainingRuntime

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both are useful. Basically, we add UTs for all testing cases including all edge case to UTs so that we can easily identify root cause under any problems. The integration tests have objectives to verify if the entire webhook mechanism works correct.

if we rely only on integration (or E2E) tests, it's challenging to identify the root cause and debug.

@akshaychitneni akshaychitneni force-pushed the webhookv2 branch 2 times, most recently from 241c4f1 to cb8c6c3 Compare November 11, 2024 18:37
Copy link
Member

@tenzen-y tenzen-y left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left very initial comments. I will revisit here after UTs are implemented.

Comment on lines 29 to 31
// JobExporter is the Job name for the exporter.
JobExporter string = "exporter"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

@@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"k8s.io/utils/ptr"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to second group.

Comment on lines 12 to 17
func RuntimeRefToGroupKind(runtimeRef kubeflowv2.RuntimeRef) schema.GroupKind {
return schema.GroupKind{
Group: ptr.Deref(runtimeRef.APIGroup, ""),
Kind: ptr.Deref(runtimeRef.Kind, ""),
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines 12 to 17
func RuntimeRefToGroupKind(runtimeRef kubeflowv2.RuntimeRef) schema.GroupKind {
return schema.GroupKind{
Group: ptr.Deref(runtimeRef.APIGroup, ""),
Kind: ptr.Deref(runtimeRef.Kind, ""),
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func RuntimeRefToGroupKind(runtimeRef kubeflowv2.RuntimeRef) schema.GroupKind {
return schema.GroupKind{
Group: ptr.Deref(runtimeRef.APIGroup, ""),
Kind: ptr.Deref(runtimeRef.Kind, ""),
}
}
func RuntimeRefToRuntimeRegistryKey(runtimeRef kubeflowv2.RuntimeRef) string {
return schema.GroupKind{
Group: ptr.Deref(runtimeRef.APIGroup, ""),
Kind: ptr.Deref(runtimeRef.Kind, ""),
}.String()
}

Additionally, could we make more specific helper since this objective is for runtime registry?

numProcPerNodePath := specPath.Child("trainer").Child("numProcPerNode")
if runtimeInfo.RuntimePolicy.MLPolicy.Torch != nil && newObj.Spec.Trainer.NumProcPerNode != nil {
allowedStringValList := []string{"auto", "cpu", "gpu"}
numProcPerNode := *newObj.Spec.Trainer.NumProcPerNode
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this already reloved, right? In that case, let's rely on the CEL validation.

Comment on lines 4 to 7
"errors"
kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/utils/ptr"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"errors"
kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/utils/ptr"
"errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/utils/ptr"
trainer "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add unit testings?

jobSetTemplate := jobsetv1alpha2.JobSet{
Spec: trainingRuntime.Spec.Template.Spec,
}
return r.framework.RunCustomValidationPlugins(jobSetTemplate.DeepCopy(), info, old, new)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm prefer current approach.
Ideally, combined all information to runtimeInfo, then use that each plugins.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add UTs?

gomega.Expect(k8sClient.DeleteAllOf(ctx, &kubeflowv2.TrainJob{}, client.InNamespace(ns.Name))).To(gomega.Succeed())
})

ginkgo.When("Creating TrainJob", func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both are useful. Basically, we add UTs for all testing cases including all edge case to UTs so that we can easily identify root cause under any problems. The integration tests have objectives to verify if the entire webhook mechanism works correct.

if we rely only on integration (or E2E) tests, it's challenging to identify the root cause and debug.

Copy link

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign andreyvelich for approval. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

fixing runtime

Signed-off-by: Akshay Chitneni <[email protected]>
Copy link
Member

@andreyvelich andreyvelich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @akshaychitneni!
I left a few comments.
@kubeflow/wg-training-leads @Electronic-Waste @saileshd1402 @seanlaii please help with review.

@@ -140,3 +137,31 @@ func (t *Torch) EnforceMLPolicy(info *runtime.Info, trainJob *trainer.TrainJob)

return nil
}

func (t *Torch) Validate(runtimeJobTemplate client.Object, runtimeInfo *runtime.Info, oldObj, newObj *trainer.TrainJob) (admission.Warnings, field.ErrorList) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akshaychitneni Please can you keep this Validate() function at the top of torch.go file for consistency with other plugins (e.g. MPI:

func (m *MPI) Validate(runtimeJobTemplate client.Object, runtimeInfo *runtime.Info, oldJobObj, newJobObj *trainer.TrainJob) (admission.Warnings, field.ErrorList) {
var allErrs field.ErrorList
specPath := field.NewPath("spec")
if newJobObj.Spec.Trainer != nil {
numProcPerNodePath := specPath.Child("trainer").Child("numProcPerNode")
if runtimeInfo.RuntimePolicy.MLPolicy != nil && runtimeInfo.RuntimePolicy.MLPolicy.MPI != nil {
numProcPerNode := *newJobObj.Spec.Trainer.NumProcPerNode
if numProcPerNode.Type != intstr.Int {
allErrs = append(allErrs, field.Invalid(numProcPerNodePath, newJobObj.Spec.Trainer.NumProcPerNode, "should have an int value"))
}
}
}
return nil, allErrs
}
)?

@@ -159,3 +164,52 @@ func (j *JobSet) TerminalCondition(ctx context.Context, trainJob *trainer.TrainJ
}
return nil, nil
}

func (j *JobSet) Validate(runtimeJobTemplate client.Object, runtimeInfo *runtime.Info, oldObj, newObj *trainer.TrainJob) (admission.Warnings, field.ErrorList) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question, can we move it after Name() API.

Comment on lines +75 to +77
// TorchEnvNamePrefix is the env name prefix for the distributed envs for torchrun.
TorchEnvNamePrefix = "PET_"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this constant to the L60, and re-use it for other torchrun envs:

// TorchEnvNumNodes is the env name for the number of training nodes.

@@ -111,6 +114,8 @@ const (
// Distributed envs for mpirun.
// Values for OpenMPI implementation.
OpenMPIEnvHostFileLocation string = "OMPI_MCA_orte_default_hostfile"

UnsupportedRuntimeErrMsg string = "the specified runtime is not supported"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tenzen-y @akshaychitneni Do we want to keep this error in the trainjob_controller.go or constants ?

@@ -83,10 +81,10 @@ func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}

runtimeRefGK := runtimeRefToGroupKind(trainJob.Spec.RuntimeRef).String()
runtimeRefGK := jobruntimes.RuntimeRefToRuntimeRegistryKey(trainJob.Spec.RuntimeRef)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call this function as:

Suggested change
runtimeRefGK := jobruntimes.RuntimeRefToRuntimeRegistryKey(trainJob.Spec.RuntimeRef)
runtimeRefGK := jobruntimes.RuntimeRefToGroupKind(trainJob.Spec.RuntimeRef)

return r.framework.RunComponentBuilderPlugins(ctx, jobSetTemplate.DeepCopy(), info, trainJob)
}

func (r *TrainingRuntime) runtimeInfo(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Back to my point here: #2307 (comment)
Do we really need to generate Info object when we perform validation ?
The validation of TrainingRuntime executes before TrainJob is created, so we don't really need to construct Info object from TrainJob + TrainingRuntime for the TrainJob validation.

@akshaychitneni @tenzen-y Am I missing something ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see that we use it here, since we need to fetch data from the TrainJob and ClusterTrainingRuntime to define what type of TrainJob validation we need to perform

if runtimeInfo.RuntimePolicy.MLPolicy != nil && runtimeInfo.RuntimePolicy.MLPolicy.MPI != nil {
numProcPerNode := *newJobObj.Spec.Trainer.NumProcPerNode
if numProcPerNode.Type != intstr.Int {
allErrs = append(allErrs, field.Invalid(numProcPerNodePath, newJobObj.Spec.Trainer.NumProcPerNode, "should have an int value"))
}

@tenzen-y Is that something that you had in mind when you designed the Runtime Framework ?

if !ok {
return nil, fmt.Errorf("%s: %s", constants.UnsupportedRuntimeErrMsg, runtimeRefGK)
}
warnings, errorList := runtime.ValidateObjects(ctx, oldTrainJob, newTrainJob)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we use old TrainJob object in the ValidateObjects ?

func (m *MPI) Validate(oldObj, newObj *trainer.TrainJob) (admission.Warnings, field.ErrorList) {
return nil, nil
func (m *MPI) Validate(runtimeJobTemplate client.Object, runtimeInfo *runtime.Info, oldJobObj, newJobObj *trainer.TrainJob) (admission.Warnings, field.ErrorList) {
var allErrs field.ErrorList
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to be consistent with Enforce API, and just exit this validation if validation is not required:

Suggested change
var allErrs field.ErrorList
var allErrs field.ErrorList
if info == nil || info.RuntimePolicy.MLPolicy == nil || info.RuntimePolicy.MLPolicy.MPI == nil {
return nil, allErrs
}

}
}

if slices.ContainsFunc(newObj.Spec.Trainer.Env, func(x corev1.EnvVar) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this TODO:

// TODO (andreyvelich): Add validation to check that TrainJob doesn't have "PET_" envs.

Comment on lines +179 to +184
if newObj.Spec.ModelConfig != nil && newObj.Spec.ModelConfig.Input != nil {
if !slices.ContainsFunc(jobSet.Spec.ReplicatedJobs, func(x jobsetv1alpha2.ReplicatedJob) bool {
return x.Name == constants.JobInitializer
}) {
allErrs = append(allErrs, field.Invalid(runtimeRefPath, newObj.Spec.RuntimeRef, fmt.Sprintf("trainingRuntime should have %s job when trainJob is configured with input modelConfig", constants.JobInitializer)))
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, we can simplify this since if user sets DatasetConfig or ModelConfig we need to check that ReplicatedJob contains Initializer Job.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants