From 54638eb8f510e8652ba778729f1df38d12dfbee3 Mon Sep 17 00:00:00 2001 From: Ponfee Date: Mon, 19 Feb 2024 20:42:56 +0800 Subject: [PATCH] rename enum equals method to equalsValue --- .../controller/DisjobInstanceController.java | 2 +- .../disjob/common/base/IntValueEnum.java | 18 ++++++++++++- .../cn/ponfee/disjob/core/model/SchedJob.java | 4 +-- .../component/AbstractJobManager.java | 10 +++---- .../component/DistributedJobManager.java | 26 +++++++++---------- .../thread/RunningInstanceScanner.java | 2 +- .../thread/TriggeringJobScanner.java | 6 ++--- .../thread/WaitingInstanceScanner.java | 2 +- .../handler/PrimeAccumulateJobHandler.java | 2 +- 9 files changed, 44 insertions(+), 28 deletions(-) diff --git a/disjob-admin/ruoyi-disjob/src/main/java/cn/ponfee/disjob/admin/controller/DisjobInstanceController.java b/disjob-admin/ruoyi-disjob/src/main/java/cn/ponfee/disjob/admin/controller/DisjobInstanceController.java index 8d5f63721..18be613dd 100644 --- a/disjob-admin/ruoyi-disjob/src/main/java/cn/ponfee/disjob/admin/controller/DisjobInstanceController.java +++ b/disjob-admin/ruoyi-disjob/src/main/java/cn/ponfee/disjob/admin/controller/DisjobInstanceController.java @@ -196,7 +196,7 @@ public AjaxResult resume(@PathVariable("instanceId") Long instanceId) { openapiService.resumeInstance(instanceId); Threads.waitUntil(WAIT_SLEEP_ROUND, new long[]{500, 200}, () -> { SchedInstanceResponse instance = openapiService.getInstance(instanceId, false); - return !RunState.PAUSED.equals(instance.getRunState()); + return !RunState.PAUSED.equalsValue(instance.getRunState()); }); return success(); } diff --git a/disjob-common/src/main/java/cn/ponfee/disjob/common/base/IntValueEnum.java b/disjob-common/src/main/java/cn/ponfee/disjob/common/base/IntValueEnum.java index 152034f85..365d85dbc 100644 --- a/disjob-common/src/main/java/cn/ponfee/disjob/common/base/IntValueEnum.java +++ b/disjob-common/src/main/java/cn/ponfee/disjob/common/base/IntValueEnum.java @@ -43,10 +43,26 @@ public interface IntValueEnum & IntValueEnum> { */ String desc(); - default boolean equals(Integer value) { + /** + * Returns IntValueEnum instance is equals Integer value + * + * @param value the Integer value + * @return {@code true} if equals + */ + default boolean equalsValue(Integer value) { return value != null && value == value(); } + /** + * Returns IntValueEnum instance is equals int value + * + * @param value the int value + * @return {@code true} if equals + */ + default boolean equalsValue(int value) { + return value == value(); + } + static & IntValueEnum> T of(Class type, Integer value) { if (type == null) { throw new IllegalArgumentException("Enum int type cannot be null: " + type); diff --git a/disjob-core/src/main/java/cn/ponfee/disjob/core/model/SchedJob.java b/disjob-core/src/main/java/cn/ponfee/disjob/core/model/SchedJob.java index 8bfb34009..c07765190 100644 --- a/disjob-core/src/main/java/cn/ponfee/disjob/core/model/SchedJob.java +++ b/disjob-core/src/main/java/cn/ponfee/disjob/core/model/SchedJob.java @@ -272,7 +272,7 @@ public boolean retryable(RunState runState, int retriedCount) { if (!runState.isFailure()) { return false; } - return !RetryType.NONE.equals(retryType) && retriedCount < retryCount; + return !RetryType.NONE.equalsValue(retryType) && retriedCount < retryCount; } /** @@ -283,7 +283,7 @@ public boolean retryable(RunState runState, int retriedCount) { * @return retry trigger time milliseconds */ public long computeRetryTriggerTime(int failCount, Date current) { - Assert.isTrue(!RetryType.NONE.equals(retryType), () -> "Sched job '" + jobId + "' retry type is NONE."); + Assert.isTrue(!RetryType.NONE.equalsValue(retryType), () -> "Sched job '" + jobId + "' retry type is NONE."); Assert.isTrue(retryCount > 0, () -> "Sched job '" + jobId + "' retry count must greater than 0, but actual " + retryCount); Assert.isTrue(failCount <= retryCount, () -> "Sched job '" + jobId + "' retried " + failCount + " exceed " + retryCount + " limit."); // exponential backoff diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/AbstractJobManager.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/AbstractJobManager.java index 1269cecb5..bd11453cc 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/AbstractJobManager.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/AbstractJobManager.java @@ -150,7 +150,7 @@ public void updateJob(SchedJob job) throws JobException { public void deleteJob(long jobId) { SchedJob job = jobMapper.get(jobId); Assert.notNull(job, "Job id not found: " + jobId); - if (JobState.ENABLE.equals(job.getJobState())) { + if (JobState.ENABLE.equalsValue(job.getJobState())) { throw new IllegalStateException("Please disable job before delete this job."); } assertOneAffectedRow(jobMapper.softDelete(jobId), "Delete sched job fail or conflict."); @@ -162,7 +162,7 @@ public void deleteJob(long jobId) { protected boolean updateFixedDelayNextTriggerTime(SchedJob job, Date baseTime) { TriggerType fixedDelay = TriggerType.FIXED_DELAY; - if (!fixedDelay.equals(job.getTriggerType())) { + if (!fixedDelay.equalsValue(job.getTriggerType())) { return false; } Date date = baseTime == null ? null : fixedDelay.computeNextTriggerTime(job.getTriggerValue(), baseTime); @@ -200,7 +200,7 @@ public boolean hasAliveExecuting(List tasks) { return false; } return tasks.stream() - .filter(e -> ExecuteState.EXECUTING.equals(e.getExecuteState())) + .filter(e -> ExecuteState.EXECUTING.equalsValue(e.getExecuteState())) .map(SchedTask::getWorker) .anyMatch(this::isAliveWorker); } @@ -227,7 +227,7 @@ public boolean hasNotDiscoveredWorkers() { } public boolean isNeedRedispatch(SchedTask task) { - if (!ExecuteState.WAITING.equals(task.getExecuteState())) { + if (!ExecuteState.WAITING.equalsValue(task.getExecuteState())) { return false; } if (StringUtils.isBlank(task.getWorker())) { @@ -256,7 +256,7 @@ public boolean dispatch(SchedJob job, SchedInstance instance, List ta String supervisorToken = SchedGroupService.createSupervisorAuthenticationToken(job.getGroup()); ExecuteTaskParam.Builder builder = ExecuteTaskParam.builder(instance, job, supervisorToken); List list; - if (RouteStrategy.BROADCAST.equals(job.getRouteStrategy())) { + if (RouteStrategy.BROADCAST.equalsValue(job.getRouteStrategy())) { list = new ArrayList<>(tasks.size()); for (SchedTask task : tasks) { Assert.hasText(task.getWorker(), () -> "Broadcast route strategy worker must pre assign: " + task.getTaskId()); diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/DistributedJobManager.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/DistributedJobManager.java index 57967f552..6c8e6d07a 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/DistributedJobManager.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/DistributedJobManager.java @@ -220,7 +220,7 @@ public boolean startTask(StartTaskParam param) { Date now = new Date(); // start sched instance(also possibly started by other task) int row = 0; - if (RunState.WAITING.equals(instance.getRunState())) { + if (RunState.WAITING.equalsValue(instance.getRunState())) { row = instanceMapper.start(param.getInstanceId(), now); } @@ -373,7 +373,7 @@ public boolean purgeInstance(SchedInstance inst) { // task execute state must not 10 List tasks = taskMapper.findBaseByInstanceId(instanceId); - if (tasks.stream().anyMatch(e -> ExecuteState.WAITING.equals(e.getExecuteState()))) { + if (tasks.stream().anyMatch(e -> ExecuteState.WAITING.equalsValue(e.getExecuteState()))) { LOG.warn("Purge instance failed, has waiting task: {}", tasks); return false; } @@ -398,7 +398,7 @@ public boolean purgeInstance(SchedInstance inst) { tasks.stream() .filter(e -> EXECUTE_STATE_PAUSABLE.contains(e.getExecuteState())) .forEach(e -> { - String worker = ExecuteState.EXECUTING.equals(e.getExecuteState()) ? Strings.requireNonBlank(e.getWorker()) : null; + String worker = ExecuteState.EXECUTING.equalsValue(e.getExecuteState()) ? Strings.requireNonBlank(e.getWorker()) : null; taskMapper.terminate(e.getTaskId(), worker, ExecuteState.EXECUTE_TIMEOUT.value(), e.getExecuteState(), new Date(), null); }); @@ -478,7 +478,7 @@ public boolean resumeInstance(long instanceId) { Long wnstanceId = instanceMapper.getWnstanceId(instanceId); return doTransactionLockInSynchronized(instanceId, wnstanceId, instance -> { Assert.notNull(instance, () -> "Cancel failed, instance_id not found: " + instanceId); - if (!RunState.PAUSED.equals(instance.getRunState())) { + if (!RunState.PAUSED.equalsValue(instance.getRunState())) { return false; } @@ -489,7 +489,7 @@ public boolean resumeInstance(long instanceId) { assertOneAffectedRow(row, () -> "Resume workflow lead instance failed: " + instanceId); workflowMapper.resumeWaiting(instanceId); for (SchedInstance nodeInstance : instanceMapper.findWorkflowNode(instanceId)) { - if (RunState.PAUSED.equals(nodeInstance.getRunState())) { + if (RunState.PAUSED.equalsValue(nodeInstance.getRunState())) { resumeInstance(nodeInstance); updateWorkflowEdgeState(nodeInstance, RunState.RUNNING.value(), RUN_STATE_PAUSED); } @@ -538,7 +538,7 @@ private boolean doTransactionLockInSynchronized(long instanceId, Long wnstanceId private boolean shouldTerminateDispatchFailedTask(long taskId) { SchedTask task = taskMapper.get(taskId); - if (!ExecuteState.WAITING.equals(task.getExecuteState())) { + if (!ExecuteState.WAITING.equalsValue(task.getExecuteState())) { return false; } int currentDispatchFailedCount = task.getDispatchFailedCount(); @@ -678,7 +678,7 @@ private void updateWorkflowLeadState(SchedInstance instance) { RunState state = graph.anyMatch(e -> e.getValue().isFailure()) ? RunState.CANCELED : RunState.FINISHED; int row = instanceMapper.terminate(instance.getWnstanceId(), state.value(), RUN_STATE_TERMINABLE, new Date()); assertOneAffectedRow(row, () -> "Update workflow lead instance state failed: " + instance + ", " + state); - } else if (workflows.stream().noneMatch(e -> RunState.RUNNING.equals(e.getRunState()))) { + } else if (workflows.stream().noneMatch(e -> RunState.RUNNING.equalsValue(e.getRunState()))) { RunState state = RunState.PAUSED; int row = instanceMapper.updateState(instance.getWnstanceId(), state.value(), instance.getRunState()); assertOneAffectedRow(row, () -> "Update workflow lead instance state failed: " + instance + ", " + state); @@ -714,7 +714,7 @@ private void createWorkflowNode(SchedInstance leadInstance, WorkflowGraph graph, for (Map.Entry entry : map.entrySet()) { DAGNode target = entry.getKey().getTarget(); SchedWorkflow workflow = entry.getValue(); - if (target.isEnd() || !RunState.WAITING.equals(workflow.getRunState()) || !duplicates.add(target)) { + if (target.isEnd() || !RunState.WAITING.equalsValue(workflow.getRunState()) || !duplicates.add(target)) { // 当前节点为结束结点 或 当前节点不为等待状态,则跳过 continue; } @@ -787,7 +787,7 @@ private void updateFixedDelayNextTriggerTime(SchedInstance curr, LazyLoader ... -> A`问题,因为在添加Job时做了不能循环依赖的校验 long rnstanceId = curr.obtainRnstanceId(); SchedInstance root = (rnstanceId == curr.getInstanceId()) ? curr : instanceMapper.get(rnstanceId); - if (!root.getJobId().equals(curr.getJobId()) || !RunType.SCHEDULE.equals(root.getRunType())) { + if (!root.getJobId().equals(curr.getJobId()) || !RunType.SCHEDULE.equalsValue(root.getRunType())) { return; } @@ -900,7 +900,7 @@ private void retryJob(SchedInstance prev, LazyLoader lazyJob) { .stream() .filter(e -> ExecuteState.of(e.getExecuteState()).isFailure()) // broadcast task cannot support partial retry - .filter(e -> !RouteStrategy.BROADCAST.equals(schedJob.getRouteStrategy()) || super.isAliveWorker(e.getWorker())) + .filter(e -> !RouteStrategy.BROADCAST.equalsValue(schedJob.getRouteStrategy()) || super.isAliveWorker(e.getWorker())) .map(e -> SchedTask.create(e.getTaskParam(), generateId(), retryInstanceId, e.getTaskNo(), e.getTaskCount(), now, e.getWorker())) .collect(Collectors.toList()); } else { @@ -933,7 +933,7 @@ private void dependJob(SchedInstance parentInstance) { LOG.error("Child sched job not found: {}, {}", depend.getParentJobId(), depend.getChildJobId()); continue; } - if (JobState.DISABLE.equals(childJob.getJobState())) { + if (JobState.DISABLE.equalsValue(childJob.getJobState())) { continue; } @@ -971,7 +971,7 @@ private List loadExecutingTasks(SchedInstance instance, Operat // immediate trigger long triggerTime = 0L; for (SchedTask task : taskMapper.findBaseByInstanceId(instance.getInstanceId())) { - if (!ExecuteState.EXECUTING.equals(task.getExecuteState())) { + if (!ExecuteState.EXECUTING.equalsValue(task.getExecuteState())) { continue; } Worker worker = Worker.deserialize(task.getWorker()); @@ -998,7 +998,7 @@ private Tuple3> buildDispatchParam(long Assert.notNull(job, "Not found job: " + instance.getJobId()); List waitingTasks = taskMapper.findLargeByInstanceId(instanceId) .stream() - .filter(e -> ExecuteState.WAITING.equals(e.getExecuteState())) + .filter(e -> ExecuteState.WAITING.equalsValue(e.getExecuteState())) .collect(Collectors.toList()); if (waitingTasks.size() != expectTaskSize) { throw new IllegalStateException("Invalid dispatching tasks size: expect=" + expectTaskSize + ", actual=" + waitingTasks.size()); diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/RunningInstanceScanner.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/RunningInstanceScanner.java index d7df20b29..984708c43 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/RunningInstanceScanner.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/RunningInstanceScanner.java @@ -92,7 +92,7 @@ private void processEach(SchedInstance instance, Date now) { } List tasks = jobQuerier.findBaseInstanceTasks(instance.getInstanceId()); - List waitingTasks = Collects.filter(tasks, e -> ExecuteState.WAITING.equals(e.getExecuteState())); + List waitingTasks = Collects.filter(tasks, e -> ExecuteState.WAITING.equalsValue(e.getExecuteState())); if (CollectionUtils.isNotEmpty(waitingTasks)) { // 1、has waiting state task diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/TriggeringJobScanner.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/TriggeringJobScanner.java index 18dde21bd..264c9a3ed 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/TriggeringJobScanner.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/TriggeringJobScanner.java @@ -181,7 +181,7 @@ private void processJob(SchedJob job, Date now, long maxNextTriggerTime) { * @return accurate next trigger time milliseconds */ private Long reComputeNextTriggerTime(SchedJob job, Date now) { - if (TriggerType.FIXED_DELAY.equals(job.getTriggerType())) { + if (TriggerType.FIXED_DELAY.equalsValue(job.getTriggerType())) { // 固定延时类型不重新计算nextTriggerTime return job.obtainNextTriggerTime(); } @@ -201,7 +201,7 @@ private Long reComputeNextTriggerTime(SchedJob job, Date now) { * @return newly next trigger time milliseconds */ private static Long doComputeNextTriggerTime(SchedJob job, Date now) { - if (TriggerType.FIXED_DELAY.equals(job.getTriggerType())) { + if (TriggerType.FIXED_DELAY.equalsValue(job.getTriggerType())) { // 固定延时类型的nextTriggerTime:先更新为long最大值,当任务实例运行完成时去主动计算并更新 // null值已被用作表示没有下次触发时间 return Long.MAX_VALUE; @@ -259,7 +259,7 @@ private boolean checkBlockCollidedTrigger(SchedJob job, Date now) { } private boolean checkBlockCollidedTrigger(SchedJob job, List instances, CollidedStrategy collidedStrategy, Date now) { - if (TriggerType.FIXED_DELAY.equals(job.getTriggerType())) { + if (TriggerType.FIXED_DELAY.equalsValue(job.getTriggerType())) { SchedInstance first = instances.get(0); log.error("Fixed delay trigger type cannot happen run collided: {}, {}", first.obtainRnstanceId(), job.getNextTriggerTime()); } diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/WaitingInstanceScanner.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/WaitingInstanceScanner.java index 133085fad..67e993173 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/WaitingInstanceScanner.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/WaitingInstanceScanner.java @@ -92,7 +92,7 @@ private void processEach(SchedInstance instance) { } List tasks = jobQuerier.findBaseInstanceTasks(instance.getInstanceId()); - List waitingTasks = Collects.filter(tasks, e -> ExecuteState.WAITING.equals(e.getExecuteState())); + List waitingTasks = Collects.filter(tasks, e -> ExecuteState.WAITING.equalsValue(e.getExecuteState())); if (CollectionUtils.isNotEmpty(waitingTasks)) { // 1、has waiting state task diff --git a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/PrimeAccumulateJobHandler.java b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/PrimeAccumulateJobHandler.java index 15e4ac362..343295523 100644 --- a/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/PrimeAccumulateJobHandler.java +++ b/disjob-test/src/main/java/cn/ponfee/disjob/test/handler/PrimeAccumulateJobHandler.java @@ -45,7 +45,7 @@ public List split(String jobParamString) { public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception { long sum = executingTask.getWorkflowPredecessorNodes() .stream() - .peek(e -> Assert.state(RunState.FINISHED.equals(e.getRunState()), "Previous instance unfinished: " + e.getInstanceId())) + .peek(e -> Assert.state(RunState.FINISHED == e.getRunState(), "Previous instance unfinished: " + e.getInstanceId())) .flatMap(e -> e.getExecutedTasks().stream()) .map(AbstractExecutionTask::getExecuteSnapshot) .map(e -> Jsons.fromJson(e, PrimeCountJobHandler.ExecuteSnapshot.class))