diff --git a/disjob-common/src/main/java/cn/ponfee/disjob/common/base/IntValueEnum.java b/disjob-common/src/main/java/cn/ponfee/disjob/common/base/IntValueEnum.java index 365d85dbc..bc1434422 100644 --- a/disjob-common/src/main/java/cn/ponfee/disjob/common/base/IntValueEnum.java +++ b/disjob-common/src/main/java/cn/ponfee/disjob/common/base/IntValueEnum.java @@ -44,7 +44,7 @@ public interface IntValueEnum & IntValueEnum> { String desc(); /** - * Returns IntValueEnum instance is equals Integer value + * Returns this IntValueEnum instance value is equals Integer value * * @param value the Integer value * @return {@code true} if equals @@ -54,7 +54,7 @@ default boolean equalsValue(Integer value) { } /** - * Returns IntValueEnum instance is equals int value + * Returns this IntValueEnum instance value is equals int value * * @param value the int value * @return {@code true} if equals @@ -63,19 +63,16 @@ default boolean equalsValue(int value) { return value == value(); } - static & IntValueEnum> T of(Class type, Integer value) { + static & IntValueEnum> T of(Class type, int value) { if (type == null) { - throw new IllegalArgumentException("Enum int type cannot be null: " + type); - } - if (value == null) { - throw new IllegalArgumentException("Enum int value cannot be null."); + throw new IllegalArgumentException("Enum class cannot be null."); } for (T e : type.getEnumConstants()) { if (e.value() == value) { return e; } } - throw new IllegalArgumentException("Invalid enum int value: " + value); + throw new IllegalArgumentException("Invalid enum " + type + " int value: " + value); } static List values(Class> clazz) { diff --git a/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/Threads.java b/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/Threads.java index bda64aca4..4e058c973 100644 --- a/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/Threads.java +++ b/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/Threads.java @@ -31,20 +31,6 @@ public final class Threads { private static final Logger LOG = LoggerFactory.getLogger(Threads.class); - /** - * New thread - * - * @param run the runnable - * @return thread instance - */ - public static Thread newThread(Runnable run) { - Thread thread = new Thread(run); - String callerClassName = Thread.currentThread().getStackTrace()[2].getClassName(); - thread.setName(callerClassName.substring(callerClassName.lastIndexOf(".") + 1)); - thread.setUncaughtExceptionHandler(new LoggedUncaughtExceptionHandler(LOG)); - return thread; - } - /** * New thread * @@ -52,14 +38,17 @@ public static Thread newThread(Runnable run) { * @param daemon the daemon * @param priority the priority * @param run the runnable + * @param logger the uncaught exception handler logger * @return thread instance */ - public static Thread newThread(String name, boolean daemon, int priority, Runnable run) { + public static Thread newThread(String name, boolean daemon, int priority, Runnable run, Logger logger) { Thread thread = new Thread(run); thread.setName(name); thread.setDaemon(daemon); thread.setPriority(priority); - thread.setUncaughtExceptionHandler(new LoggedUncaughtExceptionHandler(LOG)); + if (logger != null) { + thread.setUncaughtExceptionHandler(new LoggedUncaughtExceptionHandler(logger)); + } return thread; } @@ -146,8 +135,6 @@ private static void stopThread(Thread thread, long joinMillis, boolean fromAsync return; } - thread.interrupt(); - if (Thread.currentThread() == thread) { if (fromAsync) { LOG.warn("Call stop on self thread: {}\n{}", thread.getName(), getStackTrace()); @@ -158,6 +145,9 @@ private static void stopThread(Thread thread, long joinMillis, boolean fromAsync return; } + // do interrupt + thread.interrupt(); + // wait joined if (joinMillis > 0) { try { @@ -181,7 +171,6 @@ private static void stopThread(Thread thread) { return; } - thread.interrupt(); try { // 调用后,thread中正在执行的run方法内部会抛出java.lang.ThreadDeath异常 // 如果在run方法内用 try{...} catch(Throwable e){} 捕获住,则线程不会停止执行 diff --git a/disjob-common/src/main/java/cn/ponfee/disjob/common/util/ClassUtils.java b/disjob-common/src/main/java/cn/ponfee/disjob/common/util/ClassUtils.java index 73331e966..91049bf04 100644 --- a/disjob-common/src/main/java/cn/ponfee/disjob/common/util/ClassUtils.java +++ b/disjob-common/src/main/java/cn/ponfee/disjob/common/util/ClassUtils.java @@ -124,7 +124,7 @@ public static Set fieldDiff(Class a, Class b) { */ public static List listFields(Class clazz) { if (clazz.isInterface() || clazz == Object.class) { - return null; // error class args + throw new IllegalArgumentException("Class cannot be interface or Object.class: " + clazz); } List list = new ArrayList<>(); diff --git a/disjob-common/src/test/java/cn/ponfee/disjob/common/util/CollectsTest.java b/disjob-common/src/test/java/cn/ponfee/disjob/common/util/CollectsTest.java index cae9cca5a..492a5d9ff 100644 --- a/disjob-common/src/test/java/cn/ponfee/disjob/common/util/CollectsTest.java +++ b/disjob-common/src/test/java/cn/ponfee/disjob/common/util/CollectsTest.java @@ -17,14 +17,20 @@ package cn.ponfee.disjob.common.util; import cn.ponfee.disjob.common.collect.Collects; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.commons.lang3.reflect.MethodUtils; import org.junit.jupiter.api.Test; import org.mockito.internal.util.collections.Sets; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.net.URL; import java.util.Arrays; import java.util.Collections; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Collects test @@ -53,4 +59,44 @@ public void testNewArray() { assertThat(array2).hasSize(9); } + private static String test = "xxx"; + private static final String STR = "123"; + + @Test + public void testReflect() throws IllegalAccessException { + // static field + Field f = FieldUtils.getField(CollectsTest.class, "test", true); + assertThat("xxx").isEqualTo(test); + assertThat("xxx").isEqualTo(FieldUtils.readField(f, (Object) null)); + FieldUtils.writeField(f, (Object) null, "yyy", true); + assertThat("yyy").isEqualTo(test); + assertThat("yyy").isEqualTo(FieldUtils.readField(f, (Object) null)); + + // static final field + Field f1 = FieldUtils.getField(CollectsTest.class, "STR", true); + Field f2 = FieldUtils.getField(CollectsTest.class, "STR", true); + assertThat(f1).isSameAs(f1); + assertThat(f1 == f2).isFalse(); + assertThat(f1).isNotSameAs(f2); // f1 != f2 + assertThat(f1).isEqualTo(f2); + + assertThat("123").isEqualTo(STR); + assertThat("123").isEqualTo(FieldUtils.readField(f1, (Object) null)); + + assertThatThrownBy(() -> FieldUtils.writeField(f1, (Object) null, "abc", true)) + .isInstanceOf(IllegalAccessException.class) + .hasMessage("Can not set static final java.lang.String field cn.ponfee.disjob.common.util.CollectsTest.STR to java.lang.String"); + + Fields.put(CollectsTest.class, f1, "abc"); + assertThat("123").isEqualTo(STR); // 编译时直接替换为`123` + assertThat("abc").isEqualTo(FieldUtils.readField(f1, (Object) null)); + + Method m1 = MethodUtils.getMatchingMethod(ClassUtils.class, "decodeURL", URL.class); + Method m2 = MethodUtils.getMatchingMethod(ClassUtils.class, "decodeURL", URL.class); + assertThat(m1).isSameAs(m1); + assertThat(m1 == m2).isFalse(); + assertThat(m1).isNotSameAs(m2); + assertThat(m1).isEqualTo(m2); + } + } diff --git a/disjob-dispatch/disjob-dispatch-api/pom.xml b/disjob-dispatch/disjob-dispatch-api/pom.xml index eac8b5d16..0e995493a 100644 --- a/disjob-dispatch/disjob-dispatch-api/pom.xml +++ b/disjob-dispatch/disjob-dispatch-api/pom.xml @@ -22,12 +22,6 @@ disjob-registry-api ${project.parent.version} - - - org.springframework.boot - spring-boot-starter-data-redis - true - diff --git a/disjob-dispatch/disjob-dispatch-api/src/main/java/cn/ponfee/disjob/dispatch/TaskDispatcher.java b/disjob-dispatch/disjob-dispatch-api/src/main/java/cn/ponfee/disjob/dispatch/TaskDispatcher.java index caa59bdfb..4d60a1be1 100644 --- a/disjob-dispatch/disjob-dispatch-api/src/main/java/cn/ponfee/disjob/dispatch/TaskDispatcher.java +++ b/disjob-dispatch/disjob-dispatch-api/src/main/java/cn/ponfee/disjob/dispatch/TaskDispatcher.java @@ -103,11 +103,11 @@ public final boolean dispatch(List tasks) { /** * Assign a worker and dispatch to the assigned worker. * - * @param tasks the list of execution task param * @param group the group + * @param tasks the list of execution task param * @return {@code true} if the first dispatch successful */ - public final boolean dispatch(List tasks, String group) { + public final boolean dispatch(String group, List tasks) { if (CollectionUtils.isEmpty(tasks)) { return false; } diff --git a/disjob-dispatch/disjob-dispatch-api/src/main/java/cn/ponfee/disjob/dispatch/route/count/RedisAtomicCounter.java b/disjob-dispatch/disjob-dispatch-api/src/main/java/cn/ponfee/disjob/dispatch/route/count/RedisAtomicCounter.java index a788912c3..2d67ba920 100644 --- a/disjob-dispatch/disjob-dispatch-api/src/main/java/cn/ponfee/disjob/dispatch/route/count/RedisAtomicCounter.java +++ b/disjob-dispatch/disjob-dispatch-api/src/main/java/cn/ponfee/disjob/dispatch/route/count/RedisAtomicCounter.java @@ -1,74 +1,80 @@ -/* - * Copyright 2022-2024 Ponfee (http://www.ponfee.cn/) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.ponfee.disjob.dispatch.route.count; - -import cn.ponfee.disjob.common.spring.RedisKeyRenewal; -import cn.ponfee.disjob.core.base.JobConstants; -import org.apache.commons.lang3.StringUtils; -import org.springframework.data.redis.core.StringRedisTemplate; - -import java.util.function.Function; - -/** - * Atomic counter based redis INCRBY command. - * - * @author Ponfee - */ -public class RedisAtomicCounter extends AtomicCounter { - - private final String counterRedisKey; - private final StringRedisTemplate stringRedisTemplate; - private final RedisKeyRenewal redisKeyRenewal; - - /** - * Function: group -> new RedisAtomicCounter(group, stringRedisTemplate) - * - * @param group the group - * @param stringRedisTemplate the StringRedisTemplate - * @see cn.ponfee.disjob.dispatch.route.RoundRobinExecutionRouter#RoundRobinExecutionRouter(Function) - */ - public RedisAtomicCounter(String group, - StringRedisTemplate stringRedisTemplate) { - this.counterRedisKey = JobConstants.DISJOB_KEY_PREFIX + ":route:counter:" + group; - this.stringRedisTemplate = stringRedisTemplate; - this.redisKeyRenewal = new RedisKeyRenewal(stringRedisTemplate, counterRedisKey); - } - - @Override - public long get() { - String ret = stringRedisTemplate.opsForValue().get(counterRedisKey); - if (StringUtils.isBlank(ret)) { - return 0; - } - redisKeyRenewal.renewIfNecessary(); - return Long.parseLong(ret); - } - - @Override - public void set(long newValue) { - stringRedisTemplate.opsForValue().set(counterRedisKey, Long.toString(newValue)); - redisKeyRenewal.renewIfNecessary(); - } - - @Override - public long addAndGet(long delta) { - Long value = stringRedisTemplate.opsForValue().increment(counterRedisKey, delta); - redisKeyRenewal.renewIfNecessary(); - return value == null ? 0 : value; - } - -} +///* +// * Copyright 2022-2024 Ponfee (http://www.ponfee.cn/) +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * https://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package cn.ponfee.disjob.dispatch.route.count; +// +//import cn.ponfee.disjob.common.spring.RedisKeyRenewal; +//import cn.ponfee.disjob.core.base.JobConstants; +//import org.apache.commons.lang3.StringUtils; +//import org.springframework.data.redis.core.StringRedisTemplate; +// +//import java.util.function.Function; +// +///** +// * Atomic counter based redis INCRBY command. +// * +// * +// * org.springframework.boot +// * spring-boot-starter-data-redis +// * true +// * +// * +// * @author Ponfee +// */ +//public class RedisAtomicCounter extends AtomicCounter { +// +// private final String counterRedisKey; +// private final StringRedisTemplate stringRedisTemplate; +// private final RedisKeyRenewal redisKeyRenewal; +// +// /** +// * Function: group -> new RedisAtomicCounter(group, stringRedisTemplate) +// * +// * @param group the group +// * @param stringRedisTemplate the StringRedisTemplate +// * @see cn.ponfee.disjob.dispatch.route.RoundRobinExecutionRouter#RoundRobinExecutionRouter(Function) +// */ +// public RedisAtomicCounter(String group, +// StringRedisTemplate stringRedisTemplate) { +// this.counterRedisKey = JobConstants.DISJOB_KEY_PREFIX + ":route:counter:" + group; +// this.stringRedisTemplate = stringRedisTemplate; +// this.redisKeyRenewal = new RedisKeyRenewal(stringRedisTemplate, counterRedisKey); +// } +// +// @Override +// public long get() { +// String ret = stringRedisTemplate.opsForValue().get(counterRedisKey); +// if (StringUtils.isBlank(ret)) { +// return 0; +// } +// redisKeyRenewal.renewIfNecessary(); +// return Long.parseLong(ret); +// } +// +// @Override +// public void set(long newValue) { +// stringRedisTemplate.opsForValue().set(counterRedisKey, Long.toString(newValue)); +// redisKeyRenewal.renewIfNecessary(); +// } +// +// @Override +// public long addAndGet(long delta) { +// Long value = stringRedisTemplate.opsForValue().increment(counterRedisKey, delta); +// redisKeyRenewal.renewIfNecessary(); +// return value == null ? 0 : value; +// } +// +//} diff --git a/disjob-id/src/main/java/cn/ponfee/disjob/id/snowflake/zk/ZkDistributedSnowflake.java b/disjob-id/src/main/java/cn/ponfee/disjob/id/snowflake/zk/ZkDistributedSnowflake.java index fd34a2dec..58918a818 100644 --- a/disjob-id/src/main/java/cn/ponfee/disjob/id/snowflake/zk/ZkDistributedSnowflake.java +++ b/disjob-id/src/main/java/cn/ponfee/disjob/id/snowflake/zk/ZkDistributedSnowflake.java @@ -361,7 +361,7 @@ private static CuratorFramework createCuratorFramework(ZkConfig config) throws I CuratorFramework curatorFramework = builder.build(); curatorFramework.start(); - boolean isStarted = curatorFramework.getState().equals(CuratorFrameworkState.STARTED); + boolean isStarted = curatorFramework.getState() == CuratorFrameworkState.STARTED; Assert.state(isStarted, () -> "Snowflake curator framework not started: " + curatorFramework.getState()); boolean isConnected = curatorFramework.blockUntilConnected(5000, TimeUnit.MILLISECONDS); Assert.state(isConnected, () -> "Snowflake curator framework not connected: " + curatorFramework.getState()); diff --git a/disjob-registry/disjob-registry-zookeeper/src/main/java/cn/ponfee/disjob/registry/zookeeper/CuratorFrameworkClient.java b/disjob-registry/disjob-registry-zookeeper/src/main/java/cn/ponfee/disjob/registry/zookeeper/CuratorFrameworkClient.java index ad1ac2f6e..6d2653196 100644 --- a/disjob-registry/disjob-registry-zookeeper/src/main/java/cn/ponfee/disjob/registry/zookeeper/CuratorFrameworkClient.java +++ b/disjob-registry/disjob-registry-zookeeper/src/main/java/cn/ponfee/disjob/registry/zookeeper/CuratorFrameworkClient.java @@ -65,7 +65,7 @@ public CuratorFrameworkClient(ZookeeperRegistryProperties config, ReconnectCallb curatorFramework.getConnectionStateListenable().addListener(new CuratorConnectionStateListener()); curatorFramework.start(); - boolean isStarted = curatorFramework.getState().equals(CuratorFrameworkState.STARTED); + boolean isStarted = curatorFramework.getState() == CuratorFrameworkState.STARTED; Assert.state(isStarted, () -> "Curator framework not started: " + curatorFramework.getState()); boolean isConnected = curatorFramework.blockUntilConnected(config.getMaxWaitTimeMs(), TimeUnit.MILLISECONDS); Assert.state(isConnected, () -> "Curator framework not connected: " + curatorFramework.getState()); diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/EventSubscribeService.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/EventSubscribeService.java index 832d5119e..a0bb4f71e 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/EventSubscribeService.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/EventSubscribeService.java @@ -54,8 +54,8 @@ public EventSubscribeService(SchedGroupService schedGroupService) { public static void subscribe(EventParam param) { if (param != null && param.getType() != null) { - // putIfAbsent不会更新param - MAP.compute(param.getType(), (k, v) -> param); + // add or update value + MAP.put(param.getType(), param); } } diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/AbstractJobManager.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/AbstractJobManager.java index bd11453cc..986956c8a 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/AbstractJobManager.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/AbstractJobManager.java @@ -45,6 +45,8 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; @@ -64,10 +66,11 @@ */ @RequiredArgsConstructor public abstract class AbstractJobManager { - private static final int MAX_SPLIT_TASK_SIZE = 1000; private static final int MAX_DEPENDS_LEVEL = 20; + protected final Logger log = LoggerFactory.getLogger(getClass()); + protected final SchedJobMapper jobMapper; protected final SchedDependMapper dependMapper; @@ -226,27 +229,30 @@ public boolean hasNotDiscoveredWorkers() { return !workerDiscover.hasDiscoveredServers(); } - public boolean isNeedRedispatch(SchedTask task) { + public boolean shouldRedispatch(SchedTask task) { if (!ExecuteState.WAITING.equalsValue(task.getExecuteState())) { return false; } if (StringUtils.isBlank(task.getWorker())) { + // not dispatched to a worker successfully return true; } Worker worker = Worker.deserialize(task.getWorker()); if (isDeadWorker(worker)) { + // dispatched worker are dead return true; } String supervisorToken = SchedGroupService.createSupervisorAuthenticationToken(worker.getGroup()); ExistsTaskParam param = new ExistsTaskParam(supervisorToken, task.getTaskId()); try { - // `WorkerRpcService#existsTask`判断任务是否在线程池中 + // `WorkerRpcService#existsTask`:判断任务是否在线程池中,如果不在则可能是没有分发成功,需要重新分发。 // 因扫描(WaitingInstanceScanner/RunningInstanceScanner)时间是很滞后的, - // 所以若任务已分发成功,不考虑该任务还在时间轮中的可能性,认定任务已在线程池(WorkerThreadPool)中了 + // 所以若任务已分发成功,不考虑该任务还在时间轮中的可能性,认定任务已在线程池(WorkerThreadPool)中了。 return !destinationWorkerRpcClient.invoke(worker, client -> client.existsTask(param)); - } catch (Throwable ignored) { + } catch (Throwable e) { + log.error("Invoke worker exists task error: " + worker, e); // 若调用异常(如请求超时),本次不做处理,等下一次扫描时再判断是否要重新分发任务 return false; } @@ -259,7 +265,6 @@ public boolean dispatch(SchedJob job, SchedInstance instance, List ta if (RouteStrategy.BROADCAST.equalsValue(job.getRouteStrategy())) { list = new ArrayList<>(tasks.size()); for (SchedTask task : tasks) { - Assert.hasText(task.getWorker(), () -> "Broadcast route strategy worker must pre assign: " + task.getTaskId()); Worker worker = Worker.deserialize(task.getWorker()); if (isDeadWorker(worker)) { cancelWaitingTask(task.getTaskId()); @@ -273,19 +278,11 @@ public boolean dispatch(SchedJob job, SchedInstance instance, List ta .collect(Collectors.toList()); } - return taskDispatcher.dispatch(list, job.getGroup()); + return taskDispatcher.dispatch(job.getGroup(), list); } public boolean dispatch(List params) { - List list = new ArrayList<>(params.size()); - for (ExecuteTaskParam param : params) { - if (RouteStrategy.BROADCAST == param.getRouteStrategy() && isDeadWorker(param.getWorker())) { - cancelWaitingTask(param.getTaskId()); - } else { - list.add(param); - } - } - return taskDispatcher.dispatch(list); + return taskDispatcher.dispatch(params); } /** diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/DistributedJobManager.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/DistributedJobManager.java index 6c8e6d07a..fbcd06c05 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/DistributedJobManager.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/component/DistributedJobManager.java @@ -53,8 +53,6 @@ import cn.ponfee.disjob.supervisor.instance.WorkflowInstanceCreator; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; @@ -79,7 +77,6 @@ */ @Component public class DistributedJobManager extends AbstractJobManager { - private static final Logger LOG = LoggerFactory.getLogger(DistributedJobManager.class); private static final List RUN_STATE_TERMINABLE = Collects.convert(RunState.Const.TERMINABLE_LIST, RunState::value); private static final List RUN_STATE_RUNNABLE = Collects.convert(RunState.Const.RUNNABLE_LIST, RunState::value); @@ -132,7 +129,7 @@ public void processTaskDispatchFailedEvent(TaskDispatchFailedEvent event) { int toState = ExecuteState.DISPATCH_FAILED.value(); int fromState = ExecuteState.WAITING.value(); if (isNotAffectedRow(taskMapper.terminate(event.getTaskId(), null, toState, fromState, null, null))) { - LOG.warn("Terminate dispatch failed task unsuccessful: {}", event.getTaskId()); + log.warn("Terminate dispatch failed task unsuccessful: {}", event.getTaskId()); } } @@ -216,7 +213,7 @@ public boolean startTask(StartTaskParam param) { return false; } - LOG.info("Task trace [{}] starting: {}", param.getTaskId(), param.getWorker()); + log.info("Task trace [{}] starting: {}", param.getTaskId(), param.getWorker()); Date now = new Date(); // start sched instance(also possibly started by other task) int row = 0; @@ -251,7 +248,7 @@ public void changeInstanceState(long instanceId, ExecuteState toExecuteState) { TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(param.a, param.b, param.c)); } - LOG.info("Force change state success {}, {}", instanceId, toExecuteState); + log.info("Force change state success {}, {}", instanceId, toExecuteState); }); } @@ -288,7 +285,7 @@ public void deleteInstance(long instanceId) { row = taskMapper.deleteByInstanceId(instanceId); assertHasAffectedRow(row, () -> "Delete sched task conflict: " + instanceId); } - LOG.info("Delete sched instance success {}", instanceId); + log.info("Delete sched instance success {}", instanceId); }); } @@ -305,7 +302,7 @@ public boolean terminateTask(TerminateTaskParam param) { ExecuteState toState = param.getToState(); long instanceId = param.getInstanceId(); Assert.isTrue(!ExecuteState.Const.PAUSABLE_LIST.contains(toState), () -> "Stop executing invalid to state " + toState); - LOG.info("Task trace [{}] terminating: {}, {}", param.getTaskId(), param.getOperation(), param.getWorker()); + log.info("Task trace [{}] terminating: {}, {}, {}", param.getTaskId(), param.getOperation(), param.getToState(), param.getWorker()); return doTransactionLockInSynchronized(instanceId, param.getWnstanceId(), instance -> { Assert.notNull(instance, () -> "Terminate executing task failed, instance not found: " + instanceId); Assert.isTrue(!instance.isWorkflowLead(), () -> "Cannot direct terminate workflow lead instance: " + instance); @@ -318,7 +315,7 @@ public boolean terminateTask(TerminateTaskParam param) { int row = taskMapper.terminate(param.getTaskId(), param.getWorker(), toState.value(), ExecuteState.EXECUTING.value(), executeEndTime, param.getErrorMsg()); if (isNotAffectedRow(row)) { // usual is worker invoke http timeout, then retry - LOG.warn("Conflict terminate executing task: {}, {}", param.getTaskId(), toState); + log.warn("Conflict terminate executing task: {}, {}", param.getTaskId(), toState); return false; } @@ -361,7 +358,7 @@ public boolean terminateTask(TerminateTaskParam param) { * @return {@code true} if purged successfully */ public boolean purgeInstance(SchedInstance inst) { - LOG.info("Purge instance: {}", inst.getInstanceId()); + log.info("Purge instance: {}", inst.getInstanceId()); Long instanceId = inst.getInstanceId(); return doTransactionLockInSynchronized(instanceId, inst.getWnstanceId(), instance -> { Assert.notNull(instance, () -> "Purge instance not found: " + instanceId); @@ -374,13 +371,13 @@ public boolean purgeInstance(SchedInstance inst) { // task execute state must not 10 List tasks = taskMapper.findBaseByInstanceId(instanceId); if (tasks.stream().anyMatch(e -> ExecuteState.WAITING.equalsValue(e.getExecuteState()))) { - LOG.warn("Purge instance failed, has waiting task: {}", tasks); + log.warn("Purge instance failed, has waiting task: {}", tasks); return false; } // if task execute state is 20, cannot is alive if (hasAliveExecuting(tasks)) { - LOG.warn("Purge instance failed, has alive executing task: {}", tasks); + log.warn("Purge instance failed, has alive executing task: {}", tasks); return false; } @@ -405,7 +402,7 @@ public boolean purgeInstance(SchedInstance inst) { instance.markTerminated(tuple.a, tuple.b); afterTerminateTask(instance); - LOG.warn("Purge instance {} to state {}", instanceId, tuple.a); + log.warn("Purge instance {} to state {}", instanceId, tuple.a); return true; }); } @@ -417,7 +414,7 @@ public boolean purgeInstance(SchedInstance inst) { * @return {@code true} if paused successfully */ public boolean pauseInstance(long instanceId) { - LOG.info("Pause instance: {}", instanceId); + log.info("Pause instance: {}", instanceId); Long wnstanceId = instanceMapper.getWnstanceId(instanceId); if (wnstanceId != null) { Assert.isTrue(instanceId == wnstanceId, () -> "Must pause lead workflow instance: " + instanceId); @@ -440,7 +437,7 @@ public boolean pauseInstance(long instanceId) { * @return {@code true} if canceled successfully */ public boolean cancelInstance(long instanceId, Operation ops) { - LOG.info("Cancel instance: {}, {}", instanceId, ops); + log.info("Cancel instance: {}, {}", instanceId, ops); Assert.isTrue(ops.toState().isFailure(), () -> "Cancel instance operation invalid: " + ops); Long wnstanceId = instanceMapper.getWnstanceId(instanceId); if (wnstanceId != null) { @@ -836,7 +833,7 @@ private void processWorkflow(SchedInstance nodeInstance) { graph, graph.successors(DAGNode.fromString(nodeInstance.parseAttach().getCurNode())), throwable -> { - LOG.error("Split workflow job task error: " + nodeInstance, throwable); + log.error("Split workflow job task error: " + nodeInstance, throwable); onCreateWorkflowNodeFailed(nodeInstance.getWnstanceId()); return false; } @@ -856,7 +853,7 @@ private void onCreateWorkflowNodeFailed(Long wnstanceId) { private void retryJob(SchedInstance prev, LazyLoader lazyJob) { SchedJob schedJob = lazyJob.orElseGet(() -> { - LOG.error("Sched job not found {}", prev.getJobId()); + log.error("Sched job not found {}", prev.getJobId()); return null; }); int retriedCount = prev.obtainRetriedCount(); @@ -891,7 +888,7 @@ private void retryJob(SchedInstance prev, LazyLoader lazyJob) { // re-split tasks tasks = splitTasks(JobHandlerParam.from(schedJob), retryInstance.getInstanceId(), now); } catch (Throwable t) { - LOG.error("Split retry job error: " + schedJob + ", " + prev, t); + log.error("Split retry job error: " + schedJob + ", " + prev, t); processWorkflow(prev); return; } @@ -930,7 +927,7 @@ private void dependJob(SchedInstance parentInstance) { for (SchedDepend depend : schedDepends) { SchedJob childJob = jobMapper.get(depend.getChildJobId()); if (childJob == null) { - LOG.error("Child sched job not found: {}, {}", depend.getParentJobId(), depend.getChildJobId()); + log.error("Child sched job not found: {}, {}", depend.getParentJobId(), depend.getChildJobId()); continue; } if (JobState.DISABLE.equalsValue(childJob.getJobState())) { @@ -954,7 +951,7 @@ private void dependJob(SchedInstance parentInstance) { createInstance(tInstance); return () -> creator.dispatch(childJob, tInstance); }, - t -> LOG.error("Depend job instance created fail: " + parentInstance + ", " + childJob, t) + t -> log.error("Depend job instance created fail: " + parentInstance + ", " + childJob, t) ); if (dispatchAction != null) { @@ -980,12 +977,11 @@ private List loadExecutingTasks(SchedInstance instance, Operat } else { // update dead task Date executeEndTime = ops.toState().isTerminal() ? new Date() : null; - int row = taskMapper.terminate(task.getTaskId(), task.getWorker(), ops.toState().value(), ExecuteState.EXECUTING.value(), executeEndTime, null); + int toState = ExecuteState.EXECUTE_TIMEOUT.value(); + int fromState = ExecuteState.EXECUTING.value(); + int row = taskMapper.terminate(task.getTaskId(), task.getWorker(), toState, fromState, executeEndTime, null); if (isNotAffectedRow(row)) { - LOG.error("Cancel the dead task failed: {}", task); - executingTasks.add(builder.build(ops, task.getTaskId(), triggerTime, worker)); - } else { - LOG.info("Cancel the dead task success: {}", task); + log.error("Terminate loaded executing dead worker task failed: {}", task); } } } diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/configuration/EnableSupervisor.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/configuration/EnableSupervisor.java index 5e395d15e..417bf6e79 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/configuration/EnableSupervisor.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/configuration/EnableSupervisor.java @@ -128,23 +128,23 @@ public RestTemplate restTemplate(HttpProperties http, @Nullable ObjectMapper obj public GroupedServerInvoker groupedWorkerRpcClient(RetryProperties retry, @Qualifier(JobConstants.SPRING_BEAN_NAME_REST_TEMPLATE) RestTemplate restTemplate, SupervisorRegistry discoveryWorker, - @Nullable WorkerRpcService localServiceProvider, + @Nullable WorkerRpcService workerRpcProvider, @Nullable Worker.Current currentWorker) { retry.check(); Predicate serverGroupMatcher = group -> Worker.matchesGroup(currentWorker, group); return DiscoveryServerRestProxy.create( - WorkerRpcService.class, localServiceProvider, serverGroupMatcher, discoveryWorker, restTemplate, retry + WorkerRpcService.class, workerRpcProvider, serverGroupMatcher, discoveryWorker, restTemplate, retry ); } @DependsOn(JobConstants.SPRING_BEAN_NAME_CURRENT_SUPERVISOR) @Bean public DestinationServerInvoker destinationWorkerRpcClient(@Qualifier(JobConstants.SPRING_BEAN_NAME_REST_TEMPLATE) RestTemplate restTemplate, - @Nullable WorkerRpcService localServiceProvider) { + @Nullable WorkerRpcService workerRpcProvider) { RetryProperties retry = RetryProperties.of(0, 0); Function workerContextPath = worker -> Supervisor.current().getWorkerContextPath(worker.getGroup()); return DestinationServerRestProxy.create( - WorkerRpcService.class, localServiceProvider, Worker.current(), workerContextPath, restTemplate, retry + WorkerRpcService.class, workerRpcProvider, Worker.current(), workerContextPath, restTemplate, retry ); } diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/RunningInstanceScanner.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/RunningInstanceScanner.java index 984708c43..ab4c70080 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/RunningInstanceScanner.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/RunningInstanceScanner.java @@ -98,7 +98,7 @@ private void processEach(SchedInstance instance, Date now) { // 1、has waiting state task // sieve the (un-dispatch) or (assigned worker dead) waiting tasks to do re-dispatch - List redispatchingTasks = Collects.filter(waitingTasks, jobManager::isNeedRedispatch); + List redispatchingTasks = Collects.filter(waitingTasks, jobManager::shouldRedispatch); if (CollectionUtils.isEmpty(redispatchingTasks)) { return; } diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/WaitingInstanceScanner.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/WaitingInstanceScanner.java index 67e993173..3fa294a6f 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/WaitingInstanceScanner.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/thread/WaitingInstanceScanner.java @@ -98,7 +98,7 @@ private void processEach(SchedInstance instance) { // 1、has waiting state task // sieve the (un-dispatch) or (assigned worker dead) waiting tasks to do re-dispatch - List redispatchingTasks = Collects.filter(waitingTasks, jobManager::isNeedRedispatch); + List redispatchingTasks = Collects.filter(waitingTasks, jobManager::shouldRedispatch); if (CollectionUtils.isEmpty(redispatchingTasks)) { return; } diff --git a/disjob-supervisor/src/test/java/cn/ponfee/disjob/supervisor/model/RouteTest.java b/disjob-supervisor/src/test/java/cn/ponfee/disjob/supervisor/model/RouteTest.java index c937ed55b..3e2fa1bd9 100644 --- a/disjob-supervisor/src/test/java/cn/ponfee/disjob/supervisor/model/RouteTest.java +++ b/disjob-supervisor/src/test/java/cn/ponfee/disjob/supervisor/model/RouteTest.java @@ -20,7 +20,6 @@ import cn.ponfee.disjob.common.util.UuidUtils; import cn.ponfee.disjob.dispatch.route.count.AtomicCounter; import cn.ponfee.disjob.dispatch.route.count.JdkAtomicCounter; -import cn.ponfee.disjob.dispatch.route.count.RedisAtomicCounter; import cn.ponfee.disjob.supervisor.SpringBootTestBase; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -55,6 +54,7 @@ public void testRedisAtomicCounter() { Assertions.assertEquals(1, bean.opsForValue().increment(key2)); Assertions.assertEquals(2, bean.opsForValue().increment(key2)); + /* RedisAtomicCounter counter = new RedisAtomicCounter("disjob:counter3:" + UuidUtils.uuid32(), bean); long initValue = counter.get(); Assertions.assertEquals(initValue, counter.getAndIncrement()); @@ -63,5 +63,6 @@ public void testRedisAtomicCounter() { Assertions.assertEquals(6 + initValue, counter.get()); counter.set(100); Assertions.assertEquals(100, counter.get()); + */ } } diff --git a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java index 1a7817fac..9c47d0489 100644 --- a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java +++ b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java @@ -854,7 +854,7 @@ private void runTask(ExecuteTaskParam task) { if (task.getExecuteTimeout() > 0) { FutureTask futureTask = new FutureTask<>(() -> taskExecutor.execute(executingTask, savepoint)); String threadName = getClass().getSimpleName() + "#FutureTaskThread" + "-" + FUTURE_TASK_NAMED_SEQ.getAndIncrement(); - Thread futureTaskThread = Threads.newThread(threadName, true, Thread.NORM_PRIORITY, futureTask); + Thread futureTaskThread = Threads.newThread(threadName, true, Thread.NORM_PRIORITY, futureTask, LOG); futureTaskThread.start(); try { result = futureTask.get(task.getExecuteTimeout(), TimeUnit.MILLISECONDS); diff --git a/pom.xml b/pom.xml index 106898432..59fb4223f 100644 --- a/pom.xml +++ b/pom.xml @@ -85,7 +85,7 @@ 2.5.3 1.4 0.18.1 - 1.19.4 + 1.19.5 3.2.0