Skip to content

Commit

Permalink
fix re-dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Feb 19, 2024
1 parent a4ab191 commit 6f78f55
Show file tree
Hide file tree
Showing 20 changed files with 221 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public class JobConstants {
*/
public static final String SPRING_BEAN_NAME_CURRENT_WORKER = SPRING_BEAN_NAME_PREFIX + ".current-worker";

/**
* Rest template spring bean name
*/
public static final String SPRING_BEAN_NAME_REST_TEMPLATE = SPRING_BEAN_NAME_PREFIX + ".rest-template";

/**
* Authenticate header group
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import cn.ponfee.disjob.core.exception.JobException;
import cn.ponfee.disjob.core.handle.SplitTask;
import cn.ponfee.disjob.core.param.worker.ConfigureWorkerParam;
import cn.ponfee.disjob.core.param.worker.ExistsTaskParam;
import cn.ponfee.disjob.core.param.worker.GetMetricsParam;
import cn.ponfee.disjob.core.param.worker.JobHandlerParam;
import io.swagger.v3.oas.annotations.Hidden;
Expand All @@ -45,6 +46,9 @@ public interface WorkerRpcService {
@PostMapping("/job/split")
List<SplitTask> split(JobHandlerParam param) throws JobException;

@GetMapping("/task/exists")
boolean existsTask(ExistsTaskParam param);

@GetMapping("/metrics")
WorkerMetrics metrics(GetMetricsParam param);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2022-2024 Ponfee (http://www.ponfee.cn/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.ponfee.disjob.core.param.worker;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
* Exists task param
*
* @author Ponfee
*/
@Getter
@Setter
@NoArgsConstructor
public class ExistsTaskParam extends AuthenticationParam {
private static final long serialVersionUID = -2212057097314433737L;

private long taskId;

public ExistsTaskParam(String supervisorToken, long taskId) {
super.setSupervisorToken(supervisorToken);
this.taskId = taskId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
package cn.ponfee.disjob.dispatch.http.configuration;

import cn.ponfee.disjob.common.base.TimingWheel;
import cn.ponfee.disjob.common.spring.RestTemplateUtils;
import cn.ponfee.disjob.core.base.HttpProperties;
import cn.ponfee.disjob.core.base.JobConstants;
import cn.ponfee.disjob.core.base.RetryProperties;
import cn.ponfee.disjob.core.base.Supervisor;
import cn.ponfee.disjob.core.base.Worker;
Expand All @@ -29,7 +28,7 @@
import cn.ponfee.disjob.dispatch.http.HttpTaskDispatcher;
import cn.ponfee.disjob.dispatch.http.HttpTaskReceiver;
import cn.ponfee.disjob.registry.SupervisorRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.ApplicationEventPublisher;
Expand Down Expand Up @@ -60,13 +59,10 @@ public TaskReceiver taskReceiver(Worker.Current currentWorker, TimingWheel<Execu
@ConditionalOnBean(Supervisor.Current.class)
@Bean
public TaskDispatcher taskDispatcher(ApplicationEventPublisher eventPublisher,
HttpProperties http,
RetryProperties retry,
SupervisorRegistry discoveryWorker,
@Nullable ObjectMapper objectMapper,
@Qualifier(JobConstants.SPRING_BEAN_NAME_REST_TEMPLATE) RestTemplate restTemplate,
@Nullable TaskReceiver taskReceiver) {
http.check();
RestTemplate restTemplate = RestTemplateUtils.create(http.getConnectTimeout(), http.getReadTimeout(), objectMapper);
return new HttpTaskDispatcher(eventPublisher, discoveryWorker, retry, restTemplate, taskReceiver);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@
import java.util.function.Function;

/**
* Destination server rest proxy
* Destination(Designated) server rest proxy
*
* @author Ponfee
*/
public final class DestinationServerRestProxy {

private static final ThreadLocal<Server> SERVER_THREAD_LOCAL = new NamedThreadLocal<>("server_rest_proxy");
private static final ThreadLocal<Server> SERVER_THREAD_LOCAL = new NamedThreadLocal<>("destination-server");

public static <T, S extends Server> DestinationServerInvoker<T, S> create(Class<T> interfaceType,
@Nullable T localServiceProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
public final class DiscoveryServerRestProxy {

private static final ConcurrentMap<Method, Request> METHOD_REQUEST_CACHE = new ConcurrentHashMap<>();
private static final ThreadLocal<String> GROUP_THREAD_LOCAL = new NamedThreadLocal<>("discovery_rest_proxy");
private static final ThreadLocal<String> GROUP_THREAD_LOCAL = new NamedThreadLocal<>("discovery-group");

/**
* Creates ungrouped rpc service client proxy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import cn.ponfee.disjob.common.base.TimingWheel;
import cn.ponfee.disjob.common.collect.Collects;
import cn.ponfee.disjob.common.exception.Throwables.ThrowingRunnable;
import cn.ponfee.disjob.common.spring.RestTemplateUtils;
import cn.ponfee.disjob.common.spring.YamlProperties;
import cn.ponfee.disjob.common.util.ClassUtils;
import cn.ponfee.disjob.common.util.NetUtils;
Expand All @@ -45,6 +46,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.Assert;
import org.springframework.web.client.RestTemplate;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -145,13 +147,17 @@ public boolean receive(ExecuteTaskParam task) {
// --------------------- create receiver(select redis or http) --------------------- //


HttpProperties http = props.extract(HttpProperties.class, HTTP_KEY_PREFIX + ".");
http.check();
RestTemplate restTemplate = RestTemplateUtils.create(http.getConnectTimeout(), http.getReadTimeout(), null);

WorkerStartup workerStartup = WorkerStartup.builder()
.currentWorker(currentWorker)
.workerProperties(workerProperties)
.retryProperties(props.extract(RetryProperties.class, RETRY_KEY_PREFIX + "."))
.httpProperties(props.extract(HttpProperties.class, HTTP_KEY_PREFIX + "."))
.taskReceiver(actualTaskReceiver)
.workerRegistry(workerRegistry)
.restTemplate(restTemplate)
.build();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import cn.ponfee.disjob.common.util.Jsons;
import cn.ponfee.disjob.core.base.WorkerRpcService;
import cn.ponfee.disjob.core.param.worker.ConfigureWorkerParam;
import cn.ponfee.disjob.core.param.worker.ExistsTaskParam;
import cn.ponfee.disjob.core.param.worker.GetMetricsParam;
import cn.ponfee.disjob.core.param.worker.JobHandlerParam;
import cn.ponfee.disjob.dispatch.ExecuteTaskParam;
Expand Down Expand Up @@ -104,30 +105,34 @@ public void start() {

//String[] args = ctx.body().asPojo(String[].class);
router.post(PREFIX_PATH + "/job/verify").handler(ctx -> handle(() -> {
JobHandlerParam param = parseArg(ctx, JobHandlerParam.class);
JobHandlerParam param = parseBodyArg(ctx, JobHandlerParam.class);
workerRpcService.verify(param);
}, ctx, BAD_REQUEST));

router.post(PREFIX_PATH + "/job/split").handler(ctx -> handle(() -> {
JobHandlerParam param = parseArg(ctx, JobHandlerParam.class);
JobHandlerParam param = parseBodyArg(ctx, JobHandlerParam.class);
JobHandlerParser.parse(param, "jobHandler");
return workerRpcService.split(param);
}, ctx, INTERNAL_SERVER_ERROR));

router.get(PREFIX_PATH + "/task/exists").handler(ctx -> handle(() -> {
ExistsTaskParam param = parseParamArg(ctx, ExistsTaskParam.class);
return workerRpcService.existsTask(param);
}, ctx, INTERNAL_SERVER_ERROR));

router.get(PREFIX_PATH + "/metrics").handler(ctx -> handle(() -> {
String json = Collects.get(ctx.queryParam(LocalizedMethodArgumentUtils.getQueryParamName(0)), 0);
GetMetricsParam param = Jsons.fromJson(json, GetMetricsParam.class);
GetMetricsParam param = parseParamArg(ctx, GetMetricsParam.class);
return workerRpcService.metrics(param);
}, ctx, INTERNAL_SERVER_ERROR));

router.post(PREFIX_PATH + "/worker/configure").handler(ctx -> handle(() -> {
ConfigureWorkerParam param = parseArg(ctx, ConfigureWorkerParam.class);
ConfigureWorkerParam param = parseBodyArg(ctx, ConfigureWorkerParam.class);
workerRpcService.configureWorker(param);
}, ctx, INTERNAL_SERVER_ERROR));

if (httpTaskReceiver != null) {
router.post(PREFIX_PATH + "/task/receive").handler(ctx -> handle(() -> {
ExecuteTaskParam param = parseArg(ctx, ExecuteTaskParam.class);
ExecuteTaskParam param = parseBodyArg(ctx, ExecuteTaskParam.class);
JobHandlerParser.parse(param, "jobHandler");
return httpTaskReceiver.receive(param);
}, ctx, INTERNAL_SERVER_ERROR));
Expand Down Expand Up @@ -163,11 +168,17 @@ private static void handle(ThrowingSupplier<?, ?> action, RoutingContext ctx, Ht
}
}

private static <T> T parseArg(RoutingContext ctx, Class<T> type) {
private static <T> T parseBodyArg(RoutingContext ctx, Class<T> type) {
Object[] args = Jsons.parseArray(ctx.body().asString(), type);
return args == null ? null : (T) args[0];
}

private static <T> T parseParamArg(RoutingContext ctx, Class<T> type) {
String arg0ParamName = LocalizedMethodArgumentUtils.getQueryParamName(0);
String json = Collects.get(ctx.queryParam(arg0ParamName), 0);
return Jsons.fromJson(json, type);
}

private static String toJson(Object obj) {
if (obj == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import cn.ponfee.disjob.common.collect.Collects;
import cn.ponfee.disjob.common.concurrent.MultithreadExecutors;
import cn.ponfee.disjob.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.disjob.common.spring.RestTemplateUtils;
import cn.ponfee.disjob.common.util.Numbers;
import cn.ponfee.disjob.core.base.*;
import cn.ponfee.disjob.core.exception.AuthenticationException;
Expand All @@ -39,20 +38,18 @@
import cn.ponfee.disjob.supervisor.application.request.ConfigureOneWorkerRequest;
import cn.ponfee.disjob.supervisor.application.response.SupervisorMetricsResponse;
import cn.ponfee.disjob.supervisor.application.response.WorkerMetricsResponse;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand All @@ -66,30 +63,26 @@ public class ServerInvokeService extends SingletonClassConstraint {

private final SupervisorRegistry supervisorRegistry;
private final Supervisor.Current currentSupervisor;
private final DestinationServerInvoker<SupervisorRpcService, Supervisor> supervisorRpcServiceClient;
private final DestinationServerInvoker<WorkerRpcService, Worker> workerRpcServiceClient;
private final DestinationServerInvoker<SupervisorRpcService, Supervisor> supervisorRpcClient;
private final DestinationServerInvoker<WorkerRpcService, Worker> workerRpcClient;

public ServerInvokeService(@Value("${server.servlet.context-path:/}") String contextPath,
SupervisorRegistry supervisorRegistry,
public ServerInvokeService(SupervisorRegistry supervisorRegistry,
Supervisor.Current currentSupervisor,
SupervisorRpcService supervisorProvider,
HttpProperties http,
@Nullable WorkerRpcService workerProvider,
@Nullable ObjectMapper objectMapper) {
http.check();
Function<Supervisor, String> supervisorContextPath = supervisor -> contextPath;
Function<Worker, String> workerContextPath = worker -> Supervisor.current().getWorkerContextPath(worker.getGroup());
RestTemplate restTemplate = RestTemplateUtils.create(http.getConnectTimeout(), http.getReadTimeout(), objectMapper);
RetryProperties retry = RetryProperties.of(0, 0);

@Value("${server.servlet.context-path:/}") String contextPath,
SupervisorRpcService localSupervisorRpcProvider,
@Qualifier(JobConstants.SPRING_BEAN_NAME_REST_TEMPLATE) RestTemplate restTemplate,
DestinationServerInvoker<WorkerRpcService, Worker> workerRpcClient) {
this.supervisorRegistry = supervisorRegistry;
this.currentSupervisor = currentSupervisor;
this.supervisorRpcServiceClient = DestinationServerRestProxy.create(
SupervisorRpcService.class, supervisorProvider, currentSupervisor, supervisorContextPath, restTemplate, retry
);
this.workerRpcServiceClient = DestinationServerRestProxy.create(
WorkerRpcService.class, workerProvider, Worker.current(), workerContextPath, restTemplate, retry
this.supervisorRpcClient = DestinationServerRestProxy.create(
SupervisorRpcService.class,
localSupervisorRpcProvider,
currentSupervisor,
supervisor -> contextPath,
restTemplate,
RetryProperties.of(0, 0)
);
this.workerRpcClient = workerRpcClient;
}

// ------------------------------------------------------------public methods
Expand Down Expand Up @@ -166,7 +159,7 @@ private SupervisorMetricsResponse getSupervisorMetrics(Supervisor supervisor) {
Long pingTime = null;
try {
long start = System.currentTimeMillis();
metrics = supervisorRpcServiceClient.invoke(supervisor, SupervisorRpcService::metrics);
metrics = supervisorRpcClient.invoke(supervisor, SupervisorRpcService::metrics);
pingTime = System.currentTimeMillis() - start;
} catch (Throwable e) {
LOG.warn("Ping supervisor occur error: {} {}", supervisor, e.getMessage());
Expand All @@ -192,7 +185,7 @@ private WorkerMetricsResponse getWorkerMetrics(Worker worker) {
GetMetricsParam param = buildGetMetricsParam(group);
try {
long start = System.currentTimeMillis();
metrics = workerRpcServiceClient.invoke(worker, client -> client.metrics(param));
metrics = workerRpcClient.invoke(worker, client -> client.metrics(param));
pingTime = System.currentTimeMillis() - start;
} catch (Throwable e) {
LOG.warn("Ping worker occur error: {} {}", worker, e.getMessage());
Expand Down Expand Up @@ -222,7 +215,7 @@ private List<Worker> getDiscoveredWorkers(String group) {
private void verifyWorkerSignature(Worker worker) {
String group = worker.getGroup();
GetMetricsParam param = buildGetMetricsParam(group);
WorkerMetrics metrics = workerRpcServiceClient.invoke(worker, client -> client.metrics(param));
WorkerMetrics metrics = workerRpcClient.invoke(worker, client -> client.metrics(param));
if (!SchedGroupService.verifyWorkerSignatureToken(metrics.getSignature(), group)) {
throw new AuthenticationException("Worker authenticated failed: " + worker);
}
Expand All @@ -232,11 +225,11 @@ private void configureWorker(Worker worker, Action action, String data) {
ConfigureWorkerParam param = new ConfigureWorkerParam(SchedGroupService.createSupervisorAuthenticationToken(worker.getGroup()));
param.setAction(action);
param.setData(data);
workerRpcServiceClient.invokeWithoutResult(worker, client -> client.configureWorker(param));
workerRpcClient.invokeWithoutResult(worker, client -> client.configureWorker(param));
}

private void publishSupervisor(Supervisor supervisor, EventParam param) {
RetryTemplate.executeQuietly(() -> supervisorRpcServiceClient.invokeWithoutResult(supervisor, client -> client.publish(param)), 1, 2000);
RetryTemplate.executeQuietly(() -> supervisorRpcClient.invokeWithoutResult(supervisor, client -> client.publish(param)), 1, 2000);
}

private GetMetricsParam buildGetMetricsParam(String group) {
Expand Down
Loading

0 comments on commit 6f78f55

Please sign in to comment.