diff --git a/pkg/quotaplugins/quota-forest/quota-manager/README.md b/pkg/quotaplugins/quota-forest/quota-manager/README.md index e9f03ae73..a35a0c190 100644 --- a/pkg/quotaplugins/quota-forest/quota-manager/README.md +++ b/pkg/quotaplugins/quota-forest/quota-manager/README.md @@ -188,6 +188,11 @@ A summary of the API interface to the Quota Manager follows. - Forest Updates - refresh (effect) updates from caches - `UpdateForest(forestName)` +- Undo consumer allocation: Two calls are provided to try to allocate a consumer, and if unaccepted, to undo the effect of the allocation trial. If the trial is accepted, no further action is needed. Otherwise, the undo has to be called right after the try allocation, without making any calls to change the trees or allocate/deallocate consumers. These operations are intended only during Normal mode. + - `TryAllocate(treeName, consumerID)` + - `UndoAllocate(treeName, consumerID)` + - `TryAllocateForest(forestName, consumerID)` + - `UndoAllocateForest(forestName, consumerID)` Examples of using the Quota Manager in the case of a [single tree](demos/manager/tree/demo.go) and a [forest](demos/manager/forest/demo.go) are provided. diff --git a/pkg/quotaplugins/quota-forest/quota-manager/demos/undo/forest/demo.go b/pkg/quotaplugins/quota-forest/quota-manager/demos/undo/forest/demo.go new file mode 100644 index 000000000..46438bb5c --- /dev/null +++ b/pkg/quotaplugins/quota-forest/quota-manager/demos/undo/forest/demo.go @@ -0,0 +1,122 @@ +/* +Copyright 2023 The Multi-Cluster App Dispatcher Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/quota-forest/quota-manager/quota" + "k8s.io/klog/v2" +) + +func main() { + klog.InitFlags(nil) + flag.Set("v", "4") + flag.Set("skip_headers", "true") + klog.SetOutput(os.Stdout) + flag.Parse() + defer klog.Flush() + + fmt.Println("Demo of allocation and de-allocation of consumers on a forest using the quota manager.") + fmt.Println() + prefix := "../../../samples/forest/" + indent := "===> " + forestName := "Context-Service" + treeNames := []string{"ContextTree", "ServiceTree"} + + // create a quota manager + fmt.Println(indent + "Creating quota manager ... " + "\n") + quotaManager := quota.NewManager() + quotaManager.SetMode(quota.Normal) + fmt.Println(quotaManager.GetModeString()) + fmt.Println() + + // create multiple trees + fmt.Println(indent + "Creating multiple trees ..." + "\n") + for _, treeName := range treeNames { + fName := prefix + treeName + ".json" + fmt.Printf("Tree file name: %s\n", fName) + jsonTree, err := os.ReadFile(fName) + if err != nil { + fmt.Printf("error reading quota tree file: %s", fName) + return + } + _, err = quotaManager.AddTreeFromString(string(jsonTree)) + if err != nil { + fmt.Printf("error adding tree %s: %v", treeName, err) + return + } + } + + // create forest + fmt.Println(indent + "Creating forest " + forestName + " ..." + "\n") + quotaManager.AddForest(forestName) + for _, treeName := range treeNames { + quotaManager.AddTreeToForest(forestName, treeName) + } + fmt.Println(quotaManager) + + // create consumer jobs + fmt.Println(indent + "Allocating consumers on forest ..." + "\n") + jobs := []string{"job1", "job2", "job3", "job4", "job5"} + for _, job := range jobs { + + // create consumer info + fName := prefix + job + ".json" + fmt.Printf("Consumer file name: %s\n", fName) + consumerInfo, err := quota.NewConsumerInfoFromFile(fName) + if err != nil { + fmt.Printf("error reading consumer file: %s \n", fName) + continue + } + consumerID := consumerInfo.GetID() + + // add consumer info to quota manager + quotaManager.AddConsumer(consumerInfo) + + // allocate forest consumer instance of the consumer info + if job == "job4" { + _, err = quotaManager.TryAllocateForest(forestName, consumerID) + if err != nil { + fmt.Printf("error allocating consumer: %v \n", err) + } + err = quotaManager.UndoAllocateForest(forestName, "job-4") + if err != nil { + fmt.Printf("error undoing allocation consumer: %v \n", err) + } + _, err = quotaManager.AllocateForest(forestName, consumerID) + } else { + _, err = quotaManager.AllocateForest(forestName, consumerID) + } + + if err != nil { + fmt.Printf("error allocating consumer: %v \n", err) + quotaManager.RemoveConsumer((consumerID)) + continue + } + } + + // de-allocate consumers from forest + fmt.Println(indent + "De-allocating consumers from forest ..." + "\n") + for _, id := range quotaManager.GetAllConsumerIDs() { + quotaManager.DeAllocateForest(forestName, id) + quotaManager.RemoveConsumer(id) + } + fmt.Println() + fmt.Println(quotaManager) +} diff --git a/pkg/quotaplugins/quota-forest/quota-manager/demos/undo/tree/demo.go b/pkg/quotaplugins/quota-forest/quota-manager/demos/undo/tree/demo.go new file mode 100644 index 000000000..ef16b8751 --- /dev/null +++ b/pkg/quotaplugins/quota-forest/quota-manager/demos/undo/tree/demo.go @@ -0,0 +1,124 @@ +/* +Copyright 2023 The Multi-Cluster App Dispatcher Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/quota-forest/quota-manager/quota" + "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/quota-forest/quota-manager/quota/core" + klog "k8s.io/klog/v2" +) + +func main() { + klog.InitFlags(nil) + flag.Set("v", "4") + flag.Set("skip_headers", "true") + klog.SetOutput(os.Stdout) + flag.Parse() + defer klog.Flush() + + prefix := "../../../samples/tree/" + treeFileName := prefix + "tree.json" + caFileName := prefix + "ca.json" + cbFileName := prefix + "cb.json" + ccFileName := prefix + "cc.json" + cdFileName := prefix + "cd.json" + ceFileName := prefix + "ce.json" + + // create a quota manager + fmt.Println("==> Creating Quota Manager") + fmt.Println("**************************") + quotaManager := quota.NewManager() + treeJsonString, err := os.ReadFile(treeFileName) + if err != nil { + fmt.Printf("error reading quota tree file: %s", treeFileName) + return + } + quotaManager.SetMode(quota.Normal) + + // add a quota tree from file + treeName, err := quotaManager.AddTreeFromString(string(treeJsonString)) + if err != nil { + fmt.Printf("error adding tree %s: %v", treeName, err) + return + } + + // allocate consumers + allocate(quotaManager, treeName, caFileName, false) + allocate(quotaManager, treeName, cbFileName, false) + allocate(quotaManager, treeName, ccFileName, false) + + // try and undo allocation + allocate(quotaManager, treeName, cdFileName, true) + undoAllocate(quotaManager, treeName, cdFileName) + + // allocate consumers + allocate(quotaManager, treeName, ceFileName, false) +} + +// allocate consumer from file +func allocate(quotaManager *quota.Manager, treeName string, consumerFileName string, try bool) { + consumerInfo := getConsumerInfo(consumerFileName) + if consumerInfo == nil { + fmt.Printf("error reading consumer file: %s", consumerFileName) + return + } + consumerID := consumerInfo.GetID() + fmt.Println("==> Allocating consumer " + consumerID) + fmt.Println("**************************") + quotaManager.AddConsumer(consumerInfo) + + var allocResponse *core.AllocationResponse + var err error + if try { + allocResponse, err = quotaManager.TryAllocate(treeName, consumerID) + } else { + allocResponse, err = quotaManager.Allocate(treeName, consumerID) + } + if err != nil { + fmt.Printf("error allocating consumer: %v", err) + return + } + fmt.Println(allocResponse) + fmt.Println(quotaManager) +} + +// undo most recent consumer allocation +func undoAllocate(quotaManager *quota.Manager, treeName string, consumerFileName string) { + consumerInfo := getConsumerInfo(consumerFileName) + if consumerInfo == nil { + fmt.Printf("error reading consumer file: %s", consumerFileName) + return + } + consumerID := consumerInfo.GetID() + fmt.Println("==> Undo allocating consumer " + consumerID) + fmt.Println("**************************") + quotaManager.UndoAllocate(treeName, consumerID) + fmt.Println(quotaManager) +} + +// get consumer info from yaml file +func getConsumerInfo(consumerFileName string) *quota.ConsumerInfo { + consumerInfo, err := quota.NewConsumerInfoFromFile(consumerFileName) + if err != nil { + fmt.Printf("error reading consumer file: %s", consumerFileName) + return nil + } + return consumerInfo +} diff --git a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/forestcontroller.go b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/forestcontroller.go index 134ec8fbd..0540fb608 100644 --- a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/forestcontroller.go +++ b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/forestcontroller.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -144,7 +144,7 @@ func (fc *ForestController) Allocate(forestConsumer *ForestConsumer) *Allocation } else if len(groupID) == 0 { fmt.Fprintf(&msg, "No quota designations provided for '%s'", treeName) } else { - fmt.Fprintf(&msg, "Explected %d resources for quota designations '%s', received %d", + fmt.Fprintf(&msg, "Expected %d resources for quota designations '%s', received %d", controller.GetQuotaSize(), treeName, allocRequested.GetSize()) } return fc.failureRecover(consumerID, processedTrees, deletedConsumers, msg.String()) @@ -187,6 +187,7 @@ func (fc *ForestController) Allocate(forestConsumer *ForestConsumer) *Allocation /* * allocation failed - undo deletions of prior preempted consumers and recover */ + // TODO: make use of forest snapshot to recover for _, c := range treeDeletedConsumers { controller.Allocate(c) } @@ -245,6 +246,46 @@ func (fc *ForestController) failureRecover(consumerID string, processedTrees []s return failedResponse } +// TryAllocate : try allocating a consumer by taking a snapshot before attempting allocation +func (fc *ForestController) TryAllocate(forestConsumer *ForestConsumer) *AllocationResponse { + consumerID := forestConsumer.GetID() + consumers := forestConsumer.GetConsumers() + allocResponse := NewAllocationResponse(consumerID) + + // take a snapshot of the forest + for treeName, consumer := range consumers { + var msg bytes.Buffer + controller := fc.controllers[treeName] + controller.treeSnapshot = NewTreeSnapshot(controller.tree, consumer) + // TODO: limit the number of potentially affected consumers by the allocation + if !controller.treeSnapshot.Take(controller, controller.consumers) { + fmt.Fprintf(&msg, "Failed to take a state snapshot of tree '%s'", controller.GetTreeName()) + treeAllocResponse := NewAllocationResponse(consumer.GetID()) + preemptedIds := make([]string, 0) + treeAllocResponse.Append(false, msg.String(), &preemptedIds) + allocResponse.Merge(treeAllocResponse) + return allocResponse + } + } + + ar := fc.Allocate(forestConsumer) + allocResponse.Merge(ar) + return allocResponse +} + +// UndoAllocate : undo the most recent allocation trial +func (fc *ForestController) UndoAllocate(forestConsumer *ForestConsumer) bool { + klog.V(4).Infof("Multi-quota undo allocation of consumer: %s\n", forestConsumer.GetID()) + consumers := forestConsumer.GetConsumers() + success := true + for treeName, consumer := range consumers { + controller := fc.controllers[treeName] + treeSuccess := controller.UndoAllocate(consumer) + success = success && treeSuccess + } + return success +} + // ForceAllocate : force allocate a consumer on a given set of nodes on trees; // no recovery if not allocated on some trees; partial allocation allowed func (fc *ForestController) ForceAllocate(forestConsumer *ForestConsumer, nodeIDs map[string]string) *AllocationResponse { diff --git a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/quotanode.go b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/quotanode.go index dae2b1d59..2d585ffe5 100644 --- a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/quotanode.go +++ b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/quotanode.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -218,11 +218,21 @@ func (qn *QuotaNode) GetAllocated() *Allocation { return qn.allocated } +// SetAllocated : +func (qn *QuotaNode) SetAllocated(alloc *Allocation) { + qn.allocated = alloc +} + // GetConsumers : func (qn *QuotaNode) GetConsumers() []*Consumer { return qn.consumers } +// SetConsumers : +func (qn *QuotaNode) SetConsumers(consumers []*Consumer) { + qn.consumers = consumers +} + // String : print node with a specified level of indentation func (qn *QuotaNode) String(level int) string { var b bytes.Buffer diff --git a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/treecontroller.go b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/treecontroller.go index 466a8b4dd..4bf9e7847 100644 --- a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/treecontroller.go +++ b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/treecontroller.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -35,6 +35,9 @@ type Controller struct { preemptedConsumers []string // list of preempted consumer objects due to previous allocation request preemptedConsumersArray []*Consumer + + // snapshot of tree state to undo most recent allocation + treeSnapshot *TreeSnapshot } // NewController : create a quota controller @@ -44,6 +47,7 @@ func NewController(tree *QuotaTree) *Controller { consumers: make(map[string]*Consumer), preemptedConsumers: make([]string, 0), preemptedConsumersArray: make([]*Consumer, 0), + treeSnapshot: nil, } } @@ -66,7 +70,6 @@ func (controller *Controller) Allocate(consumer *Consumer) *AllocationResponse { } } else { fmt.Fprintf(&allocationMessage, "Failed to allocate quota on quota designation '%s'", controller.GetTreeName()) - } controller.PrintState(consumer, allocated) preemptedIds := make([]string, len(controller.preemptedConsumers)) @@ -76,6 +79,32 @@ func (controller *Controller) Allocate(consumer *Consumer) *AllocationResponse { return allocResponse } +// TryAllocate : try allocating a consumer by taking a snapshot before attempting allocation +func (controller *Controller) TryAllocate(consumer *Consumer) *AllocationResponse { + controller.treeSnapshot = NewTreeSnapshot(controller.tree, consumer) + if !controller.treeSnapshot.Take(controller, nil) { + var allocationMessage bytes.Buffer + fmt.Fprintf(&allocationMessage, "Failed to take a state snapshot of tree '%s'", controller.GetTreeName()) + allocResponse := NewAllocationResponse(consumer.GetID()) + preemptedIds := make([]string, 0) + allocResponse.Append(false, allocationMessage.String(), &preemptedIds) + return allocResponse + } + return controller.Allocate(consumer) +} + +// UndoAllocate : undo the most recent allocation trial +func (controller *Controller) UndoAllocate(consumer *Consumer) bool { + success := true + defer controller.PrintState(consumer, success) + if ts := controller.treeSnapshot; ts != nil && ts.targetConsumer.GetID() == consumer.GetID() { + ts.Reinstate(controller) + } else { + success = false + } + return success +} + // ForceAllocate : force allocate a consumer on a given node func (controller *Controller) ForceAllocate(consumer *Consumer, nodeID string) *AllocationResponse { consumerID := consumer.GetID() diff --git a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/treesnapshot.go b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/treesnapshot.go new file mode 100644 index 000000000..8516f8688 --- /dev/null +++ b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/treesnapshot.go @@ -0,0 +1,197 @@ +/* +Copyright 2023 The Multi-Cluster App Dispatcher Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package core + +import ( + "unsafe" + + "k8s.io/klog/v2" +) + +// TreeSnapshot : A snapshot of a quota tree that could be used to undo a consumer allocation. +// +// A snapshop of the tree state is taken before a consumer allocation trial, including only data +// that could potentially change during allocation of the target consumer. Reinstating the snapshot +// to the tree in case it is desired to undo the allocation. +type TreeSnapshot struct { + // the target quota tree + targetTree *QuotaTree + // the target consumer of allocation + targetConsumer *Consumer + + // all consumers that could potentially change due to the allocation of the target consumer + allChangedConsumers []*Consumer + // snapshot of nodes state + nodeStates map[string]*NodeState + // snapshot of consumers state + consumerStates map[string]*ConsumerState + // consumers already allocated (running) + activeConsumers map[string]*Consumer + + // snapshot of preempted consumers + preemptedConsumers []string + // snapshot of list of preempted consumer objects + preemptedConsumersArray []*Consumer +} + +// NodeState : snapshot data about state of a node +type NodeState struct { + // the node + node *QuotaNode + // amount allocated on the node + allocated *Allocation + // list of consumers allocated on the node + consumers []*Consumer +} + +// ConsumerState : snapshot data about state of a consumer +type ConsumerState struct { + // the consumer + consumer *Consumer + // node the consumer is assigned to + aNode *QuotaNode +} + +// NewTreeSnapshot : create a new snapshot before allocating a consumer on a quota tree +func NewTreeSnapshot(tree *QuotaTree, consumer *Consumer) *TreeSnapshot { + ts := &TreeSnapshot{ + targetTree: tree, + targetConsumer: consumer, + } + ts.Reset() + ts.allChangedConsumers = append(ts.allChangedConsumers, consumer) + return ts +} + +// Take : take a snapshot +func (ts *TreeSnapshot) Take(controller *Controller, changedConsumers map[string]*Consumer) bool { + klog.V(4).Infof("Taking snapshot of tree %s prior to allocation of consumer %s \n", + ts.targetTree.GetName(), ts.targetConsumer.GetID()) + + // add potentially altered consumers + for _, c := range changedConsumers { + ts.allChangedConsumers = append(ts.allChangedConsumers, c) + } + + // make a copy of active consumers + for cid, c := range controller.consumers { + ts.activeConsumers[cid] = c + } + // make a copy of preempted consumers + ts.preemptedConsumers = make([]string, len(controller.preemptedConsumers)) + copy(ts.preemptedConsumers, controller.preemptedConsumers) + ts.preemptedConsumersArray = make([]*Consumer, len(controller.preemptedConsumersArray)) + copy(ts.preemptedConsumersArray, controller.preemptedConsumersArray) + + // copy node states for all potentially altered consumers + for _, c := range ts.allChangedConsumers { + + // copy state of consumer; skip if already visited this consumer + if taken := ts.takeConsumer(c); !taken { + continue + } + + // visit nodes along path from consumer leaf node to root + groupID := c.GetGroupID() + leafNode := ts.targetTree.GetLeafNode(groupID) + if leafNode == nil { + klog.V(4).Infof("Consumer %s member of unknown group %s \n", c.GetID(), groupID) + ts.Reset() + return false + } + path := leafNode.GetPathToRoot() + for _, n := range path { + node := (*QuotaNode)(unsafe.Pointer(n)) + // make copy of node state; skip remaining nodes along path if node already visited + if taken := ts.takeNode(node); !taken { + break + } + // make a copy of node consumers + for _, c := range node.GetConsumers() { + ts.takeConsumer(c) + } + } + } + return true +} + +// Reinstate : reinstate the snapshot +func (ts *TreeSnapshot) Reinstate(controller *Controller) { + // reinstate consumers state + for _, cs := range ts.consumerStates { + if c := cs.consumer; c != nil { + c.SetNode(cs.aNode) + } + } + + // reinstate nodes state + for _, ns := range ts.nodeStates { + if n := ns.node; n != nil { + n.SetAllocated(ns.allocated) + n.SetConsumers(ns.consumers) + } + } + + // reinstate preempted and active consumers + controller.consumers = ts.activeConsumers + controller.preemptedConsumers = ts.preemptedConsumers + controller.preemptedConsumersArray = ts.preemptedConsumersArray + + // reset the state of the snapshot + ts.Reset() +} + +// takeNode : take state of a node; return false if already taken +func (ts *TreeSnapshot) takeNode(node *QuotaNode) bool { + nodeID := node.GetID() + if _, exists := ts.nodeStates[nodeID]; exists { + return false + } + allocated := node.GetAllocated().Clone() + consumers := make([]*Consumer, len(node.GetConsumers())) + copy(consumers, node.GetConsumers()) + ts.nodeStates[nodeID] = &NodeState{ + node: node, + allocated: allocated, + consumers: consumers, + } + return true +} + +// takeConsumer : take state of consumer; return false if already taken +func (ts *TreeSnapshot) takeConsumer(c *Consumer) bool { + consumerID := c.GetID() + if _, exists := ts.consumerStates[consumerID]; exists { + return false + } + ts.consumerStates[consumerID] = &ConsumerState{ + consumer: c, + aNode: c.GetNode(), + } + return true +} + +// Reset : reset the snapshot data +func (ts *TreeSnapshot) Reset() { + ts.allChangedConsumers = make([]*Consumer, 0) + ts.nodeStates = make(map[string]*NodeState) + ts.consumerStates = make(map[string]*ConsumerState) + ts.activeConsumers = make(map[string]*Consumer) + + ts.preemptedConsumers = make([]string, 0) + ts.preemptedConsumersArray = make([]*Consumer, 0) +} diff --git a/pkg/quotaplugins/quota-forest/quota-manager/quota/quotamanager.go b/pkg/quotaplugins/quota-forest/quota-manager/quota/quotamanager.go index 6c2492390..7a90ee7a3 100644 --- a/pkg/quotaplugins/quota-forest/quota-manager/quota/quotamanager.go +++ b/pkg/quotaplugins/quota-forest/quota-manager/quota/quotamanager.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -269,26 +269,38 @@ func (m *Manager) Allocate(treeName string, consumerID string) (response *core.A m.mutex.Lock() defer m.mutex.Unlock() - agent := m.agents[treeName] - if agent == nil { - return nil, fmt.Errorf("invalid tree name %s", treeName) + agent, consumer, err := m.preAllocate(treeName, consumerID) + if err == nil && agent.controller.IsConsumerAllocated(consumerID) { + err = fmt.Errorf("consumer %s already allocated on tree %s", consumerID, treeName) } - consumerInfo := m.consumerInfos[consumerID] - if consumerInfo == nil { - return nil, fmt.Errorf("consumer %s does not exist, create and add first", consumerID) + if err != nil { + return nil, err } - if agent.controller.IsConsumerAllocated(consumerID) { - return nil, fmt.Errorf("consumer %s already allocated on tree %s", consumerID, treeName) + if m.mode == Normal { + response = agent.controller.Allocate(consumer) + } else { + response = agent.controller.ForceAllocate(consumer, consumer.GetGroupID()) } - resourceNames := agent.cache.GetResourceNames() - consumer, err := consumerInfo.CreateTreeConsumer(treeName, resourceNames) - if err != nil { - return nil, fmt.Errorf("failure creating consumer %s on tree %s", consumerID, treeName) + if !response.IsAllocated() { + return nil, fmt.Errorf(response.GetMessage()) } + return response, err +} - klog.V(4).Infoln(consumer) +// TryAllocate : try allocating a consumer on a tree +func (m *Manager) TryAllocate(treeName string, consumerID string) (response *core.AllocationResponse, err error) { + m.mutex.Lock() + defer m.mutex.Unlock() + + agent, consumer, err := m.preAllocate(treeName, consumerID) + if err == nil && agent.controller.IsConsumerAllocated(consumerID) { + err = fmt.Errorf("consumer %s already allocated on tree %s", consumerID, treeName) + } + if err != nil { + return nil, err + } if m.mode == Normal { - response = agent.controller.Allocate(consumer) + response = agent.controller.TryAllocate(consumer) } else { response = agent.controller.ForceAllocate(consumer, consumer.GetGroupID()) } @@ -298,6 +310,40 @@ func (m *Manager) Allocate(treeName string, consumerID string) (response *core.A return response, err } +// UndoAllocate : undo the most recent allocation trial on a tree +func (m *Manager) UndoAllocate(treeName string, consumerID string) (err error) { + m.mutex.Lock() + defer m.mutex.Unlock() + + agent, consumer, err := m.preAllocate(treeName, consumerID) + if err != nil { + return err + } + if !agent.controller.UndoAllocate(consumer) { + return fmt.Errorf("failed undo allocate tree name %s", treeName) + } + return nil +} + +// preAllocate : prepare for allocate +func (m *Manager) preAllocate(treeName string, consumerID string) (agent *agent, consumer *core.Consumer, err error) { + agent = m.agents[treeName] + if agent == nil { + return nil, nil, fmt.Errorf("invalid tree name %s", treeName) + } + consumerInfo := m.consumerInfos[consumerID] + if consumerInfo == nil { + return nil, nil, fmt.Errorf("consumer %s does not exist, create and add first", consumerID) + } + resourceNames := agent.cache.GetResourceNames() + consumer, err = consumerInfo.CreateTreeConsumer(treeName, resourceNames) + if err != nil { + return nil, nil, fmt.Errorf("failure creating consumer %s on tree %s", consumerID, treeName) + } + klog.V(4).Infoln(consumer) + return agent, consumer, nil +} + // IsAllocated : check if a consumer is allocated on a tree func (m *Manager) IsAllocated(treeName string, consumerID string) bool { m.mutex.RLock() @@ -425,25 +471,44 @@ func (m *Manager) AllocateForest(forestName string, consumerID string) (response m.mutex.Lock() defer m.mutex.Unlock() - forestController := m.forests[forestName] - if forestController == nil { - return nil, fmt.Errorf("invalid forest name %s", forestName) + forestController, forestConsumer, err := m.preAllocateForest(forestName, consumerID) + if err == nil && forestController.IsConsumerAllocated(consumerID) { + err = fmt.Errorf("consumer %s already allocated on forest %s", consumerID, forestName) } - consumerInfo := m.consumerInfos[consumerID] - if consumerInfo == nil { - return nil, fmt.Errorf("consumer %s does not exist, create and add first", consumerID) + if err != nil { + return nil, err + } + + if m.mode == Normal { + response = forestController.Allocate(forestConsumer) + } else { + groupIDs := make(map[string]string) + for treeName, consumer := range forestConsumer.GetConsumers() { + groupIDs[treeName] = consumer.GetGroupID() + } + response = forestController.ForceAllocate(forestConsumer, groupIDs) } - if forestController.IsConsumerAllocated(consumerID) { - return nil, fmt.Errorf("consumer %s already allocated on forest %s", consumerID, forestName) + if !response.IsAllocated() { + return nil, fmt.Errorf(response.GetMessage()) + } + return response, nil +} + +// TryAllocateForest : allocate a consumer on a forest +func (m *Manager) TryAllocateForest(forestName string, consumerID string) (response *core.AllocationResponse, err error) { + m.mutex.Lock() + defer m.mutex.Unlock() + + forestController, forestConsumer, err := m.preAllocateForest(forestName, consumerID) + if err == nil && forestController.IsConsumerAllocated(consumerID) { + err = fmt.Errorf("consumer %s already allocated on forest %s", consumerID, forestName) } - resourceNames := forestController.GetResourceNames() - forestConsumer, err := consumerInfo.CreateForestConsumer(forestName, resourceNames) if err != nil { - return nil, fmt.Errorf("failure creating forest consumer %s in forest %s", consumerID, forestName) + return nil, err } if m.mode == Normal { - response = forestController.Allocate(forestConsumer) + response = forestController.TryAllocate(forestConsumer) } else { groupIDs := make(map[string]string) for treeName, consumer := range forestConsumer.GetConsumers() { @@ -457,6 +522,40 @@ func (m *Manager) AllocateForest(forestName string, consumerID string) (response return response, nil } +// UndoAllocate : undo the most recent allocation trial on a tree +func (m *Manager) UndoAllocateForest(forestName string, consumerID string) (err error) { + m.mutex.Lock() + defer m.mutex.Unlock() + + forestController, forestConsumer, err := m.preAllocateForest(forestName, consumerID) + if err != nil { + return err + } + if !forestController.UndoAllocate(forestConsumer) { + return fmt.Errorf("failed undo allocate forest name %s", forestName) + } + return nil +} + +// preAllocateForest : prepare for allocate forest +func (m *Manager) preAllocateForest(forestName string, consumerID string) (forestController *core.ForestController, + forestConsumer *core.ForestConsumer, err error) { + forestController = m.forests[forestName] + if forestController == nil { + return nil, nil, fmt.Errorf("invalid forest name %s", forestName) + } + consumerInfo := m.consumerInfos[consumerID] + if consumerInfo == nil { + return nil, nil, fmt.Errorf("consumer %s does not exist, create and add first", consumerID) + } + resourceNames := forestController.GetResourceNames() + forestConsumer, err = consumerInfo.CreateForestConsumer(forestName, resourceNames) + if err != nil { + return nil, nil, fmt.Errorf("failure creating forest consumer %s in forest %s", consumerID, forestName) + } + return forestController, forestConsumer, nil +} + // IsAllocatedForest : check if a consumer is allocated on a forest func (m *Manager) IsAllocatedForest(forestName string, consumerID string) bool { m.mutex.RLock()