Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] ✨ Add support for implicit paging in un/structured clients #1358

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package client_test
import (
"context"
"fmt"
"strconv"
"sync/atomic"
"time"

Expand Down Expand Up @@ -47,6 +48,14 @@ func deleteDeployment(ctx context.Context, dep *appsv1.Deployment, ns string) {
}
}

func deletePod(ctx context.Context, pod *corev1.Pod, ns string) {
_, err := clientset.CoreV1().Pods(ns).Get(ctx, pod.Name, metav1.GetOptions{})
if err == nil {
err = clientset.CoreV1().Pods(ns).Delete(ctx, pod.Name, metav1.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
}
}

func deleteNamespace(ctx context.Context, ns *corev1.Namespace) {
ns, err := clientset.CoreV1().Namespaces().Get(ctx, ns.Name, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -2085,6 +2094,45 @@ var _ = Describe("Client", func() {
Expect(deps.Items[1].Name).To(Equal(dep4.Name))
}, serverSideTimeoutSeconds)

It("should list in pages large sets of objects using ListPages", func() {
buildPod := func(suffix string) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%s", suffix),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}},
},
}
}

By("creating 150 pods")
workLoad := 150
for workLoad > 0 {
pod := buildPod(strconv.Itoa(workLoad))
defer deletePod(ctx, pod, ns)
pod, err := clientset.
CoreV1().
Pods(ns).
Create(ctx, pod, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
workLoad--
defer deletePod(ctx, pod, ns)
}

cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())

By("listing all pods with ListPages")
pods := &corev1.PodList{}
err = cl.ListPages(context.Background(), pods, func(obj client.ObjectList) error { return nil })
Expect(err).NotTo(HaveOccurred())
Expect(pods.Items).To(HaveLen(150))
Expect(pods.Continue).NotTo(BeEmpty())
Expect(pods.Items[0].Name).To(Equal("pod-1"))

}, serverSideTimeoutSeconds)

PIt("should fail if the object doesn't have meta", func() {

})
Expand Down
7 changes: 7 additions & 0 deletions pkg/client/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ type Reader interface {
// successful call, Items field in the list will be populated with the
// result returned from the server.
List(ctx context.Context, list ObjectList, opts ...ListOption) error

// Retrieves a list of objects in "chunks" (of size one hundred by default)
// for a given namespace and list options.
// One can pass a callback function to process each chunk recieved from the server.
// On a successful call, Items field in the list will be populated with the
// result returned from the server.
ListPages(ctx context.Context, obj ObjectList, callback func(obj ObjectList) error, opts ...ListOption) error
}

// Writer knows how to create, delete, and update Kubernetes objects.
Expand Down
6 changes: 6 additions & 0 deletions pkg/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ import (
"k8s.io/apimachinery/pkg/selection"
)

const (
// DefaultPageLimit represents the default limit used for ListPaging when no "Limit"
// is specified as ListOption.
DefaultPageLimit = 100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably needs a better name since it's not the default for List() too.

Copy link
Author

@nimrodshn nimrodshn Jan 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vincepri @cben Any suggestions?

)

// {{{ "Functional" Option Interfaces

// CreateOption is some configuration that modifies options for a create request.
Expand Down
67 changes: 67 additions & 0 deletions pkg/client/typed_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package client
import (
"context"

apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
)

Expand Down Expand Up @@ -143,6 +144,72 @@ func (c *typedClient) Get(ctx context.Context, key ObjectKey, obj Object) error
Name(key.Name).Do(ctx).Into(obj)
}

func (c *typedClient) ListPages(ctx context.Context, obj ObjectList,
callback func(obj ObjectList) error, opts ...ListOption) error {
r, err := c.cache.getResource(obj)
if err != nil {
return err
}
listOpts := ListOptions{}
listOpts.ApplyOptions(opts)

// Fetch items at chunks of one hundred if not specified differently.
if listOpts.Limit == 0 {
Limit(DefaultPageLimit).ApplyToList(&listOpts)
}

// Retrieve initial chunck of data.
var allItems []runtime.Object
var interimResult ObjectList
err = r.Get().
NamespaceIfScoped(listOpts.Namespace, r.isNamespaced()).
Resource(r.resource()).
VersionedParams(listOpts.AsListOptions(), c.paramCodec).
Do(ctx).Into(interimResult)
if err != nil {
return err
}

if err := callback(interimResult); err != nil {
return err
}

items, err := apimeta.ExtractList(interimResult)
if err != nil {
return err
}
allItems = append(allItems, items...)

// Continue while there are more chunks.
for {
if interimResult.GetContinue() == "" {
break
}

Continue(interimResult.GetContinue()).ApplyToList(&listOpts)
err = r.Get().
NamespaceIfScoped(listOpts.Namespace, r.isNamespaced()).
Resource(r.resource()).
VersionedParams(listOpts.AsListOptions(), c.paramCodec).
Do(ctx).Into(interimResult)
if err != nil {
return err
}

if err := callback(interimResult); err != nil {
return err
}

items, err = apimeta.ExtractList(interimResult)
if err != nil {
return err
}
allItems = append(allItems, items...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want both a callback function and a huge list return, we should pick one pattern or the other (or move them to separate functions/options). The reason to use a visitor callback function is to avoid keeping the whole list in memory. I think it would be more Go-like to just do the callback style probably, ExtractList() requires a lot of runtime reflect magic since there is no unified API for accessing list items. But having both available could be convenient.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vincepri Do you have opinions on which API to support (or both)?

Copy link
Author

@nimrodshn nimrodshn Jan 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. 👍

@coderanger Actoually thinking about it - if the user would like to accumulate the result she can simply use the same ExtractList inside the callback.

Copy link
Author

@nimrodshn nimrodshn Jan 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is, she can pass something like this:

out := &corev1.Pods{}
allItems := make([]runtime.Object, 0)
cl.ListPages(ctx, out,  func(obj client.ObjectList) error {
    items, err := apimeta.ExtractList(interimResult)
    if err != nil {
        return err
    }
    allItems = append(allItems, items...)
    return nil
})

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, or more likely cast it to the correct type and skip needing ExtractList :)

}

return apimeta.SetList(obj, allItems)
}

// List implements client.Client
func (c *typedClient) List(ctx context.Context, obj ObjectList, opts ...ListOption) error {
r, err := c.cache.getResource(obj)
Expand Down