diff --git a/routing/endpointregistry.go b/routing/endpointregistry.go index 4609d7bc96..221ca45262 100644 --- a/routing/endpointregistry.go +++ b/routing/endpointregistry.go @@ -1,6 +1,10 @@ package routing import ( + "encoding/json" + "fmt" + "io" + "net/http" "sync" "sync/atomic" "time" @@ -105,6 +109,8 @@ type EndpointRegistry struct { now func() time.Time data sync.Map // map[string]*entry + + members sync.Map // map[string][]string, for example: kube-service -> endpoints } var _ PostProcessor = &EndpointRegistry{} @@ -194,6 +200,105 @@ func (r *EndpointRegistry) Close() { close(r.quit) } +// Members is a structure for JSON parsing members for the LIST endpoints +// +// {"len": 2, "members": [ +// {"foo": +// {"len": 2, "member": ["10.0.1.23:8000", "10.0.1.25:9000"]} +type Members struct { + Len int `json:"len"` + Items []*Member `json:"items"` +} + +// Member is a structure for JSON parsing members for the PUT endpoint +// +// {"Len": 2, "Member": ["10.0.1.23:8000", "10.0.1.25:9000"]} +type Member struct { + Len int `json:"len"` + Name string `json:"name,omitempty"` + Member []string `json:"member"` +} + +func (r *EndpointRegistry) ListHandler(w http.ResponseWriter, req *http.Request) { + switch req.Method { + case http.MethodGet: + w.WriteHeader(http.StatusOK) + + var ( + result Members + count int + ) + r.members.Range(func(svcName, value any) bool { + m := value.(*Member) + count++ + result.Items = append(result.Items, m) + return true + }) + + result.Len = count + items, err := json.Marshal(result) + if err != nil { + log.Errorf("Failed to marshal members: %v", err) + w.WriteHeader(http.StatusNotFound) + w.Write([]byte(http.StatusText(http.StatusNotFound))) + return + } + + w.Write([]byte(fmt.Sprintf(`%s`, items))) + } +} + +func (r *EndpointRegistry) GetHandler(w http.ResponseWriter, req *http.Request) { + switch req.Method { + case http.MethodGet: + svcName := req.PathValue("svc") + val, ok := r.members.Load(svcName) + if !ok { + w.WriteHeader(http.StatusNotFound) + w.Write([]byte(http.StatusText(http.StatusNotFound))) + return + } + + member := val.(*Member) + data, err := json.Marshal(member) + if err != nil { + log.Errorf("Failed to marshal GET %q: %v", svcName, err) + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(http.StatusText(http.StatusInternalServerError))) + return + } + + w.WriteHeader(http.StatusOK) + w.Write(data) + } +} + +func (r *EndpointRegistry) PutHandler(w http.ResponseWriter, req *http.Request) { + data, err := io.ReadAll(req.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(http.StatusText(http.StatusBadRequest))) + return + } + + var m Member + err = json.Unmarshal(data, &m) + if err != nil { + log.Infof("Failed to unmarshal members: %v", err) + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(http.StatusText(http.StatusBadRequest))) + return + } + + svcName := req.PathValue("svc") + m.Name = svcName + r.members.Store(svcName, &m) + log.Infof("PUT: stored %s -> %+v", svcName, m) + + w.WriteHeader(http.StatusAccepted) + w.Write([]byte(http.StatusText(http.StatusAccepted))) +} + func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry { if o.LastSeenTimeout == 0 { o.LastSeenTimeout = defaultLastSeenTimeout diff --git a/skipper.go b/skipper.go index 83a12a673a..771bc80205 100644 --- a/skipper.go +++ b/skipper.go @@ -2102,6 +2102,10 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { mux.Handle("/debug/pprof", metricsHandler) mux.Handle("/debug/pprof/", metricsHandler) + mux.HandleFunc("GET /endpoints", endpointRegistry.ListHandler) + mux.HandleFunc("GET /endpoints/{svc}", endpointRegistry.GetHandler) + mux.HandleFunc("PUT /endpoints/{svc}", endpointRegistry.PutHandler) + log.Infof("support listener on %s", supportListener) go func() { /* #nosec */