Issue 841. Schedule for cluster node. Part 1 #969

Open · wants to merge 3 commits into base: develop
10 changes: 9 additions & 1 deletion api/src/main/java/com/epam/pipeline/app/AppConfiguration.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2019 EPAM Systems, Inc. (https://www.epam.com/)
+ * Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package com.epam.pipeline.app;

import com.epam.pipeline.common.MessageHelper;
import com.epam.pipeline.entity.cluster.ClusterNodeScaleAction;
import com.epam.pipeline.manager.scheduling.AutowiringSpringBeanJobFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
@@ -35,8 +36,10 @@
import org.springframework.security.concurrent.DelegatingSecurityContextExecutor;
import org.springframework.web.filter.CommonsRequestLoggingFilter;

import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

@Configuration
@EnableScheduling
@@ -61,6 +64,11 @@ public MessageHelper messageHelper() {
return new MessageHelper(messageSource());
}

@Bean
public Queue<ClusterNodeScaleAction> clusterNodeScaleQueue() {
return new LinkedBlockingQueue<>();
}

@Bean
public ResourceBundleMessageSource messageSource() {
ResourceBundleMessageSource messageSource = new ResourceBundleMessageSource();
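The clusterNodeScaleQueue bean above is a plain LinkedBlockingQueue shared between the components that enqueue scale actions and whatever drains them. The consumer side is not part of this diff; the sketch below is hypothetical and only reuses the bean type and the ClusterNodeScaleAction entity from the changes above. Since AppConfiguration already declares @EnableScheduling, a @Scheduled consumer like this would be picked up without extra configuration.

import com.epam.pipeline.entity.cluster.ClusterNodeScaleAction;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Queue;

// Hypothetical consumer, NOT part of this PR: drains the shared queue bean periodically.
@Component
public class ClusterNodeScaleConsumer {

    private final Queue<ClusterNodeScaleAction> clusterNodeScaleQueue;

    public ClusterNodeScaleConsumer(final Queue<ClusterNodeScaleAction> clusterNodeScaleQueue) {
        this.clusterNodeScaleQueue = clusterNodeScaleQueue;
    }

    @Scheduled(fixedDelay = 10_000)
    public void processPendingActions() {
        // poll() is non-blocking: an empty queue never stalls the scheduler thread
        ClusterNodeScaleAction action = clusterNodeScaleQueue.poll();
        while (action != null) {
            // apply the scale-up/scale-down action here (logic omitted in this sketch)
            action = clusterNodeScaleQueue.poll();
        }
    }
}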
ClusterController.java
@@ -19,13 +19,16 @@
import com.epam.pipeline.controller.AbstractRestController;
import com.epam.pipeline.controller.Result;
import com.epam.pipeline.controller.vo.FilterNodesVO;
import com.epam.pipeline.controller.vo.cluster.ClusterNodeScheduleVO;
import com.epam.pipeline.entity.cluster.AllowedInstanceAndPriceTypes;
import com.epam.pipeline.entity.cluster.ClusterNodeSchedule;
import com.epam.pipeline.entity.cluster.FilterPodsRequest;
import com.epam.pipeline.entity.cluster.InstanceType;
import com.epam.pipeline.entity.cluster.MasterNode;
import com.epam.pipeline.entity.cluster.NodeInstance;
import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats;
import com.epam.pipeline.manager.cluster.ClusterApiService;
import com.epam.pipeline.manager.cluster.NodeScheduleApiService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
@@ -34,8 +37,11 @@
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@@ -61,6 +67,7 @@ public class ClusterController extends AbstractRestController {
private static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";

private final ClusterApiService clusterApiService;
private final NodeScheduleApiService nodeScheduleApiService;

@GetMapping(value = "/cluster/master")
@ResponseBody
@@ -224,4 +231,75 @@ public void downloadNodeUsageStatisticsReport(
final String reportName = String.format("%s_%s-%s-%s", name, from, to, interval);
writeStreamToResponse(response, inputStream, String.format("%s.%s", reportName, "csv"));
}

@PostMapping(value = "/cluster/schedule")
@ResponseBody
@ApiOperation(
value = "Schedule to run or stop batch of node with specific settings",
notes = "Schedule to run or stop batch of node with specific settings",
produces = MediaType.APPLICATION_JSON_VALUE
)
@ApiResponses(
value = {@ApiResponse(code = HTTP_STATUS_OK, message = API_STATUS_DESCRIPTION)}
)
public Result<ClusterNodeSchedule> createClusterNodeSchedule(@RequestBody ClusterNodeScheduleVO actionVO) {
return Result.success(nodeScheduleApiService.createClusterNodeSchedule(actionVO));
}

@DeleteMapping(value = "/cluster/schedule/{id}")
@ResponseBody
@ApiOperation(
value = "Delete schedule for specific type of node",
notes = "Delete schedule for specific type of node",
produces = MediaType.APPLICATION_JSON_VALUE
)
@ApiResponses(
value = {@ApiResponse(code = HTTP_STATUS_OK, message = API_STATUS_DESCRIPTION)}
)
public Result deleteClusterNodeSchedule(@PathVariable Long id, @RequestParam(required = false) Long scheduleId) {
nodeScheduleApiService.deleteClusterNodeSchedule(id, scheduleId);
return Result.success();
}

@GetMapping(value = "/cluster/schedule/")
@ResponseBody
@ApiOperation(
value = "Get all Schedules of batch node settings",
notes = "Get all Schedules of batch node settings",
produces = MediaType.APPLICATION_JSON_VALUE
)
@ApiResponses(
value = {@ApiResponse(code = HTTP_STATUS_OK, message = API_STATUS_DESCRIPTION)}
)
public Result<List<ClusterNodeSchedule>> loadClusterNodeSchedule() {
return Result.success(nodeScheduleApiService.loadClusterNodeSchedule());
}

@GetMapping(value = "/cluster/schedule/{id}")
@ResponseBody
@ApiOperation(
value = "Get all Schedules specific batch node settings",
notes = "Get all Schedules specific batch node settings",
produces = MediaType.APPLICATION_JSON_VALUE
)
@ApiResponses(
value = {@ApiResponse(code = HTTP_STATUS_OK, message = API_STATUS_DESCRIPTION)}
)
public Result<ClusterNodeSchedule> loadClusterNodeSchedule(@PathVariable Long id) {
return Result.success(nodeScheduleApiService.loadClusterNodeSchedule(id));
}

@PutMapping(value = "/cluster/schedule")
@ResponseBody
@ApiOperation(
value = "Update schedules for specific batch node settings",
notes = "Update schedules for specific batch node settings",
produces = MediaType.APPLICATION_JSON_VALUE
)
@ApiResponses(
value = {@ApiResponse(code = HTTP_STATUS_OK, message = API_STATUS_DESCRIPTION)}
)
public Result<ClusterNodeSchedule> updateClusterNodeSchedule(@RequestBody ClusterNodeScheduleVO scheduleVO) {
return Result.success(nodeScheduleApiService.updateClusterNodeSchedule(scheduleVO));
}
}
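For reviewers, here is how the five new endpoints map to HTTP calls. The sketch below is illustrative only: the base URL and ids are assumptions and authentication is omitted; the paths and verbs are taken from the controller above. A populated request body is sketched after the ClusterNodeScheduleVO class below.

import com.epam.pipeline.controller.vo.cluster.ClusterNodeScheduleVO;
import org.springframework.web.client.RestTemplate;

public final class ClusterScheduleCallsExample {

    // Assumed base URL; a real deployment's prefix may differ
    private static final String BASE = "https://cloud-pipeline.example.com/restapi";

    public static void main(final String[] args) {
        final RestTemplate rest = new RestTemplate();
        final ClusterNodeScheduleVO vo = new ClusterNodeScheduleVO(); // see the populated example below

        // POST /cluster/schedule — create a schedule for a batch of nodes
        rest.postForObject(BASE + "/cluster/schedule", vo, Object.class);

        // GET /cluster/schedule — list every schedule
        rest.getForObject(BASE + "/cluster/schedule", Object.class);

        // GET /cluster/schedule/{id} — load a single schedule
        rest.getForObject(BASE + "/cluster/schedule/{id}", Object.class, 1L);

        // PUT /cluster/schedule — update schedules for a batch node spec
        rest.put(BASE + "/cluster/schedule", vo);

        // DELETE /cluster/schedule/{id}?scheduleId=... — scheduleId is optional in the controller
        rest.delete(BASE + "/cluster/schedule/{id}?scheduleId={scheduleId}", 1L, 2L);
    }
}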
api/src/main/java/com/epam/pipeline/controller/vo/cluster/ClusterNodeScheduleVO.java
@@ -0,0 +1,29 @@
/*
* Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.controller.vo.cluster;

import com.epam.pipeline.controller.vo.PipelineRunScheduleVO;
import com.epam.pipeline.entity.cluster.ClusterNodeScale;
import lombok.Data;

import java.util.List;

@Data
public class ClusterNodeScheduleVO {
private ClusterNodeScale clusterNodeScale;
private List<PipelineRunScheduleVO> schedule;
}
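A minimal sketch of assembling a request body for the new endpoints. It uses only setters visible elsewhere in this diff (the ClusterNodeScale and RunInstance accessors appear in ClusterNodeScaleDao below); the instance type and sizes are placeholder values, and PipelineRunScheduleVO's own fields are outside this diff, so it is left empty here.

import com.epam.pipeline.controller.vo.PipelineRunScheduleVO;
import com.epam.pipeline.controller.vo.cluster.ClusterNodeScheduleVO;
import com.epam.pipeline.entity.cluster.ClusterNodeScale;
import com.epam.pipeline.entity.pipeline.RunInstance;

import java.util.Collections;

public final class ClusterNodeScheduleVOExample {

    public static ClusterNodeScheduleVO buildRequest() {
        // RunInstance accessors taken from ClusterNodeScaleDao below; values are placeholders
        final RunInstance instance = new RunInstance();
        instance.setNodeType("m5.large"); // hypothetical instance type
        instance.setNodeDisk(50);
        instance.setCloudRegionId(1L);
        instance.setSpot(false);

        final ClusterNodeScale scale = new ClusterNodeScale();
        scale.setInstance(instance);
        scale.setNumberOfInstances(3);

        final ClusterNodeScheduleVO vo = new ClusterNodeScheduleVO();
        vo.setClusterNodeScale(scale);
        // PipelineRunScheduleVO's fields (e.g. the cron expression) are outside this diff
        vo.setSchedule(Collections.singletonList(new PipelineRunScheduleVO()));
        return vo;
    }
}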
api/src/main/java/com/epam/pipeline/dao/cluster/ClusterNodeScaleDao.java
@@ -0,0 +1,158 @@
/*
* Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.dao.cluster;

import com.epam.pipeline.dao.DaoHelper;
import com.epam.pipeline.entity.cluster.ClusterNodeScale;
import com.epam.pipeline.entity.pipeline.RunInstance;
import org.apache.commons.collections4.ListUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcDaoSupport;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

public class ClusterNodeScaleDao extends NamedParameterJdbcDaoSupport {

@Autowired
private DaoHelper daoHelper;

private String clusterNodeScaleSequence;
private String createClusterNodeScaleQuery;
private String loadClusterNodeScaleQuery;
private String loadAllClusterNodeScaleQuery;
private String updateClusterNodeScaleQuery;
private String deleteClusterNodeScaleQuery;


@Transactional(propagation = Propagation.MANDATORY)
public Long createNextFreeNodeId() {
return daoHelper.createId(clusterNodeScaleSequence);
}

@Transactional(propagation = Propagation.MANDATORY)
public void createClusterNodeScale(final ClusterNodeScale clusterNodeScale) {
final Long nextBatchNodeSpecId = createNextFreeNodeId();
clusterNodeScale.setId(nextBatchNodeSpecId);

getNamedParameterJdbcTemplate()
.update(createClusterNodeScaleQuery, ClusterNodeScaleParameters.getParameters(clusterNodeScale));
}

@Transactional(propagation = Propagation.SUPPORTS)
public ClusterNodeScale loadClusterNodeScale(final Long id) {
List<ClusterNodeScale> result = getJdbcTemplate()
.query(loadClusterNodeScaleQuery, ClusterNodeScaleParameters.getRowMapper(), id);
return result.isEmpty() ? null : result.iterator().next();
}

@Transactional(propagation = Propagation.SUPPORTS)
public List<ClusterNodeScale> loadClusterNodeScale() {
List<ClusterNodeScale> result = getJdbcTemplate()
.query(loadAllClusterNodeScaleQuery, ClusterNodeScaleParameters.getRowMapper());
return ListUtils.emptyIfNull(result);
}


@Transactional(propagation = Propagation.MANDATORY)
public ClusterNodeScale updateClusterNodeScale(final ClusterNodeScale clusterNodeScale) {
final MapSqlParameterSource params = ClusterNodeScaleParameters.getParameters(clusterNodeScale);
getNamedParameterJdbcTemplate().update(updateClusterNodeScaleQuery, params);
return clusterNodeScale;
}

@Transactional(propagation = Propagation.MANDATORY)
public void deleteClusterNodeScale(final Long id) {
getJdbcTemplate().update(deleteClusterNodeScaleQuery, id);
}

enum ClusterNodeScaleParameters {
ID,
INSTANCE_TYPE,
NODE_DISK,
IS_SPOT,
CLOUD_REGION_ID,
NUMBER_OF_INSTANCES;

static MapSqlParameterSource getParameters(ClusterNodeScale clusterNodeScale) {
MapSqlParameterSource params = new MapSqlParameterSource();

params.addValue(ID.name(), clusterNodeScale.getId());
params.addValue(INSTANCE_TYPE.name(), clusterNodeScale.getInstance().getNodeType());
params.addValue(NUMBER_OF_INSTANCES.name(), clusterNodeScale.getNumberOfInstances());
params.addValue(NODE_DISK.name(), clusterNodeScale.getInstance().getNodeDisk());
params.addValue(CLOUD_REGION_ID.name(), clusterNodeScale.getInstance().getCloudRegionId());
params.addValue(IS_SPOT.name(), clusterNodeScale.getInstance().getSpot());
return params;
}

static RowMapper<ClusterNodeScale> getRowMapper() {
return (rs, rowNum) -> {
ClusterNodeScale clusterNodeScale = new ClusterNodeScale();
clusterNodeScale.setId(rs.getLong(ID.name()));
clusterNodeScale.setInstance(getRunInstance(rs));
clusterNodeScale.setNumberOfInstances(rs.getInt(NUMBER_OF_INSTANCES.name()));
return clusterNodeScale;
};
}

private static RunInstance getRunInstance(final ResultSet rs) throws SQLException {
final RunInstance instance = new RunInstance();
instance.setNodeType(rs.getString(INSTANCE_TYPE.name()));
instance.setNodeDisk(rs.getInt(NODE_DISK.name()));
instance.setCloudRegionId(rs.getLong(CLOUD_REGION_ID.name()));
instance.setSpot(rs.getBoolean(IS_SPOT.name()));
return instance;
}
}

@Required
public void setClusterNodeScaleSequence(String clusterNodeScaleSequence) {
this.clusterNodeScaleSequence = clusterNodeScaleSequence;
}

@Required
public void setCreateClusterNodeScaleQuery(final String createClusterNodeScaleQuery) {
this.createClusterNodeScaleQuery = createClusterNodeScaleQuery;
}

@Required
public void setLoadClusterNodeScaleQuery(final String loadClusterNodeScaleQuery) {
this.loadClusterNodeScaleQuery = loadClusterNodeScaleQuery;
}

@Required
public void setLoadAllClusterNodeScaleQuery(final String loadAllClusterNodeScaleQuery) {
this.loadAllClusterNodeScaleQuery = loadAllClusterNodeScaleQuery;
}

@Required
public void setUpdateClusterNodeScaleQuery(final String updateClusterNodeScaleQuery) {
this.updateClusterNodeScaleQuery = updateClusterNodeScaleQuery;
}

@Required
public void setDeleteClusterNodeScaleQuery(final String deleteClusterNodeScaleQuery) {
this.deleteClusterNodeScaleQuery = deleteClusterNodeScaleQuery;
}
}
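The DAO receives its SQL through @Required setters, so the actual queries live in external bean configuration that is not part of this diff. Below is a hypothetical Java-config wiring sketch: the table and sequence names are assumptions, and only the named parameters (which must match the ClusterNodeScaleParameters enum names) and the positional id parameters are constrained by the DAO code above.

import com.epam.pipeline.dao.cluster.ClusterNodeScaleDao;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

// Hypothetical wiring, NOT part of this diff: table and sequence names are assumptions.
@Configuration
public class ClusterNodeScaleDaoConfiguration {

    @Bean
    public ClusterNodeScaleDao clusterNodeScaleDao(final DataSource dataSource) {
        final ClusterNodeScaleDao dao = new ClusterNodeScaleDao();
        dao.setDataSource(dataSource); // inherited from JdbcDaoSupport
        dao.setClusterNodeScaleSequence("pipeline.s_cluster_node_scale");
        // Named parameters must match the ClusterNodeScaleParameters enum names
        dao.setCreateClusterNodeScaleQuery(
                "INSERT INTO pipeline.cluster_node_scale "
                + "(id, instance_type, node_disk, is_spot, cloud_region_id, number_of_instances) "
                + "VALUES (:ID, :INSTANCE_TYPE, :NODE_DISK, :IS_SPOT, :CLOUD_REGION_ID, :NUMBER_OF_INSTANCES)");
        // Positional parameters: the DAO passes the id through getJdbcTemplate()
        dao.setLoadClusterNodeScaleQuery("SELECT * FROM pipeline.cluster_node_scale WHERE id = ?");
        dao.setLoadAllClusterNodeScaleQuery("SELECT * FROM pipeline.cluster_node_scale");
        dao.setUpdateClusterNodeScaleQuery(
                "UPDATE pipeline.cluster_node_scale SET instance_type = :INSTANCE_TYPE, "
                + "node_disk = :NODE_DISK, is_spot = :IS_SPOT, cloud_region_id = :CLOUD_REGION_ID, "
                + "number_of_instances = :NUMBER_OF_INSTANCES WHERE id = :ID");
        dao.setDeleteClusterNodeScaleQuery("DELETE FROM pipeline.cluster_node_scale WHERE id = ?");
        return dao;
    }
}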