Skip to content

Commit

Permalink
add support for try and undo allocate
Browse files Browse the repository at this point in the history
  • Loading branch information
atantawi committed Jul 25, 2023
1 parent a4d16e1 commit 8c8ee6b
Show file tree
Hide file tree
Showing 8 changed files with 659 additions and 32 deletions.
5 changes: 5 additions & 0 deletions pkg/quotaplugins/quota-forest/quota-manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ A summary of the API interface to the Quota Manager follows.
- Forest Updates
- refresh (effect) updates from caches
- `UpdateForest(forestName)`
- Undo consumer allocation: Two calls are provided to try to allocate a consumer, and if unaccepted, to undo the effect of the allocation trial. If the trial is accepted, no further action is needed. Otherwise, the undo has to be called right after the try allocation, without making any calls to change the trees or allocate/deallocate consumers. These operations are intended only during Normal mode.
- `TryAllocate(treeName, consumerID)`
- `UndoAllocate(treeName, consumerID)`
- `TryAllocateForest(forestName, consumerID)`
- `UndoAllocateForest(forestName, consumerID)`

Examples of using the Quota Manager in the case of a [single tree](demos/manager/tree/demo.go) and a [forest](demos/manager/forest/demo.go) are provided.

Expand Down
122 changes: 122 additions & 0 deletions pkg/quotaplugins/quota-forest/quota-manager/demos/undo/forest/demo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
Copyright 2023 The Multi-Cluster App Dispatcher Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main

import (
"flag"
"fmt"
"os"

"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/quota-forest/quota-manager/quota"
"k8s.io/klog/v2"
)

func main() {
klog.InitFlags(nil)
flag.Set("v", "4")
flag.Set("skip_headers", "true")
klog.SetOutput(os.Stdout)
flag.Parse()
defer klog.Flush()

fmt.Println("Demo of allocation and de-allocation of consumers on a forest using the quota manager.")
fmt.Println()
prefix := "../../../samples/forest/"
indent := "===> "
forestName := "Context-Service"
treeNames := []string{"ContextTree", "ServiceTree"}

// create a quota manager
fmt.Println(indent + "Creating quota manager ... " + "\n")
quotaManager := quota.NewManager()
quotaManager.SetMode(quota.Normal)
fmt.Println(quotaManager.GetModeString())
fmt.Println()

// create multiple trees
fmt.Println(indent + "Creating multiple trees ..." + "\n")
for _, treeName := range treeNames {
fName := prefix + treeName + ".json"
fmt.Printf("Tree file name: %s\n", fName)
jsonTree, err := os.ReadFile(fName)
if err != nil {
fmt.Printf("error reading quota tree file: %s", fName)
return
}
_, err = quotaManager.AddTreeFromString(string(jsonTree))
if err != nil {
fmt.Printf("error adding tree %s: %v", treeName, err)
return
}
}

// create forest
fmt.Println(indent + "Creating forest " + forestName + " ..." + "\n")
quotaManager.AddForest(forestName)
for _, treeName := range treeNames {
quotaManager.AddTreeToForest(forestName, treeName)
}
fmt.Println(quotaManager)

// create consumer jobs
fmt.Println(indent + "Allocating consumers on forest ..." + "\n")
jobs := []string{"job1", "job2", "job3", "job4", "job5"}
for _, job := range jobs {

// create consumer info
fName := prefix + job + ".json"
fmt.Printf("Consumer file name: %s\n", fName)
consumerInfo, err := quota.NewConsumerInfoFromFile(fName)
if err != nil {
fmt.Printf("error reading consumer file: %s \n", fName)
continue
}
consumerID := consumerInfo.GetID()

// add consumer info to quota manager
quotaManager.AddConsumer(consumerInfo)

// allocate forest consumer instance of the consumer info
if job == "job4" {
_, err = quotaManager.TryAllocateForest(forestName, consumerID)
if err != nil {
fmt.Printf("error allocating consumer: %v \n", err)
}
err = quotaManager.UndoAllocateForest(forestName, "job-4")
if err != nil {
fmt.Printf("error undoing allocation consumer: %v \n", err)
}
_, err = quotaManager.AllocateForest(forestName, consumerID)
} else {
_, err = quotaManager.AllocateForest(forestName, consumerID)
}

if err != nil {
fmt.Printf("error allocating consumer: %v \n", err)
quotaManager.RemoveConsumer((consumerID))
continue
}
}

// de-allocate consumers from forest
fmt.Println(indent + "De-allocating consumers from forest ..." + "\n")
for _, id := range quotaManager.GetAllConsumerIDs() {
quotaManager.DeAllocateForest(forestName, id)
quotaManager.RemoveConsumer(id)
}
fmt.Println()
fmt.Println(quotaManager)
}
124 changes: 124 additions & 0 deletions pkg/quotaplugins/quota-forest/quota-manager/demos/undo/tree/demo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
Copyright 2023 The Multi-Cluster App Dispatcher Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main

