From 4a252748b1a8128701b1022e72e055b6be4d8dbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Wed, 4 Sep 2024 16:20:19 +0200 Subject: [PATCH 1/3] sevehttp handler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- routing/endpointregistry.go | 12 ++++++++++++ skipper.go | 2 ++ 2 files changed, 14 insertions(+) diff --git a/routing/endpointregistry.go b/routing/endpointregistry.go index 4609d7bc96..07523123d4 100644 --- a/routing/endpointregistry.go +++ b/routing/endpointregistry.go @@ -1,6 +1,7 @@ package routing import ( + "net/http" "sync" "sync/atomic" "time" @@ -194,6 +195,17 @@ func (r *EndpointRegistry) Close() { close(r.quit) } +func (r *EndpointRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) { + switch req.Method { + case http.MethodGet: + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) + case http.MethodPut: + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("Accepted")) + } +} + func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry { if o.LastSeenTimeout == 0 { o.LastSeenTimeout = defaultLastSeenTimeout diff --git a/skipper.go b/skipper.go index 83a12a673a..c7b005390d 100644 --- a/skipper.go +++ b/skipper.go @@ -2102,6 +2102,8 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { mux.Handle("/debug/pprof", metricsHandler) mux.Handle("/debug/pprof/", metricsHandler) + mux.Handle("/endpoints", endpointRegistry) + log.Infof("support listener on %s", supportListener) go func() { /* #nosec */ From 5ebcdb59a913af7bf958ef16587206423e98b9d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Tue, 10 Sep 2024 12:47:24 +0200 Subject: [PATCH 2/3] safe work MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- routing/endpointregistry.go | 104 ++++++++++++++++++++++++++++++++++-- skipper.go | 4 +- 2 files changed, 102 insertions(+), 6 deletions(-) diff --git a/routing/endpointregistry.go b/routing/endpointregistry.go index 07523123d4..e678dcbefc 100644 --- a/routing/endpointregistry.go +++ b/routing/endpointregistry.go @@ -1,11 +1,15 @@ package routing import ( + "encoding/json" + "fmt" + "io" "net/http" "sync" "sync/atomic" "time" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" "github.com/zalando/skipper/eskip" @@ -106,6 +110,8 @@ type EndpointRegistry struct { now func() time.Time data sync.Map // map[string]*entry + + members sync.Map // map[string][]string, for example: kube-service -> endpoints } var _ PostProcessor = &EndpointRegistry{} @@ -195,17 +201,105 @@ func (r *EndpointRegistry) Close() { close(r.quit) } -func (r *EndpointRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) { +// Members is a structure for JSON parsing members for the LIST endpoints +// +// {"len": 2, "members": [ +// {"foo": +// {"len": 2, "member": ["10.0.1.23:8000", "10.0.1.25:9000"]} +type Members struct { + Len int `json:"len"` + Items []*Member `json:"items"` +} + +// Member is a structure for JSON parsing members for the PUT endpoint +// +// {"Len": 2, "Member": ["10.0.1.23:8000", "10.0.1.25:9000"]} +type Member struct { + Len int `json:"len"` + Name string `json:"name,omitempty"` + Member []string `json:"member"` +} + +func (r *EndpointRegistry) ListHandler(w http.ResponseWriter, req *http.Request) { switch req.Method { case http.MethodGet: w.WriteHeader(http.StatusOK) - w.Write([]byte("OK")) - case http.MethodPut: - w.WriteHeader(http.StatusAccepted) - w.Write([]byte("Accepted")) + + var ( + result Members + count int + ) + r.members.Range(func(svcName, value any) bool { + m := value.(*Member) + count++ + result.Items = append(result.Items, m) + return true + }) + + result.Len = count + items, err := json.Marshal(result) + if err != nil { + logrus.Errorf("Failed to marshal members: %v", err) + w.WriteHeader(http.StatusNotFound) + w.Write([]byte(http.StatusText(http.StatusNotFound))) + return + } + + w.Write([]byte(fmt.Sprintf(`%s`, items))) } } +func (r *EndpointRegistry) GetHandler(w http.ResponseWriter, req *http.Request) { + switch req.Method { + case http.MethodGet: + svcName := req.PathValue("svc") + val, ok := r.members.Load(svcName) + if !ok { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte(http.StatusText(http.StatusNotFound))) + return + } + + member := val.(*Member) + data, err := json.Marshal(member) + if err != nil { + log.Errorf("Failed to marshal GET %q: %v", svcName, err) + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(http.StatusText(http.StatusInternalServerError))) + return + } + + w.WriteHeader(http.StatusOK) + w.Write(data) + } +} + +func (r *EndpointRegistry) PutHandler(w http.ResponseWriter, req *http.Request) { + data, err := io.ReadAll(req.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(http.StatusText(http.StatusBadRequest))) + return + } + + var m Member + err = json.Unmarshal(data, &m) + if err != nil { + logrus.Infof("Failed to unmarshal members: %v", err) + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(http.StatusText(http.StatusBadRequest))) + return + } + + svcName := req.PathValue("svc") + m.Name = svcName + r.members.Store(svcName, &m) + logrus.Infof("PUT: stored %s -> %+v", svcName, m) + + w.WriteHeader(http.StatusAccepted) + w.Write([]byte(http.StatusText(http.StatusAccepted))) +} + func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry { if o.LastSeenTimeout == 0 { o.LastSeenTimeout = defaultLastSeenTimeout diff --git a/skipper.go b/skipper.go index c7b005390d..771bc80205 100644 --- a/skipper.go +++ b/skipper.go @@ -2102,7 +2102,9 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { mux.Handle("/debug/pprof", metricsHandler) mux.Handle("/debug/pprof/", metricsHandler) - mux.Handle("/endpoints", endpointRegistry) + mux.HandleFunc("GET /endpoints", endpointRegistry.ListHandler) + mux.HandleFunc("GET /endpoints/{svc}", endpointRegistry.GetHandler) + mux.HandleFunc("PUT /endpoints/{svc}", endpointRegistry.PutHandler) log.Infof("support listener on %s", supportListener) go func() { From 640de804beed3fd29e6acd3af05fa20eecee588f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Wed, 18 Sep 2024 21:42:10 +0200 Subject: [PATCH 3/3] fix imports MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- routing/endpointregistry.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/routing/endpointregistry.go b/routing/endpointregistry.go index e678dcbefc..221ca45262 100644 --- a/routing/endpointregistry.go +++ b/routing/endpointregistry.go @@ -9,7 +9,6 @@ import ( "sync/atomic" "time" - "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" "github.com/zalando/skipper/eskip" @@ -239,7 +238,7 @@ func (r *EndpointRegistry) ListHandler(w http.ResponseWriter, req *http.Request) result.Len = count items, err := json.Marshal(result) if err != nil { - logrus.Errorf("Failed to marshal members: %v", err) + log.Errorf("Failed to marshal members: %v", err) w.WriteHeader(http.StatusNotFound) w.Write([]byte(http.StatusText(http.StatusNotFound))) return @@ -285,7 +284,7 @@ func (r *EndpointRegistry) PutHandler(w http.ResponseWriter, req *http.Request) var m Member err = json.Unmarshal(data, &m) if err != nil { - logrus.Infof("Failed to unmarshal members: %v", err) + log.Infof("Failed to unmarshal members: %v", err) w.WriteHeader(http.StatusBadRequest) w.Write([]byte(http.StatusText(http.StatusBadRequest))) return @@ -294,7 +293,7 @@ func (r *EndpointRegistry) PutHandler(w http.ResponseWriter, req *http.Request) svcName := req.PathValue("svc") m.Name = svcName r.members.Store(svcName, &m) - logrus.Infof("PUT: stored %s -> %+v", svcName, m) + log.Infof("PUT: stored %s -> %+v", svcName, m) w.WriteHeader(http.StatusAccepted) w.Write([]byte(http.StatusText(http.StatusAccepted)))