Skip to content
This repository has been archived by the owner on Jan 13, 2021. It is now read-only.

Commit

Permalink
Use Store to Get resources
Browse files Browse the repository at this point in the history
Should reduce number of api calls significantly.
  • Loading branch information
ash2k committed Mar 24, 2017
1 parent d103032 commit 35c8711
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 61 deletions.
9 changes: 8 additions & 1 deletion pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api"
extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -55,13 +56,17 @@ func (a *App) Run(ctx context.Context) error {
deploymentInf := informerFactory.Extensions().V1beta1().Deployments().Informer()
ingressInf := informerFactory.Extensions().V1beta1().Ingresses().Informer()
serviceInf := informerFactory.Core().V1().Services().Informer()
configMapInf := informerFactory.Core().V1().ConfigMaps().Informer()
secretInf := informerFactory.Core().V1().Secrets().Informer()
bundleInf := bundleInformer(bundleClient)

store := NewStore()
store.AddInformer(tprGVK, tprInf)
store.AddInformer(schema.GroupVersionKind{Group: "extensions", Version: "v1beta1", Kind: "Deployment"}, deploymentInf)
store.AddInformer(schema.GroupVersionKind{Group: "extensions", Version: "v1beta1", Kind: "Ingress"}, ingressInf)
store.AddInformer(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Service"}, serviceInf)
store.AddInformer(api.Unversioned.WithKind("Service"), serviceInf)
store.AddInformer(api.Unversioned.WithKind("ConfigMap"), configMapInf)
store.AddInformer(api.Unversioned.WithKind("Secret"), secretInf)
store.AddInformer(bundleGVK, bundleInf)

informerFactory.Start(ctx.Done()) // Must be after store.AddInformer()
Expand Down Expand Up @@ -128,6 +133,8 @@ func (a *App) Run(ctx context.Context) error {
deploymentInf.AddEventHandler(reh)
ingressInf.AddEventHandler(reh)
serviceInf.AddEventHandler(reh)
configMapInf.AddEventHandler(reh)
secretInf.AddEventHandler(reh)

// 7. Watch Third Party Resources to add watches for supported ones

Expand Down
144 changes: 84 additions & 60 deletions pkg/processor/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
unstructured_conversion "k8s.io/apimachinery/pkg/conversion/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

var converter unstructured_conversion.Converter = unstructured_conversion.NewConverter(false)

type worker struct {
bp *BundleProcessor
bo backoff.BackOff
Expand Down Expand Up @@ -125,29 +128,13 @@ nextVertex:
}

func (wrk *worker) checkResource(bundle *smith.Bundle, res *smith.Resource) (isReady bool, e error) {
gv, err := schema.ParseGroupVersion(res.Spec.GetAPIVersion())
if err != nil {
return false, err
}
kind := res.Spec.GetKind()
client, err := wrk.bp.clients.ClientForGroupVersionKind(gv.WithKind(kind))
if err != nil {
return false, err
}

resClient := client.Resource(&metav1.APIResource{
Name: resources.ResourceKindToPath(kind),
Namespaced: true,
Kind: kind,
}, wrk.namespace)

// 0. Update label to point at the parent bundle
// 1. Update label to point at the parent bundle
res.Spec.SetLabels(mergeLabels(
bundle.Metadata.Labels,
res.Spec.GetLabels(),
map[string]string{smith.BundleNameLabel: wrk.bundleName}))

// 1. Update OwnerReferences
// 2. Update OwnerReferences
// Hardcode APIVersion/Kind because of https://github.com/kubernetes/client-go/issues/60
// TODO uncomment when https://github.com/kubernetes/kubernetes/issues/39816 is fixed
//res.Spec.SetOwnerReferences(append(res.Spec.GetOwnerReferences(), metav1.OwnerReference{
Expand All @@ -157,58 +144,95 @@ func (wrk *worker) checkResource(bundle *smith.Bundle, res *smith.Resource) (isR
// UID: bundle.Metadata.UID,
//}))

// 3. Create or update resource
response, err := wrk.createOrUpdate(res)
if err != nil || response == nil {
return false, err
}

// 4. Check if resource is ready
return wrk.bp.rc.IsReady(response)
}

func (wrk *worker) createOrUpdate(res *smith.Resource) (*unstructured.Unstructured, error) {
gv, err := schema.ParseGroupVersion(res.Spec.GetAPIVersion())
if err != nil {
return nil, err
}
kind := res.Spec.GetKind()
gvk := gv.WithKind(kind)
client, err := wrk.bp.clients.ClientForGroupVersionKind(gvk)
if err != nil {
return nil, err
}

resClient := client.Resource(&metav1.APIResource{
Name: resources.ResourceKindToPath(kind),
Namespaced: true,
Kind: kind,
}, wrk.namespace)

name := res.Spec.GetName()
// 2. Try to get the resource. We do read first to avoid generating unnecessary events.
obj, exists, err := wrk.bp.store.Get(gvk, wrk.namespace, name)
if err != nil {
// Unexpected error
return nil, err
}
var response *unstructured.Unstructured
for {
// 2. Try to get the resource. We do read first to avoid generating unnecessary events.
// TODO Maybe create a cache of Stores to avoid uselessly bombarding api server with requests?
// TODO How do we ensure we have informers for all object types so that we will get events once object is updated by its controller?
response, err = resClient.Get(name)
if err != nil {
if !errors.IsNotFound(err) {
// Unexpected error
return false, err
}
log.Printf("[WORKER] bundle %s/%s: resource %q not found, creating", wrk.namespace, wrk.bundleName, res.Name)
// 3. Create if does not exist
response, err = resClient.Create(&res.Spec)
if err == nil {
log.Printf("[WORKER] bundle %s/%s: resource %q created", wrk.namespace, wrk.bundleName, res.Name)
break
if exists {
var ok bool
response, ok = obj.(*unstructured.Unstructured)
if !ok {
response = &unstructured.Unstructured{
Object: make(map[string]interface{}),
}
if errors.IsAlreadyExists(err) {
log.Printf("[WORKER] bundle %s/%s: resource %q found, restarting loop", wrk.namespace, wrk.bundleName, res.Name)
continue
if err = converter.ToUnstructured(obj, &response.Object); err != nil {
// Unexpected error
return nil, err
}
// Unexpected error
return false, err
}

// 4. Compare spec and existing resource
updated, err := updateResource(wrk.bp.scheme, res, response)
if err != nil {
// Unexpected error
return false, err
} else {
log.Printf("[WORKER] bundle %s/%s: resource %q not found, creating", wrk.namespace, wrk.bundleName, res.Name)
// 3. Create if does not exist
response, err = resClient.Create(&res.Spec)
if err == nil {
log.Printf("[WORKER] bundle %s/%s: resource %q created", wrk.namespace, wrk.bundleName, res.Name)
return response, nil
}
if updated == nil {
log.Printf("[WORKER] bundle %s/%s: resource %q has correct spec", wrk.namespace, wrk.bundleName, res.Name)
break
if errors.IsAlreadyExists(err) {
log.Printf("[WORKER] bundle %s/%s: resource %q found, but not in Store yet", wrk.namespace, wrk.bundleName, res.Name)
// We let the next rebuild() iteration, triggered by someone else creating the resource, to finish the work.
return nil, nil
}
// Unexpected error
return nil, err
}

// 5. Update if different
response, err = resClient.Update(updated)
if err != nil {
if errors.IsConflict(err) {
log.Printf("[WORKER] bundle %s/%s: resource %q update resulted in conflict, restarting loop", wrk.namespace, wrk.bundleName, res.Name)
continue
}
// Unexpected error
return false, err
// 4. Compare spec and existing resource
updated, err := updateResource(wrk.bp.scheme, res, response)
if err != nil {
// Unexpected error
return nil, err
}
if updated == nil {
log.Printf("[WORKER] bundle %s/%s: resource %q has correct spec", wrk.namespace, wrk.bundleName, res.Name)
return response, nil
}

// 5. Update if different
response, err = resClient.Update(updated)
if err != nil {
if errors.IsConflict(err) {
log.Printf("[WORKER] bundle %s/%s: resource %q update resulted in conflict, restarting loop", wrk.namespace, wrk.bundleName, res.Name)
// We let the next rebuild() iteration, triggered by someone else creating the resource, to finish the work.
return nil, nil
}
log.Printf("[WORKER] bundle %s/%s: resource %q updated", wrk.namespace, wrk.bundleName, res.Name)
break
// Unexpected error
return nil, err
}
return wrk.bp.rc.IsReady(response)
log.Printf("[WORKER] bundle %s/%s: resource %q updated", wrk.namespace, wrk.bundleName, res.Name)
return response, nil
}

func (wrk *worker) setBundleState(tpl *smith.Bundle, desired smith.ResourceState) error {
Expand Down

0 comments on commit 35c8711

Please sign in to comment.