From 58b1b0cb785a57bf2fe8764931ac9f4d46036a2f Mon Sep 17 00:00:00 2001 From: Ponfee Date: Sun, 21 Jan 2024 19:12:13 +0800 Subject: [PATCH] optimize code --- .../config/thread/ThreadPoolConfig.java | 27 ++++++++------- .../templates/disjob/mygroup/worker.html | 14 +++----- .../ponfee/disjob/core/base/JobConstants.java | 10 +++--- .../cn/ponfee/disjob/core/base/Tokens.java | 20 +++++------ .../cn/ponfee/disjob/core/base/Worker.java | 7 ++-- .../param/worker/ConfigureWorkerParam.java | 4 +-- .../application/SchedGroupService.java | 33 +++++++++---------- .../auth/AuthenticationConfigurer.java | 5 +-- .../disjob/worker/base/WorkerThreadPool.java | 24 +++++++++++--- .../worker/provider/WorkerRpcProvider.java | 6 ++-- 10 files changed, 76 insertions(+), 74 deletions(-) diff --git a/disjob-admin/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/ThreadPoolConfig.java b/disjob-admin/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/ThreadPoolConfig.java index 11eaa8828..033c3a7b0 100644 --- a/disjob-admin/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/ThreadPoolConfig.java +++ b/disjob-admin/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/ThreadPoolConfig.java @@ -16,13 +16,12 @@ * @author ruoyi **/ @Configuration -public class ThreadPoolConfig -{ +public class ThreadPoolConfig { // 核心线程池大小 - private final int corePoolSize = 50; + private final int corePoolSize = 10; // 最大可创建的线程数 - private final int maxPoolSize = 200; + private final int maxPoolSize = 100; // 队列最大长度 private final int queueCapacity = 1000; @@ -31,13 +30,13 @@ public class ThreadPoolConfig private final int keepAliveSeconds = 300; @Bean(name = "threadPoolTaskExecutor") - public ThreadPoolTaskExecutor threadPoolTaskExecutor() - { + public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setMaxPoolSize(maxPoolSize); executor.setCorePoolSize(corePoolSize); executor.setQueueCapacity(queueCapacity); executor.setKeepAliveSeconds(keepAliveSeconds); + executor.setAllowCoreThreadTimeOut(true); // 线程池对拒绝任务(无线程可用)的处理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; @@ -47,18 +46,18 @@ public ThreadPoolTaskExecutor threadPoolTaskExecutor() * 执行周期性或定时任务 */ @Bean(name = "scheduledExecutorService") - protected ScheduledExecutorService scheduledExecutorService() - { - return new ScheduledThreadPoolExecutor(corePoolSize, - new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build(), - new ThreadPoolExecutor.CallerRunsPolicy()) - { + protected ScheduledExecutorService scheduledExecutorService() { + return new ScheduledThreadPoolExecutor( + corePoolSize, + new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build(), + new ThreadPoolExecutor.CallerRunsPolicy() + ) { @Override - protected void afterExecute(Runnable r, Throwable t) - { + protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); Threads.printException(r, t); } }; } + } diff --git a/disjob-admin/ruoyi-disjob/src/main/resources/templates/disjob/mygroup/worker.html b/disjob-admin/ruoyi-disjob/src/main/resources/templates/disjob/mygroup/worker.html index 3eaea9521..cd7cb3fe6 100644 --- a/disjob-admin/ruoyi-disjob/src/main/resources/templates/disjob/mygroup/worker.html +++ b/disjob-admin/ruoyi-disjob/src/main/resources/templates/disjob/mygroup/worker.html @@ -111,8 +111,8 @@ align: 'center', formatter: function (value, row, index) { const actions = []; - actions.push(" 摘除Worker "); - actions.push(" 清空任务 "); + actions.push(" 摘除Worker "); + // actions.push(" 摘除Worker并清空任务队列 "); return ' 操作'; } }] @@ -211,7 +211,7 @@ const input = layero.find('.layui-layer-input'); const value = input.val().trim(); if (!/^[A-Za-z0-9.]+:\d+$/g.test(value)) { - layer.tips("无效的host:port", input, {tips: 1}); + layer.tips("无效的`host:port`", input, {tips: 1}); return; } const params = { @@ -232,11 +232,7 @@ }); } - const actionMap = { - "REMOVE_WORKER": "摘除该Worker", - "CLEAR_TASK_QUEUE": "清空该Worker的任务队列" - }; - function operate(action, workerId, host, port) { + function operate(action, msg, workerId, host, port) { const params = { "group": [[${group}]], "workerId": workerId, @@ -245,7 +241,7 @@ "action": action, "data": null }; - $.modal.confirm("确认要 `" + actionMap[action] + "` 吗?", function () { + $.modal.confirm("确认要 `" + msg + "` 吗?", function () { $.operate.post(prefix + "/configure_one_worker", params, function (result) { if (result.code === 0) { location.reload(); diff --git a/disjob-core/src/main/java/cn/ponfee/disjob/core/base/JobConstants.java b/disjob-core/src/main/java/cn/ponfee/disjob/core/base/JobConstants.java index ab00c918a..5debabb06 100644 --- a/disjob-core/src/main/java/cn/ponfee/disjob/core/base/JobConstants.java +++ b/disjob-core/src/main/java/cn/ponfee/disjob/core/base/JobConstants.java @@ -99,6 +99,11 @@ public class JobConstants { */ public static final String AUTHENTICATE_HEADER_TOKEN = "X-Disjob-Token"; + /** + * Instance lock pool + */ + public static final Interner INSTANCE_LOCK_POOL = Interners.newWeakInterner(); + /** * Http url pattern */ @@ -109,11 +114,6 @@ public class JobConstants { */ public static final String HTTPS_URL_PATTERN = "https://%s:%d/"; - /** - * Instance lock pool - */ - public static final Interner INSTANCE_LOCK_POOL = Interners.newWeakInterner(); - /** * UTF-8 charset */ diff --git a/disjob-core/src/main/java/cn/ponfee/disjob/core/base/Tokens.java b/disjob-core/src/main/java/cn/ponfee/disjob/core/base/Tokens.java index 3004a3b43..a306ae59f 100644 --- a/disjob-core/src/main/java/cn/ponfee/disjob/core/base/Tokens.java +++ b/disjob-core/src/main/java/cn/ponfee/disjob/core/base/Tokens.java @@ -25,7 +25,7 @@ */ public class Tokens { - private static final long EXPIRE_MILLISECONDS = 60_000L; + private static final long EXPIRATION_MILLISECONDS = 60_000L; public enum Type { /** @@ -57,8 +57,8 @@ public static String create(String tokenPlain, Type type, Mode mode, String grou if (StringUtils.isEmpty(tokenPlain)) { return null; } - String expire = Long.toString(System.currentTimeMillis() + EXPIRE_MILLISECONDS); - return secret(tokenPlain, type, mode, expire, group) + DOT + expire; + String expiration = Long.toString(System.currentTimeMillis() + EXPIRATION_MILLISECONDS); + return secret(tokenPlain, type, mode, expiration, group) + DOT + expiration; } public static boolean verify(String tokenSecret, String tokenPlain, Type type, Mode mode, String group) { @@ -71,26 +71,26 @@ public static boolean verify(String tokenSecret, String tokenPlain, Type type, M String[] array = tokenSecret.split("\\.", 2); String actual = array[0]; - String expire = array[1]; - if (Long.parseLong(expire) < System.currentTimeMillis()) { + String expiration = array[1]; + if (Long.parseLong(expiration) < System.currentTimeMillis()) { return false; } - String expect = secret(tokenPlain, type, mode, expire, group); + String expect = secret(tokenPlain, type, mode, expiration, group); return actual.equals(expect); } // -----------------------------------------------------------------private methods - private static String secret(String tokenPlain, Type type, Mode mode, String expire, String group) { + private static String secret(String tokenPlain, Type type, Mode mode, String expiration, String group) { Assert.notNull(type, "Type cannot be null."); Assert.notNull(mode, "Mode cannot be null."); Assert.hasText(group, "Group cannot be empty."); - String payload = type + DOT + mode + DOT + expire + DOT + group; + String payload = type + DOT + mode + DOT + expiration + DOT + group; HmacUtils hmac = new HmacUtils(HmacAlgorithms.HMAC_SHA_1, tokenPlain.getBytes(UTF_8)); - byte[] data = hmac.hmac(payload.getBytes(UTF_8)); - return Base64.getUrlEncoder().withoutPadding().encodeToString(data); + byte[] digest = hmac.hmac(payload.getBytes(UTF_8)); + return Base64.getUrlEncoder().withoutPadding().encodeToString(digest); } } diff --git a/disjob-core/src/main/java/cn/ponfee/disjob/core/base/Worker.java b/disjob-core/src/main/java/cn/ponfee/disjob/core/base/Worker.java index 4a1b25bec..e4874c8dc 100644 --- a/disjob-core/src/main/java/cn/ponfee/disjob/core/base/Worker.java +++ b/disjob-core/src/main/java/cn/ponfee/disjob/core/base/Worker.java @@ -264,11 +264,8 @@ public Map authenticationHeaders() { return Collections.singletonMap(AUTHENTICATE_HEADER_GROUP, group); } - String tokenSecret = Tokens.create(workerToken, Type.WORKER, Mode.AUTHENTICATE, group); - return ImmutableMap.of( - AUTHENTICATE_HEADER_GROUP, group, - AUTHENTICATE_HEADER_TOKEN, Objects.requireNonNull(tokenSecret) - ); + String tokenSecret = Objects.requireNonNull(Tokens.create(workerToken, Type.WORKER, Mode.AUTHENTICATE, group)); + return ImmutableMap.of(AUTHENTICATE_HEADER_GROUP, group, AUTHENTICATE_HEADER_TOKEN, tokenSecret); } @Override diff --git a/disjob-core/src/main/java/cn/ponfee/disjob/core/param/worker/ConfigureWorkerParam.java b/disjob-core/src/main/java/cn/ponfee/disjob/core/param/worker/ConfigureWorkerParam.java index 72c5dab90..c5f47650f 100644 --- a/disjob-core/src/main/java/cn/ponfee/disjob/core/param/worker/ConfigureWorkerParam.java +++ b/disjob-core/src/main/java/cn/ponfee/disjob/core/param/worker/ConfigureWorkerParam.java @@ -49,9 +49,9 @@ public T parse(String data) { }, /** - * Clear worker thread pool task queue + * Remove(deregister) worker and clear worker thread pool task queue */ - CLEAR_TASK_QUEUE { + REMOVE_WORKER_AND_CLEAR_TASK_QUEUE { @Override public T parse(String data) { throw new UnsupportedOperationException(); diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/SchedGroupService.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/SchedGroupService.java index feb241cf9..2245d8d5c 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/SchedGroupService.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/SchedGroupService.java @@ -145,41 +145,32 @@ public PageResponse queryForPage(SchedGroupPageRequest pageR // ------------------------------------------------------------other static methods - public static DisjobGroup getGroup(String group) { - DisjobGroup disjobGroup = groupMap.get(group); - if (disjobGroup == null) { - throw new GroupNotFoundException("Not found worker group: " + group); - } - return disjobGroup; - } - public static Set myGroups(String user) { Set groups = userMap.get(user); return groups == null ? Collections.emptySet() : groups; } - public static String createSupervisorAuthenticateToken(String group) { - String supervisorToken = SchedGroupService.getGroup(group).getSupervisorToken(); - return Tokens.create(supervisorToken, Type.SUPERVISOR, Mode.AUTHENTICATE, group); + public static boolean isDeveloper(String group, String user) { + return getDisjobGroup(group).isDeveloper(user); } - public static String createSupervisorSignatureToken(String group) { - String supervisorToken = SchedGroupService.getGroup(group).getSupervisorToken(); - return Tokens.create(supervisorToken, Type.SUPERVISOR, Mode.SIGNATURE, group); + public static String createSupervisorAuthenticateToken(String group) { + String supervisorToken = getDisjobGroup(group).getSupervisorToken(); + return Tokens.create(supervisorToken, Type.SUPERVISOR, Mode.AUTHENTICATE, group); } public static boolean verifyWorkerAuthenticateToken(String tokenSecret, String group) { - String workerToken = SchedGroupService.getGroup(group).getWorkerToken(); + String workerToken = getDisjobGroup(group).getWorkerToken(); return Tokens.verify(tokenSecret, workerToken, Type.WORKER, Mode.AUTHENTICATE, group); } public static boolean verifyUserAuthenticateToken(String tokenSecret, String group) { - String userToken = SchedGroupService.getGroup(group).getUserToken(); + String userToken = getDisjobGroup(group).getUserToken(); return Tokens.verify(tokenSecret, userToken, Type.USER, Mode.AUTHENTICATE, group); } public static boolean verifyWorkerSignatureToken(String tokenSecret, String group) { - String workerToken = SchedGroupService.getGroup(group).getWorkerToken(); + String workerToken = getDisjobGroup(group).getWorkerToken(); return Tokens.verify(tokenSecret, workerToken, Type.WORKER, Mode.SIGNATURE, group); } @@ -205,6 +196,14 @@ private void refresh() { } } + private static DisjobGroup getDisjobGroup(String group) { + DisjobGroup disjobGroup = groupMap.get(group); + if (disjobGroup == null) { + throw new GroupNotFoundException("Not found worker group: " + group); + } + return disjobGroup; + } + private static Map> toUserMap(List list) { Map userMap = list.stream() .flatMap(e -> { diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/auth/AuthenticationConfigurer.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/auth/AuthenticationConfigurer.java index c96964834..18e8d653d 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/auth/AuthenticationConfigurer.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/auth/AuthenticationConfigurer.java @@ -11,7 +11,6 @@ import cn.ponfee.disjob.core.base.JobConstants; import cn.ponfee.disjob.core.exception.AuthenticationException; import cn.ponfee.disjob.supervisor.application.SchedGroupService; -import cn.ponfee.disjob.supervisor.application.value.DisjobGroup; import org.apache.commons.lang3.StringUtils; import org.springframework.core.Ordered; import org.springframework.web.context.request.RequestContextHolder; @@ -74,9 +73,7 @@ private static void authenticateWorker(String group) { } private static void authenticateUser(String group) { - String user = requestUser(); - DisjobGroup disjobGroup = SchedGroupService.getGroup(group); - if (!disjobGroup.isDeveloper(user)) { + if (!SchedGroupService.isDeveloper(group, requestUser())) { throw new AuthenticationException(ERR_MSG); } diff --git a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java index c6fe6cfaf..3fe0bf949 100644 --- a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java +++ b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java @@ -14,6 +14,7 @@ import cn.ponfee.disjob.common.concurrent.Threads; import cn.ponfee.disjob.common.exception.Throwables; import cn.ponfee.disjob.common.exception.Throwables.ThrowingRunnable; +import cn.ponfee.disjob.common.util.Jsons; import cn.ponfee.disjob.core.base.JobConstants; import cn.ponfee.disjob.core.base.SupervisorRpcService; import cn.ponfee.disjob.core.base.WorkerMetrics; @@ -34,6 +35,7 @@ import cn.ponfee.disjob.core.param.supervisor.TerminateTaskParam; import cn.ponfee.disjob.core.param.supervisor.UpdateTaskWorkerParam; import cn.ponfee.disjob.dispatch.ExecuteTaskParam; +import com.google.common.collect.Lists; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,16 +43,15 @@ import javax.annotation.PreDestroy; import java.io.Closeable; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import static cn.ponfee.disjob.core.base.JobConstants.PROCESS_BATCH_SIZE; import static cn.ponfee.disjob.core.enums.ExecuteState.*; /** @@ -317,7 +318,20 @@ synchronized void modifyMaximumPoolSize(int value) { } synchronized void clearTaskQueue() { - taskQueue.clear(); + List tasks = new LinkedList<>(); + taskQueue.drainTo(tasks); + + List list = tasks.stream() + // 广播任务分派的worker不可修改,需要排除 + .filter(e -> e.getRouteStrategy() != RouteStrategy.BROADCAST) + // 清除分派worker数据 + .map(e -> new UpdateTaskWorkerParam(e.getTaskId(), null)) + .collect(Collectors.toList()); + + for (List sub : Lists.partition(list, PROCESS_BATCH_SIZE)) { + // 更新task的worker信息 + ThrowingRunnable.doCaught(() -> supervisorRpcClient.updateTaskWorker(sub), () -> "Update task worker error: " + Jsons.toJson(list)); + } } // ----------------------------------------------------------------------private methods diff --git a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/provider/WorkerRpcProvider.java b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/provider/WorkerRpcProvider.java index 96c803188..02fe7a3b0 100644 --- a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/provider/WorkerRpcProvider.java +++ b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/provider/WorkerRpcProvider.java @@ -66,11 +66,11 @@ public void configureWorker(ConfigureWorkerParam param) { Integer maximumPoolSize = action.parse(param.getData()); WorkerConfigurator.modifyMaximumPoolSize(maximumPoolSize); - } else if (action == Action.CLEAR_TASK_QUEUE) { - WorkerConfigurator.clearTaskQueue(); - } else if (action == Action.REMOVE_WORKER) { workerRegistry.deregister(currentWork); + + } else if (action == Action.REMOVE_WORKER_AND_CLEAR_TASK_QUEUE) { + workerRegistry.deregister(currentWork); WorkerConfigurator.clearTaskQueue(); } else if (action == Action.ADD_WORKER) {