import (
"flag"
"fmt"
"os"

"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/quota-forest/quota-manager/quota"
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/quota-forest/quota-manager/quota/core"
klog "k8s.io/klog/v2"
)

func main() {
klog.InitFlags(nil)
flag.Set("v", "4")
flag.Set("skip_headers", "true")
klog.SetOutput(os.Stdout)
flag.Parse()
defer klog.Flush()

prefix := "../../../samples/tree/"
treeFileName := prefix + "tree.json"
caFileName := prefix + "ca.json"
cbFileName := prefix + "cb.json"
ccFileName := prefix + "cc.json"
cdFileName := prefix + "cd.json"
ceFileName := prefix + "ce.json"

// create a quota manager
fmt.Println("==> Creating Quota Manager")
fmt.Println("**************************")
quotaManager := quota.NewManager()
treeJsonString, err := os.ReadFile(treeFileName)
if err != nil {
fmt.Printf("error reading quota tree file: %s", treeFileName)
return
}
quotaManager.SetMode(quota.Normal)

// add a quota tree from file
treeName, err := quotaManager.AddTreeFromString(string(treeJsonString))
if err != nil {
fmt.Printf("error adding tree %s: %v", treeName, err)
return
}

// allocate consumers
allocate(quotaManager, treeName, caFileName, false)
allocate(quotaManager, treeName, cbFileName, false)
allocate(quotaManager, treeName, ccFileName, false)

// try and undo allocation
allocate(quotaManager, treeName, cdFileName, true)
undoAllocate(quotaManager, treeName, cdFileName)

// allocate consumers
allocate(quotaManager, treeName, ceFileName, false)
}

// allocate consumer from file
func allocate(quotaManager *quota.Manager, treeName string, consumerFileName string, try bool) {
consumerInfo := getConsumerInfo(consumerFileName)
if consumerInfo == nil {
fmt.Printf("error reading consumer file: %s", consumerFileName)
return
}
consumerID := consumerInfo.GetID()
fmt.Println("==> Allocating consumer " + consumerID)
fmt.Println("**************************")
quotaManager.AddConsumer(consumerInfo)

var allocResponse *core.AllocationResponse
var err error
if try {
allocResponse, err = quotaManager.TryAllocate(treeName, consumerID)
} else {
allocResponse, err = quotaManager.Allocate(treeName, consumerID)
}
if err != nil {
fmt.Printf("error allocating consumer: %v", err)
return
}
fmt.Println(allocResponse)
fmt.Println(quotaManager)
}

// undo most recent consumer allocation
func undoAllocate(quotaManager *quota.Manager, treeName string, consumerFileName string) {
consumerInfo := getConsumerInfo(consumerFileName)
if consumerInfo == nil {
fmt.Printf("error reading consumer file: %s", consumerFileName)
return
}
consumerID := consumerInfo.GetID()
fmt.Println("==> Undo allocating consumer " + consumerID)
fmt.Println("**************************")
quotaManager.UndoAllocate(treeName, consumerID)
fmt.Println(quotaManager)
}

