Skip to content

Commit

Permalink
Add support for implicit paging in un/structured clients
Browse files Browse the repository at this point in the history
  • Loading branch information
Nimrod Shneor committed Jan 26, 2021
1 parent 9e78e65 commit d3492ab
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 0 deletions.
48 changes: 48 additions & 0 deletions pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package client_test
import (
"context"
"fmt"
"strconv"
"sync/atomic"
"time"

Expand Down Expand Up @@ -47,6 +48,14 @@ func deleteDeployment(ctx context.Context, dep *appsv1.Deployment, ns string) {
}
}

func deletePod(ctx context.Context, pod *corev1.Pod, ns string) {
_, err := clientset.CoreV1().Pods(ns).Get(ctx, pod.Name, metav1.GetOptions{})
if err == nil {
err = clientset.CoreV1().Pods(ns).Delete(ctx, pod.Name, metav1.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
}
}

func deleteNamespace(ctx context.Context, ns *corev1.Namespace) {
ns, err := clientset.CoreV1().Namespaces().Get(ctx, ns.Name, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -2085,6 +2094,45 @@ var _ = Describe("Client", func() {
Expect(deps.Items[1].Name).To(Equal(dep4.Name))
}, serverSideTimeoutSeconds)

It("should list in pages large sets of objects using ListPages", func() {
buildPod := func(suffix string) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%s", suffix),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}},
},
}
}

By("creating 150 pods")
workLoad := 150
for workLoad > 0 {
pod := buildPod(strconv.Itoa(workLoad))
defer deletePod(ctx, pod, ns)
pod, err := clientset.
CoreV1().
Pods(ns).
Create(ctx, pod, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
workLoad--
defer deletePod(ctx, pod, ns)
}

cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())

By("listing all pods with ListPages")
pods := &corev1.PodList{}
err = cl.ListPages(context.Background(), pods, nil)
Expect(err).NotTo(HaveOccurred())
Expect(pods.Items).To(HaveLen(client.DefaultPageLimit))
Expect(pods.Continue).NotTo(BeEmpty())
Expect(pods.Items[0].Name).To(Equal("pod-1"))

}, serverSideTimeoutSeconds)

PIt("should fail if the object doesn't have meta", func() {

})
Expand Down
7 changes: 7 additions & 0 deletions pkg/client/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ type Reader interface {
// successful call, Items field in the list will be populated with the
// result returned from the server.
List(ctx context.Context, list ObjectList, opts ...ListOption) error

// // Retrieves a list of objects in "chunks" (of size one hundred by default)
// // for a given namespace and list options.
// // One can pass a callback function to process each chunk recieved from the server.
// // On a successful call, Items field in the list will be populated with the
// // result returned from the server.
// ListPages(ctx context.Context, obj ObjectList, callback func(obj ObjectList) error, opts ...ListOption) error
}

// Writer knows how to create, delete, and update Kubernetes objects.
Expand Down
5 changes: 5 additions & 0 deletions pkg/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
"k8s.io/apimachinery/pkg/selection"
)

const (
// DefaultPageLimit represents the default limit used when specifyinh the 'Page' ListOption.
DefaultPageLimit = 100
)

// {{{ "Functional" Option Interfaces

// CreateOption is some configuration that modifies options for a create request.
Expand Down
64 changes: 64 additions & 0 deletions pkg/client/typed_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package client
import (
"context"

apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
)

Expand Down Expand Up @@ -143,6 +144,68 @@ func (c *typedClient) Get(ctx context.Context, key ObjectKey, obj Object) error
Name(key.Name).Do(ctx).Into(obj)
}

func (c *typedClient) ListPages(ctx context.Context, obj ObjectList,
callback func(obj ObjectList) error, opts ...ListOption) error {
r, err := c.cache.getResource(obj)
if err != nil {
return err
}
listOpts := ListOptions{}
listOpts.ApplyOptions(opts)

// Fetch items at chunks of one hundred if not specified differently.
if listOpts.Limit == 0 {
Limit(DefaultPageLimit).ApplyToList(&listOpts)
}

// Retrieve initial chunck of data.
var allItems []runtime.Object
var interimResult ObjectList
err = r.Get().
NamespaceIfScoped(listOpts.Namespace, r.isNamespaced()).
Resource(r.resource()).
VersionedParams(listOpts.AsListOptions(), c.paramCodec).
Do(ctx).Into(interimResult)
if err != nil {
return err
}

if err := callback(interimResult); err != nil {
return err
}

items, err := apimeta.ExtractList(interimResult)
if err != nil {
return err
}
allItems = append(allItems, items...)

// Continue while there are more chunks.
for interimResult.GetContinue() != "" {
Continue(interimResult.GetContinue()).ApplyToList(&listOpts)
err = r.Get().
NamespaceIfScoped(listOpts.Namespace, r.isNamespaced()).
Resource(r.resource()).
VersionedParams(listOpts.AsListOptions(), c.paramCodec).
Do(ctx).Into(interimResult)
if err != nil {
return err
}

if err := callback(interimResult); err != nil {
return err
}

items, err = apimeta.ExtractList(interimResult)
if err != nil {
return err
}
allItems = append(allItems, items...)
}

return apimeta.SetList(obj, allItems)
}

// List implements client.Client
func (c *typedClient) List(ctx context.Context, obj ObjectList, opts ...ListOption) error {
r, err := c.cache.getResource(obj)
Expand All @@ -151,6 +214,7 @@ func (c *typedClient) List(ctx context.Context, obj ObjectList, opts ...ListOpti
}
listOpts := ListOptions{}
listOpts.ApplyOptions(opts)

return r.Get().
NamespaceIfScoped(listOpts.Namespace, r.isNamespaced()).
Resource(r.resource()).
Expand Down

0 comments on commit d3492ab

Please sign in to comment.