-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement priority queueing in the resource syncer #1052
Conversation
🤖 Created branch: z_pr1052/tpantelis/priority_queue |
3a79f6a
to
9e218c4
Compare
pkg/workqueue/queue.go
Outdated
}), | ||
name: name, | ||
} | ||
} | ||
|
||
func (q *queueType) Enqueue(obj interface{}) { | ||
func (q *queueType) enqueue(obj interface{}, priority int) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could the priority use a specific type? It would make it more obvious that this expects one of the two constants, and not an arbitrary priority. (This isn’t all that important since the method is private.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could but like you said it's private. I only defined and exposed two priorities (via specific functions) at least for now. Perhaps we'll have a future need for users to specify arbitrary priorities in which case a specific type wouldn't make sense. I figure we can deal with that if/when. For now I kept it simple with "low" and "normal".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another approach is to move the low and normal priority concept internally to the resource syncer and have the work queue take an arbitrary priority, ie change EnqueueLowPriority
to EnqueueWithPriority(...int priority)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the method to EnqueueWithOpts
and also added a RateLimiting
flag (to mirror AddWithOpts
in the new controller-runtime PriorityQueue). Looking at the K8s controller-runtime code, they do not enqueue items from the initial listing on startup with rate limiting which makes sense b/c it already knows it's a one-time burst. I did the same in the resource syncer.
9e218c4
to
2503df7
Compare
...for use with the "container/heap" module that implements heap.Interface. The items in the queue are ordered by descending priority such that items with higher priority values are de-queued first. Signed-off-by: Tom Pantelis <[email protected]>
Signed-off-by: Tom Pantelis <[email protected]>
Signed-off-by: Tom Pantelis <[email protected]>
eca993c
to
a54ed39
Compare
submariner-io/admiral#1052 Signed-off-by: Tom Pantelis <[email protected]>
submariner-io/admiral#1052 Signed-off-by: Tom Pantelis <[email protected]>
...by configuring backend Queue for the client-go rate-limiting queue. The work queue exposes two priorities for enqueue: normal priority via the existing Enqueue method an arbitrary priority via a new EnqueueWithOpts method. The new method also takes a 'RateLimited' flag indicating to enqueue with or without rate limiting. Signed-off-by: Tom Pantelis <[email protected]>
Utilize the workqueue's EnqueueWithOpts method to prioritize newly created/updated resources over pre-existing resources on startup. The cache.ResourceEventHandler's OnAdd method has an 'isInInitialList' param that indicates if the obj is pre-existing in the initial listing retrieved from the API server. If 'isInInitialList' is true then enqueue with low priority otherwise use normal priority (Enqueue method). The underlying priority queue might adjust the ordering of items with the same priority so the ordering of items enqueued might not be the same order that they're dequeued. As such, the namespace event handling needed to be modified to use the 'operationQueues' to maintain the ordering of adds and deletes. Signed-off-by: Tom Pantelis <[email protected]>
On an informer re-sync, all resources are re-notified as update events. Most likely the resources didn't actually change so enqueue with low priority if the ResourceVersion didn't change. Signed-off-by: Tom Pantelis <[email protected]>
a54ed39
to
f974aca
Compare
|
||
func ConfigFromConfigMap(configMap *corev1.ConfigMap, keyPrefix string, defaultConfig *Config) *Config { | ||
if configMap == nil { | ||
return defaultConfig |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really minor, but perhaps
return defaultConfig | |
return DefaultConfigIfNil(defaultConfig) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't do that b/c I assumed the caller probably wants nil returned if they passed nil defaultConfig
.
🤖 Closed branches: [z_pr1052/tpantelis/priority_queue] |
submariner-io/admiral#1052 Signed-off-by: Tom Pantelis <[email protected]>
All pre-existing resources are queued by the informer on startup. With many resources in the hundreds or thousands it can take significant time to process them all and the work queue rate limiter can cause further significant delays. This can block newly created resources or existing resource updates for potentially minutes. Most of the time a pod restart is fairly quick so most resources didn't likely change so it makes sense to give newly created/updated resources higher priority and process them in a more timely manner.
The client-go work queue allows the backend
Queue
implementation to be configurable. This PR adds an implementation to the admiralworkqueue
module that is backed by a priority queue that is used in conjunction with thecontainer/heap
module and implementsheap.Interface
. A newEnqueueWithPriority
method was added to the admiralworkqueue.Interface
that allows the caller to specify the priority and whether or not to enqueue with rate limiting. The existingEnqueue
method was modified to enqueue with normal priority 0.The resource syncer was modified to use the
EnqueueWithPriority
method to prioritize newly created/updated resources over pre-existing resources on startup. Thecache.ResourceEventHandler'
sOnAdd
method has anisInInitialList
param that indicates if the obj was pre-existing in the initial listing retrieved from the API server. IfisInInitialList
is true then enqueue with low priority otherwise use normal priority (Enqueue
method).See commits for more details.
Related to submariner-io/lighthouse#1706