Skip to content

Commit

Permalink
Issue #3864: [API] Allow to show engine tasks stats on run details pa…
Browse files Browse the repository at this point in the history
…ge - persist tasks method (#3867)
  • Loading branch information
ekazachkova authored Jan 20, 2025
1 parent de78430 commit c649f83
Show file tree
Hide file tree
Showing 16 changed files with 501 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.epam.pipeline.entity.pipeline.RunInstance;
import com.epam.pipeline.entity.pipeline.RunLog;
import com.epam.pipeline.entity.pipeline.TaskStatus;
import com.epam.pipeline.entity.pipeline.run.EngineRunTask;
import com.epam.pipeline.entity.pipeline.run.PipeRunCmdStartVO;
import com.epam.pipeline.entity.pipeline.run.PipelineStart;
import com.epam.pipeline.entity.pipeline.run.RunChartInfo;
Expand All @@ -56,6 +57,7 @@
import com.epam.pipeline.manager.cluster.InstanceOfferManager;
import com.epam.pipeline.manager.filter.FilterManager;
import com.epam.pipeline.manager.filter.WrongFilterException;
import com.epam.pipeline.manager.pipeline.EngineRunTaskService;
import com.epam.pipeline.manager.pipeline.ArchiveRunService;
import com.epam.pipeline.manager.pipeline.PipelineRunAsManager;
import com.epam.pipeline.manager.pipeline.PipelineRunCRUDService;
Expand Down Expand Up @@ -110,6 +112,7 @@ public class RunApiService {
private final RunPermissionManager runPermissionManager;
private final EdgeServiceManager edgeServiceManager;
private final ArchiveRunService archiveRunService;
private final EngineRunTaskService engineRunTaskService;

@AclMask
@QuotaLaunchCheck
Expand Down Expand Up @@ -419,4 +422,9 @@ public RunRuntimeData getPipelineRunRuntimeData(final Long runId, final RunSyncR
final Map<String, String> parameters) {
return runRuntimeDataManager.getPipelineRunRuntimeData(runId, type, parameters);
}

@PreAuthorize(RUN_ID_EXECUTE)
public int consumeRunEngineTaskEvents(final Long runId, final List<EngineRunTask> tasks) {
return engineRunTaskService.upsertTasks(runId, tasks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public final class MessageConstants {
public static final String ERROR_ARCHIVE_RUN_METADATA_NOT_FOUND = "error.archive.run.metadata.not.found";
public static final String ERROR_ARCHIVE_RUN_METADATA_NOT_NUMERIC = "error.archive.run.metadata.not.numeric";
public static final String ERROR_MAX_PAGE_SIZE_EXCEEDED = "error.max.page.size.exceeded";
public static final String ERROR_ENGINE_RUN_TASK_SETTING_NOT_FOUND = "error.engine.run.task.setting.not.found";

//Run schedule
public static final String CRON_EXPRESSION_IS_NOT_PROVIDED = "cron.expression.is.not.provided";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.epam.pipeline.entity.pipeline.PipelineTask;
import com.epam.pipeline.entity.pipeline.RunInstance;
import com.epam.pipeline.entity.pipeline.RunLog;
import com.epam.pipeline.entity.pipeline.run.EngineRunTask;
import com.epam.pipeline.entity.pipeline.run.PipeRunCmdStartVO;
import com.epam.pipeline.entity.pipeline.run.PipelineStart;
import com.epam.pipeline.entity.pipeline.run.RunChartInfo;
Expand Down Expand Up @@ -666,4 +667,15 @@ public Result<RunRuntimeData> getPipelineRunData(
@RequestBody(required = false) final Map<String, String> parameters) {
return Result.success(runApiService.getPipelineRunRuntimeData(runId, type, parameters));
}

@PostMapping("/run/{runId}/engine/tasks")
@ApiOperation(
value = "Consumes engine task events for run",
notes = "Consumes engine task events for run",
produces = MediaType.APPLICATION_JSON_VALUE)
@ApiResponses(value = {@ApiResponse(code = HTTP_STATUS_OK, message = API_STATUS_DESCRIPTION)})
public Result<Integer> consumeRunEngineTaskEvents(@PathVariable(value = RUN_ID) final Long runId,
@RequestBody final List<EngineRunTask> tasks) {
return Result.success(runApiService.consumeRunEngineTaskEvents(runId, tasks));
}
}
126 changes: 126 additions & 0 deletions api/src/main/java/com/epam/pipeline/dao/pipeline/EngineRunTaskDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2025 EPAM Systems, Inc. (https://www.epam.com/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.dao.pipeline;

import com.epam.pipeline.dao.DaoUtils;
import com.epam.pipeline.dao.DryRunJdbcDaoSupport;
import com.epam.pipeline.entity.pipeline.run.EngineRunTask;
import com.epam.pipeline.entity.pipeline.run.EngineTaskStatus;
import com.epam.pipeline.entity.pipeline.run.EngineType;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

@RequiredArgsConstructor
public class EngineRunTaskDao extends DryRunJdbcDaoSupport {

private String upsertEngineRunTaskQuery;
private String deleteEngineRunTaskByRunIdsQuery;
private String findEngineRunTaskByRunIdQuery;

@Transactional(propagation = Propagation.MANDATORY)
public List<EngineRunTask> batchUpsert(final List<EngineRunTask> tasks) {
getNamedParameterJdbcTemplate().batchUpdate(upsertEngineRunTaskQuery,
EngineRunTaskDao.Parameters.getBatchParameters(tasks));
return tasks;
}

@Transactional(propagation = Propagation.MANDATORY)
public void deleteByRunIdIn(final List<Long> runIds, final boolean dryRun) {
final MapSqlParameterSource params = DaoUtils.longListParams(runIds);
getNamedParameterJdbcTemplate(dryRun).update(deleteEngineRunTaskByRunIdsQuery, params);
}

public List<EngineRunTask> findByRunId(final Long runId) {
return getNamedParameterJdbcTemplate().query(findEngineRunTaskByRunIdQuery,
new MapSqlParameterSource().addValue(Parameters.RUN_ID.name(), runId),
Parameters.getRowMapper());
}

enum Parameters {
TASK_ID,
TASK_NAME,
TASK_KEY,
TASK_GROUP,
PARENT_ID,
ENGINE_TYPE,
STATUS,
DATA,
START_DATE,
END_DATE,
RUN_ID,
DURATION;

private static MapSqlParameterSource[] getBatchParameters(final List<EngineRunTask> tasks) {
return tasks.stream()
.map(Parameters::getParameters)
.toArray(MapSqlParameterSource[]::new);
}

private static MapSqlParameterSource getParameters(final EngineRunTask task) {
return new MapSqlParameterSource()
.addValue(TASK_ID.name(), task.getTaskId())
.addValue(TASK_NAME.name(), task.getTaskName())
.addValue(TASK_KEY.name(), task.getTaskKey())
.addValue(TASK_GROUP.name(), task.getTaskGroup())
.addValue(PARENT_ID.name(), task.getParentId())
.addValue(ENGINE_TYPE.name(), task.getEngineType().name())
.addValue(STATUS.name(), task.getStatus().name())
.addValue(DATA.name(), task.getAttributes())
.addValue(START_DATE.name(), task.getStartDateTime())
.addValue(END_DATE.name(), task.getEndDateTime())
.addValue(RUN_ID.name(), task.getRunId())
.addValue(DURATION.name(), task.getDuration());
}

private static RowMapper<EngineRunTask> getRowMapper() {
return (rs, rowNum) -> EngineRunTask.builder()
.runId(rs.getLong(RUN_ID.name()))
.taskId(rs.getString(TASK_ID.name()))
.taskName(rs.getString(TASK_NAME.name()))
.taskKey(rs.getString(TASK_KEY.name()))
.taskGroup(rs.getString(TASK_GROUP.name()))
.parentId(rs.getString(PARENT_ID.name()))
.engineType(EngineType.valueOf(rs.getString(ENGINE_TYPE.name())))
.status(EngineTaskStatus.valueOf(rs.getString(STATUS.name())))
.duration(rs.getLong(DURATION.name()))
.startDateTime(rs.getDate(START_DATE.name()))
.endDateTime(rs.getDate(END_DATE.name()))
.build();
}
}

@Required
public void setUpsertEngineRunTaskQuery(final String upsertEngineRunTaskQuery) {
this.upsertEngineRunTaskQuery = upsertEngineRunTaskQuery;
}

@Required
public void setDeleteEngineRunTaskByRunIdsQuery(final String deleteEngineRunTaskByRunIdsQuery) {
this.deleteEngineRunTaskByRunIdsQuery = deleteEngineRunTaskByRunIdsQuery;
}

@Required
public void setFindEngineRunTaskByRunIdQuery(final String findEngineRunTaskByRunIdQuery) {
this.findEngineRunTaskByRunIdQuery = findEngineRunTaskByRunIdQuery;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.epam.pipeline.manager.pipeline;

import com.epam.pipeline.dao.pipeline.ArchiveRunDao;
import com.epam.pipeline.dao.pipeline.EngineRunTaskDao;
import com.epam.pipeline.dao.pipeline.PipelineRunDao;
import com.epam.pipeline.dao.pipeline.RestartRunDao;
import com.epam.pipeline.dao.pipeline.RunLogDao;
Expand Down Expand Up @@ -50,6 +51,7 @@ public class ArchiveRunCoreService {
private final RunServiceUrlDao runServiceUrlDao;
private final RunStatusDao runStatusDao;
private final StopServerlessRunDao stopServerlessRunDao;
private final EngineRunTaskDao engineRunTaskDao;

@Transactional(propagation = Propagation.REQUIRED)
public void archiveRuns(final Map<String, Date> ownersAndDates, final List<Long> terminalStates,
Expand Down Expand Up @@ -123,7 +125,9 @@ private void deleteRunsAndDependents(final List<Long> runIds, final boolean dryR
runStatusDao.deleteRunStatusByRunIdsIn(runIds, dryRun);
log.debug("Run statuses deleted. Deleting stop serverless runs info...");
stopServerlessRunDao.deleteByRunIdIn(runIds, dryRun);
log.debug("Stop serverless runs info deleted. Deleting runs...");
log.debug("Stop serverless runs info deleted. Deleting engine run tasks...");
engineRunTaskDao.deleteByRunIdIn(runIds, dryRun);
log.debug("Engine run logs deleted. Deleting runs...");
pipelineRunDao.deleteRunByIdIn(runIds, dryRun);
log.debug("'{}' runs deleted.", runIds.size());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2025 EPAM Systems, Inc. (https://www.epam.com/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.manager.pipeline;

import com.epam.pipeline.common.MessageConstants;
import com.epam.pipeline.common.MessageHelper;
import com.epam.pipeline.dao.pipeline.EngineRunTaskDao;
import com.epam.pipeline.entity.pipeline.run.EngineRunTask;
import lombok.RequiredArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;

import java.util.List;
import java.util.stream.Collectors;

@Service
@RequiredArgsConstructor
public class EngineRunTaskService {

private final PipelineRunCRUDService runCRUDService;
private final EngineRunTaskDao engineRunTaskDao;
private final MessageHelper messageHelper;

@Transactional(propagation = Propagation.REQUIRED)
public int upsertTasks(final Long runId, final List<EngineRunTask> tasks) {
if (CollectionUtils.isEmpty(tasks)) {
return 0;
}
runCRUDService.loadRunById(runId);
return engineRunTaskDao.batchUpsert(tasks.stream()
.peek(task -> task.setRunId(runId))
.map(this::validate)
.collect(Collectors.toList()))
.size();
}

private EngineRunTask validate(final EngineRunTask task) {
Assert.notNull(task.getTaskId(), messageHelper.getMessage(
MessageConstants.ERROR_ENGINE_RUN_TASK_SETTING_NOT_FOUND, "taskId"));
Assert.notNull(task.getEngineType(), messageHelper.getMessage(
MessageConstants.ERROR_ENGINE_RUN_TASK_SETTING_NOT_FOUND, "engineType"));
Assert.notNull(task.getStatus(), messageHelper.getMessage(
MessageConstants.ERROR_ENGINE_RUN_TASK_SETTING_NOT_FOUND, "status"));
return task;
}
}
87 changes: 87 additions & 0 deletions api/src/main/resources/dao/engine-run-tasks.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2025 EPAM Systems, Inc. (https://www.epam.com/)
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean class="com.epam.pipeline.dao.pipeline.EngineRunTaskDao" id="engineRunTaskDao" autowire="byName">
<property name="upsertEngineRunTaskQuery">
<value>
<![CDATA[
INSERT INTO pipeline.engine_run_task (
task_id,
task_name,
task_key,
task_group,
parent_id,
engine_type,
status,
data,
start_date,
end_date,
run_id,
duration
) VALUES (
:TASK_ID,
:TASK_NAME,
:TASK_KEY,
:TASK_GROUP,
:PARENT_ID,
:ENGINE_TYPE,
:STATUS,
to_jsonb(:DATA::jsonb),
:START_DATE,
:END_DATE,
:RUN_ID,
:DURATION
) ON CONFLICT (run_id, task_id, engine_type) DO UPDATE SET
status = :STATUS,
data = to_jsonb(:DATA::jsonb),
end_date = :END_DATE,
duration = :DURATION
]]>
</value>
</property>
<property name="deleteEngineRunTaskByRunIdsQuery">
<value>
<![CDATA[
DELETE FROM pipeline.engine_run_task WHERE run_id IN (:list)
]]>
</value>
</property>
<property name="findEngineRunTaskByRunIdQuery">
<value>
<![CDATA[
SELECT
r.task_id,
r.task_name,
r.task_key,
r.task_group,
r.parent_id,
r.engine_type,
r.status,
r.start_date,
r.end_date,
r.run_id,
r.duration
FROM pipeline.engine_run_task r
WHERE r.run_id = :RUN_ID
]]>
</value>
</property>
</bean>
</beans>
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE TABLE IF NOT EXISTS pipeline.engine_run_task (
task_id TEXT NOT NULL,
task_name TEXT,
task_key TEXT,
task_group TEXT,
parent_id TEXT,
engine_type TEXT NOT NULL,
status TEXT NOT NULL,
data JSONB,
start_date TIMESTAMP WITH TIME ZONE,
end_date TIMESTAMP WITH TIME ZONE,
run_id BIGINT NOT NULL REFERENCES pipeline.pipeline_run(run_id),
duration BIGINT,
CONSTRAINT engine_run_task_constrain UNIQUE(run_id, task_id, engine_type)
);
1 change: 1 addition & 0 deletions api/src/main/resources/messages.properties
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ warn.instance.stopping=Instance is still in STOPPING status
error.archive.run.metadata.not.found=Failed to archive runs: metadata ''{0}'' not found for entity ''{1}''=''{2}''.
error.archive.run.metadata.not.numeric=Failed to archive runs: metadata value for key ''{0}'' is not numeric.
error.max.page.size.exceeded=Max page size {0} exceeded.
error.engine.run.task.setting.not.found=Attribute ''{0}'' shall be provided for engine run task.

#Run schedule
cron.expression.is.not.provided=Cron expression for {0} id ''{1}'' is not provided.
Expand Down
Loading

0 comments on commit c649f83

Please sign in to comment.