Skip to content

Commit

Permalink
Mirror remaining Yunikorn core APIs. (#401)
Browse files Browse the repository at this point in the history
* implement mirror for /ws/v1/clusters core API.

* implement mirror for ws/v1/scheduler/healthcheck core API.

* implement mirror for ws/v1/scheduler/node-utilizations core API.
  • Loading branch information
sudiptob2 authored Jan 20, 2025
1 parent 8c43100 commit 4007aaa
Show file tree
Hide file tree
Showing 7 changed files with 317 additions and 6 deletions.
2 changes: 1 addition & 1 deletion cmd/unicorn-history-server/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func Run(ctx context.Context, cfg *config.Config) error {

healthService := health.New(info.Version, health.NewYunikornComponent(client), health.NewPostgresComponent(pool))

ws := webservice.NewWebService(cfg.UHSConfig, mainRepository, eventRepository, healthService)
ws := webservice.NewWebService(cfg.UHSConfig, mainRepository, eventRepository, healthService, client)
g.Add(
func() error {
return ws.Start(ctx)
Expand Down
73 changes: 68 additions & 5 deletions internal/webservice/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
routeAppsHistory = "/api/v1/history/apps"
routeContainersHistory = "/api/v1/history/containers"
routeNodesPerPartition = "/api/v1/partition/{partition_id}/nodes"
routeNodeUtilizations = "/api/v1/scheduler/node-utilizations"
routeSchedulerHealthcheck = "/api/v1/scheduler/healthcheck"
routeEventStatistics = "/api/v1/event-statistics"
routeHealthLiveness = "/api/v1/health/liveness"
Expand All @@ -42,7 +43,15 @@ var startupTime = time.Now()

func (ws *WebService) init(ctx context.Context) {
service := new(restful.WebService)

service.Route(
service.GET(routeClusters).
To(ws.getClusters).
Produces(restful.MIME_JSON).
Writes([]*dao.ClusterDAOInfo{}).
Returns(200, "OK", []*dao.ClusterDAOInfo{}).
Returns(500, "Internal Server Error", ProblemDetails{}).
Doc("Get cluster information"),
)
service.Route(
service.GET(routePartitions).
To(ws.getPartitions).
Expand Down Expand Up @@ -172,14 +181,23 @@ func (ws *WebService) init(ctx context.Context) {
Returns(500, "Internal Server Error", ProblemDetails{}).
Doc("Get event statistics"),
)
service.Route(
service.GET(routeNodeUtilizations).
To(ws.getNodeUtilizations).
Produces(restful.MIME_JSON).
Writes([]*dao.PartitionNodesUtilDAOInfo{}).
Returns(200, "OK", []*dao.PartitionNodesUtilDAOInfo{}).
Returns(500, "Internal Server Error", ProblemDetails{}).
Doc("Get node utilization information"),
)
service.Route(
service.GET(routeSchedulerHealthcheck).
To(ws.LivenessHealthcheck).
To(ws.schedulerHealthcheck).
Produces(restful.MIME_JSON).
Writes(health.LivenessStatus{}).
Returns(200, "OK", health.LivenessStatus{}).
Writes(dao.SchedulerHealthDAOInfo{}).
Returns(200, "OK", dao.SchedulerHealthDAOInfo{}).
Returns(500, "Internal Server Error", ProblemDetails{}).
Doc("Scheduler liveness healthcheck"),
Doc("Scheduler healthcheck"),
)
service.Route(
service.GET(routeHealthLiveness).
Expand Down Expand Up @@ -387,6 +405,21 @@ func (ws *WebService) getNodesPerPartition(req *restful.Request, resp *restful.R
jsonResponse(resp, nodes)
}

// getClusters mirrors the yunikorn-core ws/v1/clusters endpoint.
//
// It asks the Yunikorn client for the cluster list using the request's
// context. A client error is reported through errorResponse, a nil result
// is reported through notFoundResponse, and anything else is written back
// as JSON.
func (ws *WebService) getClusters(req *restful.Request, resp *restful.Response) {
	clusterInfos, err := ws.client.GetClusters(req.Request.Context())
	switch {
	case err != nil:
		errorResponse(req, resp, err)
	case clusterInfos == nil:
		notFoundResponse(req, resp, fmt.Errorf("no cluster found"))
	default:
		jsonResponse(resp, clusterInfos)
	}
}

func (ws *WebService) getAppsHistory(req *restful.Request, resp *restful.Response) {
ctx := req.Request.Context()
filters, err := parseHistoryFilters(req.Request)
Expand Down Expand Up @@ -436,6 +469,36 @@ func (ws *WebService) getEventStatistics(req *restful.Request, resp *restful.Res
jsonResponse(resp, counts)
}

// getNodeUtilizations mirrors the yunikorn-core
// ws/v1/scheduler/node-utilizations endpoint.
//
// It asks the Yunikorn client for node utilization data using the request's
// context. A client error is reported through errorResponse, a nil result
// is reported through notFoundResponse, and anything else is written back
// as JSON.
func (ws *WebService) getNodeUtilizations(req *restful.Request, resp *restful.Response) {
	utilizations, err := ws.client.NodeUtilizations(req.Request.Context())
	switch {
	case err != nil:
		errorResponse(req, resp, err)
	case utilizations == nil:
		notFoundResponse(req, resp, fmt.Errorf("no node utilizations data found"))
	default:
		jsonResponse(resp, utilizations)
	}
}

// schedulerHealthcheck mirrors the yunikorn-core
// ws/v1/scheduler/healthcheck endpoint.
//
// It asks the Yunikorn client for the scheduler health report using the
// request's context. A client error is reported through errorResponse, a
// nil report is reported through notFoundResponse, and anything else is
// written back as JSON.
func (ws *WebService) schedulerHealthcheck(req *restful.Request, resp *restful.Response) {
	report, err := ws.client.Healthcheck(req.Request.Context())
	switch {
	case err != nil:
		errorResponse(req, resp, err)
	case report == nil:
		notFoundResponse(req, resp, fmt.Errorf("no healthcheck data found"))
	default:
		jsonResponse(resp, report)
	}
}

func (ws *WebService) LivenessHealthcheck(req *restful.Request, resp *restful.Response) {
ctx := req.Request.Context()
jsonResponse(resp, ws.healthService.Liveness(ctx))
Expand Down
172 changes: 172 additions & 0 deletions internal/webservice/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/G-Research/unicorn-history-server/internal/database/repository"
"github.com/G-Research/unicorn-history-server/internal/yunikorn"
"github.com/G-Research/yunikorn-core/pkg/webservice/dao"
"github.com/emicklei/go-restful/v3"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -545,3 +546,174 @@ func TestGetContainersHistory(t *testing.T) {
})
}
}

// TestGetCluster exercises the getClusters handler against a mocked
// Yunikorn client. Each case makes the mock return the given cluster list
// with a nil error and checks the HTTP status code recorded for the
// response.
func TestGetCluster(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()
	client := yunikorn.NewMockClient(ctrl)

	type testCase struct {
		name       string
		clusters   []*dao.ClusterDAOInfo
		wantStatus int
	}

	cases := []testCase{
		{
			name: "ClusterInfo exists",
			clusters: []*dao.ClusterDAOInfo{
				{
					StartTime:     time.Now().UnixNano(),
					ClusterName:   "cluster1",
					PartitionName: "default",
				},
			},
			wantStatus: http.StatusOK,
		},
		{
			// A nil result from the client is expected to surface as 404.
			name:       "No Cluster found",
			clusters:   nil,
			wantStatus: http.StatusNotFound,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			client.EXPECT().GetClusters(gomock.Any()).Return(tc.clusters, nil)

			service := &WebService{client: client}

			httpReq, err := http.NewRequest(http.MethodGet, "/api/v1/clusters", nil)
			require.NoError(t, err)

			recorder := httptest.NewRecorder()
			service.getClusters(restful.NewRequest(httpReq), restful.NewResponse(recorder))

			require.Equal(t, tc.wantStatus, recorder.Code)
		})
	}
}

// TestSchedulerHealthcheck exercises the schedulerHealthcheck handler
// against a mocked Yunikorn client. Each case makes the mock return the
// given health report with a nil error and checks the HTTP status code
// recorded for the response.
func TestSchedulerHealthcheck(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()
	client := yunikorn.NewMockClient(ctrl)

	// Fixture returned by the mock in the success case.
	healthyReport := &dao.SchedulerHealthDAOInfo{
		Healthy: true,
		HealthChecks: []dao.HealthCheckInfo{
			{
				Name:             "Scheduling errors",
				Succeeded:        true,
				Description:      "Check for scheduling error entries in metrics",
				DiagnosisMessage: "There were 0 scheduling errors logged in the metrics",
			},
			{
				Name:             "Failed nodes",
				Succeeded:        true,
				Description:      "Check for failed nodes entries in metrics",
				DiagnosisMessage: "There were 0 failed nodes logged in the metrics",
			},
		},
	}

	type testCase struct {
		name       string
		report     *dao.SchedulerHealthDAOInfo
		wantStatus int
	}

	cases := []testCase{
		{
			name:       "SchedulerHealthDAOInfo exists",
			report:     healthyReport,
			wantStatus: http.StatusOK,
		},
		{
			// A nil report from the client is expected to surface as 404.
			name:       "No SchedulerHealthDAOInfo found",
			report:     nil,
			wantStatus: http.StatusNotFound,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			client.EXPECT().Healthcheck(gomock.Any()).Return(tc.report, nil)

			service := &WebService{client: client}

			httpReq, err := http.NewRequest(http.MethodGet, "/api/v1/scheduler/healthcheck", nil)
			require.NoError(t, err)

			recorder := httptest.NewRecorder()
			service.schedulerHealthcheck(restful.NewRequest(httpReq), restful.NewResponse(recorder))

			require.Equal(t, tc.wantStatus, recorder.Code)
		})
	}
}

// TestNodeUtilizations exercises the getNodeUtilizations handler against a
// mocked Yunikorn client. Each case makes the mock return the given
// utilization data with a nil error and checks the HTTP status code
// recorded for the response.
func TestNodeUtilizations(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()
	client := yunikorn.NewMockClient(ctrl)

	// Memory buckets for the success fixture: the first bucket holds one
	// node, the remaining buckets carry only their name.
	memoryBuckets := []*dao.NodeUtilDAOInfo{
		{
			BucketName: "0-10%",
			NumOfNodes: 1,
			NodeNames:  []string{"uhs-control-plane"},
		},
	}
	emptyBucketNames := []string{
		"10-20%",
		"20-30%",
		"30-40%",
		"40-50%",
		"50-60%",
		"60-70%",
		"70-80%",
		"80-90%",
		"90-100%",
	}
	for _, bucketName := range emptyBucketNames {
		memoryBuckets = append(memoryBuckets, &dao.NodeUtilDAOInfo{BucketName: bucketName})
	}

	type testCase struct {
		name         string
		utilizations []*dao.PartitionNodesUtilDAOInfo
		wantStatus   int
	}

	cases := []testCase{
		{
			name: "NodeUtilizations exist",
			utilizations: []*dao.PartitionNodesUtilDAOInfo{
				{
					ClusterID: "mycluster",
					Partition: "default",
					NodesUtilList: []*dao.NodesUtilDAOInfo{
						{
							ResourceType: "memory",
							NodesUtil:    memoryBuckets,
						},
					},
				},
			},
			wantStatus: http.StatusOK,
		},
		{
			// A nil result from the client is expected to surface as 404.
			name:         "No NodeUtilizations found",
			utilizations: nil,
			wantStatus:   http.StatusNotFound,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			client.EXPECT().NodeUtilizations(gomock.Any()).Return(tc.utilizations, nil)

			service := &WebService{client: client}

			httpReq, err := http.NewRequest(http.MethodGet, "/api/v1/scheduler/node-utilizations", nil)
			require.NoError(t, err)

			recorder := httptest.NewRecorder()
			service.getNodeUtilizations(restful.NewRequest(httpReq), restful.NewResponse(recorder))

			require.Equal(t, tc.wantStatus, recorder.Code)
		})
	}
}
4 changes: 4 additions & 0 deletions internal/webservice/webservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/G-Research/unicorn-history-server/internal/database/repository"
"github.com/G-Research/unicorn-history-server/internal/health"
"github.com/G-Research/unicorn-history-server/internal/log"
"github.com/G-Research/unicorn-history-server/internal/yunikorn"
)

type WebService struct {
Expand All @@ -18,13 +19,15 @@ type WebService struct {
eventRepository repository.EventRepository
healthService health.Interface
config config.UHSConfig
client yunikorn.Client
}

func NewWebService(
cfg config.UHSConfig,
repository repository.Repository,
eventRepository repository.EventRepository,
healthService health.Interface,
client yunikorn.Client,
) *WebService {
return &WebService{
server: &http.Server{
Expand All @@ -35,6 +38,7 @@ func NewWebService(
eventRepository: eventRepository,
healthService: healthService,
config: cfg,
client: client,
}
}

Expand Down
2 changes: 2 additions & 0 deletions internal/yunikorn/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ type Client interface {
GetContainersHistory(ctx context.Context) ([]*dao.ContainerHistoryDAOInfo, error)
GetEventStream(ctx context.Context) (*http.Response, error)
Healthcheck(ctx context.Context) (*dao.SchedulerHealthDAOInfo, error)
NodeUtilizations(ctx context.Context) ([]*dao.PartitionNodesUtilDAOInfo, error)
GetClusters(ctx context.Context) ([]*dao.ClusterDAOInfo, error)
}
30 changes: 30 additions & 0 deletions internal/yunikorn/mock_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4007aaa

Please sign in to comment.