// get consumer info from yaml file
func getConsumerInfo(consumerFileName string) *quota.ConsumerInfo {
consumerInfo, err := quota.NewConsumerInfoFromFile(consumerFileName)
if err != nil {
fmt.Printf("error reading consumer file: %s", consumerFileName)
return nil
}
return consumerInfo
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -144,7 +144,7 @@ func (fc *ForestController) Allocate(forestConsumer *ForestConsumer) *Allocation
} else if len(groupID) == 0 {
fmt.Fprintf(&msg, "No quota designations provided for '%s'", treeName)
} else {
fmt.Fprintf(&msg, "Explected %d resources for quota designations '%s', received %d",
fmt.Fprintf(&msg, "Expected %d resources for quota designations '%s', received %d",
controller.GetQuotaSize(), treeName, allocRequested.GetSize())
}
return fc.failureRecover(consumerID, processedTrees, deletedConsumers, msg.String())
Expand Down Expand Up @@ -187,6 +187,7 @@ func (fc *ForestController) Allocate(forestConsumer *ForestConsumer) *Allocation
/*
* allocation failed - undo deletions of prior preempted consumers and recover
*/
// TODO: make use of forest snapshot to recover
for _, c := range treeDeletedConsumers {
controller.Allocate(c)
}
Expand Down Expand Up @@ -245,6 +246,46 @@ func (fc *ForestController) failureRecover(consumerID string, processedTrees []s
return failedResponse
}

// TryAllocate : try allocating a consumer by taking a snapshot before attempting allocation
func (fc *ForestController) TryAllocate(forestConsumer *ForestConsumer) *AllocationResponse {
consumerID := forestConsumer.GetID()
consumers := forestConsumer.GetConsumers()
allocResponse := NewAllocationResponse(consumerID)

// take a snapshot of the forest
for treeName, consumer := range consumers {
var msg bytes.Buffer
controller := fc.controllers[treeName]
controller.treeSnapshot = NewTreeSnapshot(controller.tree, consumer)
// TODO: limit the number of potentially affected consumers by the allocation
if !controller.treeSnapshot.Take(controller, controller.consumers) {
fmt.Fprintf(&msg, "Failed to take a state snapshot of tree '%s'", controller.GetTreeName())
treeAllocResponse := NewAllocationResponse(consumer.GetID())
preemptedIds := make([]string, 0)
treeAllocResponse.Append(false, msg.String(), &preemptedIds)
allocResponse.Merge(treeAllocResponse)
return allocResponse
}
}

ar := fc.Allocate(forestConsumer)
allocResponse.Merge(ar)
return allocResponse
}

// UndoAllocate : undo the most recent allocation trial
func (fc *ForestController) UndoAllocate(forestConsumer *ForestConsumer) bool {
klog.V(4).Infof("Multi-quota undo allocation of consumer: %s\n", forestConsumer.GetID())
consumers := forestConsumer.GetConsumers()
success := true
for treeName, consumer := range consumers {
controller := fc.controllers[treeName]
treeSuccess := controller.UndoAllocate(consumer)
success = success && treeSuccess
}
return success
}

// ForceAllocate : force allocate a consumer on a given set of nodes on trees;
// no recovery if not allocated on some trees; partial allocation allowed
func (fc *ForestController) ForceAllocate(forestConsumer *ForestConsumer, nodeIDs map[string]string) *AllocationResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -218,11 +218,21 @@ func (qn *QuotaNode) GetAllocated() *Allocation {
return qn.allocated
}

// SetAllocated :
func (qn *QuotaNode) SetAllocated(alloc *Allocation) {
qn.allocated = alloc
}

// GetConsumers :
func (qn *QuotaNode) GetConsumers() []*Consumer {
return qn.consumers
}

// SetConsumers :
func (qn *QuotaNode) SetConsumers(consumers []*Consumer) {
qn.consumers = consumers
}

// String : print node with a specified level of indentation
func (qn *QuotaNode) String(level int) string {
var b bytes.Buffer
Expand Down
Loading

0 comments on commit 8c8ee6b

Please sign in to comment.