Skip to content

Commit

Permalink
optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Jan 22, 2024
1 parent 39f2500 commit 58b1b0c
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
* @author ruoyi
**/
@Configuration
public class ThreadPoolConfig
{
public class ThreadPoolConfig {
// 核心线程池大小
private final int corePoolSize = 50;
private final int corePoolSize = 10;

// 最大可创建的线程数
private final int maxPoolSize = 200;
private final int maxPoolSize = 100;

// 队列最大长度
private final int queueCapacity = 1000;
Expand All @@ -31,13 +30,13 @@ public class ThreadPoolConfig
private final int keepAliveSeconds = 300;

@Bean(name = "threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor()
{
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(maxPoolSize);
executor.setCorePoolSize(corePoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setAllowCoreThreadTimeOut(true);
// 线程池对拒绝任务(无线程可用)的处理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
Expand All @@ -47,18 +46,18 @@ public ThreadPoolTaskExecutor threadPoolTaskExecutor()
* 执行周期性或定时任务
*/
@Bean(name = "scheduledExecutorService")
protected ScheduledExecutorService scheduledExecutorService()
{
return new ScheduledThreadPoolExecutor(corePoolSize,
new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build(),
new ThreadPoolExecutor.CallerRunsPolicy())
{
protected ScheduledExecutorService scheduledExecutorService() {
return new ScheduledThreadPoolExecutor(
corePoolSize,
new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build(),
new ThreadPoolExecutor.CallerRunsPolicy()
) {
@Override
protected void afterExecute(Runnable r, Throwable t)
{
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
Threads.printException(r, t);
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@
align: 'center',
formatter: function (value, row, index) {
const actions = [];
actions.push("<a class='btn btn-danger btn-rounded btn-xs' href='javascript:void(0)' onclick='operate(&quot;REMOVE_WORKER&quot;, &quot;" + row.workerId + "&quot;, &quot;" + row.host + "&quot;, " + row.port + ")'><i class='fa fa-remove'></i> 摘除Worker</a> ");
actions.push("<a class='btn btn-warning btn-rounded btn-xs' href='javascript:void(0)' onclick='operate(&quot;CLEAR_TASK_QUEUE&quot;, &quot;" + row.workerId + "&quot;, &quot;" + row.host + "&quot;, " + row.port + ")'><i class='fa fa-trash' ></i> 清空任务</a> ");
actions.push("<a class='btn btn-danger btn-rounded btn-xs' href='javascript:void(0)' onclick='operate(&quot;REMOVE_WORKER&quot;, &quot;摘除该Worker&quot;, &quot;" + row.workerId + "&quot;, &quot;" + row.host + "&quot;, " + row.port + ")'><i class='fa fa-remove'></i> 摘除Worker</a> ");
// actions.push("<a class='btn btn-danger btn-rounded btn-xs' href='javascript:void(0)' onclick='operate(&quot;REMOVE_WORKER_AND_CLEAR_TASK_QUEUE&quot;, &quot;摘除该Worker并清空任务队列&quot;, &quot;" + row.workerId + "&quot;, &quot;" + row.host + "&quot;, " + row.port + ")'><i class='fa fa-remove' ></i> 摘除Worker并清空任务队列</a> ");
return '<a tabindex="0" class="btn btn-info btn-rounded btn-xs" role="button" data-container="body" data-placement="left" data-toggle="popover" data-html="true" data-trigger="hover" data-content="' + actions.join('') + '"><i class="fa fa-chevron-circle-right"></i> 操作</a>';
}
}]
Expand Down Expand Up @@ -211,7 +211,7 @@
const input = layero.find('.layui-layer-input');
const value = input.val().trim();
if (!/^[A-Za-z0-9.]+:\d+$/g.test(value)) {
layer.tips("无效的host:port", input, {tips: 1});
layer.tips("无效的`host:port`", input, {tips: 1});
return;
}
const params = {
Expand All @@ -232,11 +232,7 @@
});
}

const actionMap = {
"REMOVE_WORKER": "摘除该Worker",
"CLEAR_TASK_QUEUE": "清空该Worker的任务队列"
};
function operate(action, workerId, host, port) {
function operate(action, msg, workerId, host, port) {
const params = {
"group": [[${group}]],
"workerId": workerId,
Expand All @@ -245,7 +241,7 @@
"action": action,
"data": null
};
$.modal.confirm("确认要 `" + actionMap[action] + "` 吗?", function () {
$.modal.confirm("确认要 `" + msg + "` 吗?", function () {
$.operate.post(prefix + "/configure_one_worker", params, function (result) {
if (result.code === 0) {
location.reload();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ public class JobConstants {
*/
public static final String AUTHENTICATE_HEADER_TOKEN = "X-Disjob-Token";

/**
* Instance lock pool
*/
public static final Interner<Long> INSTANCE_LOCK_POOL = Interners.newWeakInterner();

/**
* Http url pattern
*/
Expand All @@ -109,11 +114,6 @@ public class JobConstants {
*/
public static final String HTTPS_URL_PATTERN = "https://%s:%d/";

/**
* Instance lock pool
*/
public static final Interner<Long> INSTANCE_LOCK_POOL = Interners.newWeakInterner();

/**
* UTF-8 charset
*/
Expand Down
20 changes: 10 additions & 10 deletions disjob-core/src/main/java/cn/ponfee/disjob/core/base/Tokens.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/
public class Tokens {

private static final long EXPIRE_MILLISECONDS = 60_000L;
private static final long EXPIRATION_MILLISECONDS = 60_000L;

public enum Type {
/**
Expand Down Expand Up @@ -57,8 +57,8 @@ public static String create(String tokenPlain, Type type, Mode mode, String grou
if (StringUtils.isEmpty(tokenPlain)) {
return null;
}
String expire = Long.toString(System.currentTimeMillis() + EXPIRE_MILLISECONDS);
return secret(tokenPlain, type, mode, expire, group) + DOT + expire;
String expiration = Long.toString(System.currentTimeMillis() + EXPIRATION_MILLISECONDS);
return secret(tokenPlain, type, mode, expiration, group) + DOT + expiration;
}

public static boolean verify(String tokenSecret, String tokenPlain, Type type, Mode mode, String group) {
Expand All @@ -71,26 +71,26 @@ public static boolean verify(String tokenSecret, String tokenPlain, Type type, M

String[] array = tokenSecret.split("\\.", 2);
String actual = array[0];
String expire = array[1];
if (Long.parseLong(expire) < System.currentTimeMillis()) {
String expiration = array[1];
if (Long.parseLong(expiration) < System.currentTimeMillis()) {
return false;
}

String expect = secret(tokenPlain, type, mode, expire, group);
String expect = secret(tokenPlain, type, mode, expiration, group);
return actual.equals(expect);
}

// -----------------------------------------------------------------private methods

private static String secret(String tokenPlain, Type type, Mode mode, String expire, String group) {
private static String secret(String tokenPlain, Type type, Mode mode, String expiration, String group) {
Assert.notNull(type, "Type cannot be null.");
Assert.notNull(mode, "Mode cannot be null.");
Assert.hasText(group, "Group cannot be empty.");
String payload = type + DOT + mode + DOT + expire + DOT + group;
String payload = type + DOT + mode + DOT + expiration + DOT + group;

HmacUtils hmac = new HmacUtils(HmacAlgorithms.HMAC_SHA_1, tokenPlain.getBytes(UTF_8));
byte[] data = hmac.hmac(payload.getBytes(UTF_8));
return Base64.getUrlEncoder().withoutPadding().encodeToString(data);
byte[] digest = hmac.hmac(payload.getBytes(UTF_8));
return Base64.getUrlEncoder().withoutPadding().encodeToString(digest);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,8 @@ public Map<String, String> authenticationHeaders() {
return Collections.singletonMap(AUTHENTICATE_HEADER_GROUP, group);
}

String tokenSecret = Tokens.create(workerToken, Type.WORKER, Mode.AUTHENTICATE, group);
return ImmutableMap.of(
AUTHENTICATE_HEADER_GROUP, group,
AUTHENTICATE_HEADER_TOKEN, Objects.requireNonNull(tokenSecret)
);
String tokenSecret = Objects.requireNonNull(Tokens.create(workerToken, Type.WORKER, Mode.AUTHENTICATE, group));
return ImmutableMap.of(AUTHENTICATE_HEADER_GROUP, group, AUTHENTICATE_HEADER_TOKEN, tokenSecret);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ public <T> T parse(String data) {
},

/**
* Clear worker thread pool task queue
* Remove(deregister) worker and clear worker thread pool task queue
*/
CLEAR_TASK_QUEUE {
REMOVE_WORKER_AND_CLEAR_TASK_QUEUE {
@Override
public <T> T parse(String data) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,41 +145,32 @@ public PageResponse<SchedGroupResponse> queryForPage(SchedGroupPageRequest pageR

// ------------------------------------------------------------other static methods

public static DisjobGroup getGroup(String group) {
DisjobGroup disjobGroup = groupMap.get(group);
if (disjobGroup == null) {
throw new GroupNotFoundException("Not found worker group: " + group);
}
return disjobGroup;
}

public static Set<String> myGroups(String user) {
Set<String> groups = userMap.get(user);
return groups == null ? Collections.emptySet() : groups;
}

public static String createSupervisorAuthenticateToken(String group) {
String supervisorToken = SchedGroupService.getGroup(group).getSupervisorToken();
return Tokens.create(supervisorToken, Type.SUPERVISOR, Mode.AUTHENTICATE, group);
public static boolean isDeveloper(String group, String user) {
return getDisjobGroup(group).isDeveloper(user);
}

public static String createSupervisorSignatureToken(String group) {
String supervisorToken = SchedGroupService.getGroup(group).getSupervisorToken();
return Tokens.create(supervisorToken, Type.SUPERVISOR, Mode.SIGNATURE, group);
public static String createSupervisorAuthenticateToken(String group) {
String supervisorToken = getDisjobGroup(group).getSupervisorToken();
return Tokens.create(supervisorToken, Type.SUPERVISOR, Mode.AUTHENTICATE, group);
}

public static boolean verifyWorkerAuthenticateToken(String tokenSecret, String group) {
String workerToken = SchedGroupService.getGroup(group).getWorkerToken();
String workerToken = getDisjobGroup(group).getWorkerToken();
return Tokens.verify(tokenSecret, workerToken, Type.WORKER, Mode.AUTHENTICATE, group);
}

public static boolean verifyUserAuthenticateToken(String tokenSecret, String group) {
String userToken = SchedGroupService.getGroup(group).getUserToken();
String userToken = getDisjobGroup(group).getUserToken();
return Tokens.verify(tokenSecret, userToken, Type.USER, Mode.AUTHENTICATE, group);
}

public static boolean verifyWorkerSignatureToken(String tokenSecret, String group) {
String workerToken = SchedGroupService.getGroup(group).getWorkerToken();
String workerToken = getDisjobGroup(group).getWorkerToken();
return Tokens.verify(tokenSecret, workerToken, Type.WORKER, Mode.SIGNATURE, group);
}

Expand All @@ -205,6 +196,14 @@ private void refresh() {
}
}

private static DisjobGroup getDisjobGroup(String group) {
DisjobGroup disjobGroup = groupMap.get(group);
if (disjobGroup == null) {
throw new GroupNotFoundException("Not found worker group: " + group);
}
return disjobGroup;
}

private static Map<String, Set<String>> toUserMap(List<SchedGroup> list) {
Map<String, ?> userMap = list.stream()
.flatMap(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import cn.ponfee.disjob.core.base.JobConstants;
import cn.ponfee.disjob.core.exception.AuthenticationException;
import cn.ponfee.disjob.supervisor.application.SchedGroupService;
import cn.ponfee.disjob.supervisor.application.value.DisjobGroup;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.Ordered;
import org.springframework.web.context.request.RequestContextHolder;
Expand Down Expand Up @@ -74,9 +73,7 @@ private static void authenticateWorker(String group) {
}

private static void authenticateUser(String group) {
String user = requestUser();
DisjobGroup disjobGroup = SchedGroupService.getGroup(group);
if (!disjobGroup.isDeveloper(user)) {
if (!SchedGroupService.isDeveloper(group, requestUser())) {
throw new AuthenticationException(ERR_MSG);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import cn.ponfee.disjob.common.concurrent.Threads;
import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.common.exception.Throwables.ThrowingRunnable;
import cn.ponfee.disjob.common.util.Jsons;
import cn.ponfee.disjob.core.base.JobConstants;
import cn.ponfee.disjob.core.base.SupervisorRpcService;
import cn.ponfee.disjob.core.base.WorkerMetrics;
Expand All @@ -34,23 +35,23 @@
import cn.ponfee.disjob.core.param.supervisor.TerminateTaskParam;
import cn.ponfee.disjob.core.param.supervisor.UpdateTaskWorkerParam;
import cn.ponfee.disjob.dispatch.ExecuteTaskParam;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

import javax.annotation.PreDestroy;
import java.io.Closeable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static cn.ponfee.disjob.core.base.JobConstants.PROCESS_BATCH_SIZE;
import static cn.ponfee.disjob.core.enums.ExecuteState.*;

/**
Expand Down Expand Up @@ -317,7 +318,20 @@ synchronized void modifyMaximumPoolSize(int value) {
}

synchronized void clearTaskQueue() {
taskQueue.clear();
List<ExecuteTaskParam> tasks = new LinkedList<>();
taskQueue.drainTo(tasks);

List<UpdateTaskWorkerParam> list = tasks.stream()
// 广播任务分派的worker不可修改,需要排除
.filter(e -> e.getRouteStrategy() != RouteStrategy.BROADCAST)
// 清除分派worker数据
.map(e -> new UpdateTaskWorkerParam(e.getTaskId(), null))
.collect(Collectors.toList());

for (List<UpdateTaskWorkerParam> sub : Lists.partition(list, PROCESS_BATCH_SIZE)) {
// 更新task的worker信息
ThrowingRunnable.doCaught(() -> supervisorRpcClient.updateTaskWorker(sub), () -> "Update task worker error: " + Jsons.toJson(list));
}
}

// ----------------------------------------------------------------------private methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ public void configureWorker(ConfigureWorkerParam param) {
Integer maximumPoolSize = action.parse(param.getData());
WorkerConfigurator.modifyMaximumPoolSize(maximumPoolSize);

} else if (action == Action.CLEAR_TASK_QUEUE) {
WorkerConfigurator.clearTaskQueue();

} else if (action == Action.REMOVE_WORKER) {
workerRegistry.deregister(currentWork);

} else if (action == Action.REMOVE_WORKER_AND_CLEAR_TASK_QUEUE) {
workerRegistry.deregister(currentWork);
WorkerConfigurator.clearTaskQueue();

} else if (action == Action.ADD_WORKER) {
Expand Down

0 comments on commit 58b1b0c

Please sign in to comment.