Skip to content

Commit

Permalink
supported custom supervisor datasource
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Mar 4, 2024
1 parent 0a906ca commit c73afab
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ public static ApplicationContext applicationContext() {
* @return spring bean
*/
public static Object getBean(String beanName) {
if (applicationContext == null) {
return null;
}
return applicationContext.getBean(beanName);
}

Expand All @@ -73,9 +70,6 @@ public static Object getBean(String beanName) {
* @return spring bean
*/
public static <T> T getBean(Class<T> beanType) {
if (applicationContext == null) {
return null;
}
return applicationContext.getBean(beanType);
}

Expand All @@ -87,19 +81,12 @@ public static <T> T getBean(Class<T> beanType) {
* @return spring bean
*/
public static <T> T getBean(String beanName, Class<T> beanType) {
if (applicationContext == null) {
return null;
}
return applicationContext.getBean(beanName, beanType);
}

// -----------------------------------------------------------------------getPrototypeBean

public static <T> T getPrototypeBean(String beanName) throws IllegalStateException {
if (applicationContext == null) {
return null;
}

T bean;
try {
bean = (T) applicationContext.getBean(beanName);
Expand All @@ -122,10 +109,6 @@ public static <T> T getPrototypeBean(String beanName) throws IllegalStateExcepti
* @throws IllegalStateException if not prototype bean
*/
public static <T> T getPrototypeBean(Class<T> beanType) throws IllegalStateException {
if (applicationContext == null) {
return null;
}

T bean;
try {
bean = applicationContext.getBean(beanType);
Expand All @@ -151,10 +134,6 @@ public static <T> T getPrototypeBean(Class<T> beanType) throws IllegalStateExcep
* @throws IllegalStateException if not prototype bean
*/
public static <T> T getPrototypeBean(String beanName, Class<T> beanType) throws IllegalStateException {
if (applicationContext == null) {
return null;
}

T bean;
try {
bean = applicationContext.getBean(beanName, beanType);
Expand All @@ -172,10 +151,6 @@ public static <T> T getPrototypeBean(String beanName, Class<T> beanType) throws
// -----------------------------------------------------------------------getSingletonBean

public static <T> T getSingletonBean(String beanName) throws IllegalStateException {
if (applicationContext == null) {
return null;
}

T bean;
try {
bean = (T) applicationContext.getBean(beanName);
Expand All @@ -198,10 +173,6 @@ public static <T> T getSingletonBean(String beanName) throws IllegalStateExcepti
* @throws IllegalStateException if not singleton bean
*/
public static <T> T getSingletonBean(Class<T> beanType) throws IllegalStateException {
if (applicationContext == null) {
return null;
}

T bean;
try {
bean = applicationContext.getBean(beanType);
Expand All @@ -226,10 +197,6 @@ public static <T> T getSingletonBean(Class<T> beanType) throws IllegalStateExcep
* @throws IllegalStateException if not singleton bean
*/
public static <T> T getSingletonBean(String beanName, Class<T> beanType) throws IllegalStateException {
if (applicationContext == null) {
return null;
}

T bean;
try {
bean = applicationContext.getBean(beanName, beanType);
Expand All @@ -244,7 +211,7 @@ public static <T> T getSingletonBean(String beanName, Class<T> beanType) throws
return bean;
}

// -----------------------------------------------------------------------
// -----------------------------------------------------------------------other methods

/**
* Returns spring container contains specified bean name.
Expand All @@ -253,9 +220,6 @@ public static <T> T getSingletonBean(String beanName, Class<T> beanType) throws
* @return {@code true} if contains bean
*/
public static boolean containsBean(String beanName) {
if (applicationContext == null) {
return false;
}
return applicationContext.containsBean(beanName);
}

Expand All @@ -266,9 +230,6 @@ public static boolean containsBean(String beanName) {
* @return bean name type
*/
public static Class<?> getType(String beanName) {
if (applicationContext == null) {
return null;
}
return applicationContext.getType(beanName);
}

Expand All @@ -279,9 +240,6 @@ public static Class<?> getType(String beanName) {
* @return other alias name
*/
public static String[] getAliases(String beanName) {
if (applicationContext == null) {
return null;
}
return applicationContext.getAliases(beanName);
}

Expand Down Expand Up @@ -332,9 +290,6 @@ public static <T> T registerBean(String beanName, Class<T> beanType, Object... a
* @param bean the spring bean
*/
public static void autowire(Object bean) {
if (applicationContext == null) {
return;
}
applicationContext.getAutowireCapableBeanFactory().autowireBean(bean);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,38 +102,47 @@ public static List<SplitTask> split(JobHandlerParam param) throws JobException {
}

/**
* Load jobHandler instance, String parameter can be qualified class name or source code
* Load jobHandler instance, String parameter can be spring bean name or qualified class name or source code
*
* @param text qualified class name or source code
* @param text spring bean name or qualified class name or source code
* @return JobHandler instance object
* @throws JobException if new instance failed
*/
public static JobHandler load(String text) throws JobException {
JobHandler handler = SpringContextHolder.getPrototypeBean(text, JobHandler.class);
if (handler != null) {
if (SpringContextHolder.applicationContext() != null) {
// must be annotated with @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
// get by spring bean name
JobHandler handler = SpringContextHolder.getPrototypeBean(text, JobHandler.class);
if (handler != null) {
return handler;
}

Class<? extends JobHandler> jobHandlerClass = getJobHandlerClass(text);
handler = SpringContextHolder.getPrototypeBean(jobHandlerClass);
if (handler != null) {
return handler;
}

handler = ClassUtils.newInstance(jobHandlerClass);
SpringContextHolder.autowire(handler);
return handler;
} else {
Class<? extends JobHandler> jobHandlerClass = getJobHandlerClass(text);
return ClassUtils.newInstance(jobHandlerClass);
}
}

Class<JobHandler> type = ClassUtils.getClass(text);
private static Class<? extends JobHandler> getJobHandlerClass(String text) throws JobException {
Class<? extends JobHandler> type = ClassUtils.getClass(text);
if (type == null) {
throw new JobException(JobCodeMsg.LOAD_HANDLER_ERROR, "Illegal job handler: " + text);
throw new JobException(JobCodeMsg.LOAD_HANDLER_ERROR, "Illegal job handler class: " + text);
}

// interface type: Modifier.isAbstract(type.getModifiers()) -> true
if (!JobHandler.class.isAssignableFrom(type) || Modifier.isAbstract(type.getModifiers())) {
throw new JobException(JobCodeMsg.LOAD_HANDLER_ERROR, "Invalid job handler: " + ClassUtils.getName(type) + ", " + text);
throw new JobException(JobCodeMsg.LOAD_HANDLER_ERROR, "Invalid job handler class '" + ClassUtils.getName(type) + "': " + text);
}

// must be annotated with @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
handler = SpringContextHolder.getPrototypeBean(type);
if (handler != null) {
return handler;
}

handler = ClassUtils.newInstance(type);
SpringContextHolder.autowire(handler);
return handler;
return type;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package cn.ponfee.disjob.registry.rpc;

import cn.ponfee.disjob.common.base.RetryInvocationHandler;
import cn.ponfee.disjob.common.base.Symbol.Str;
import cn.ponfee.disjob.common.collect.Collects;
import cn.ponfee.disjob.common.exception.Throwables.ThrowingConsumer;
Expand Down Expand Up @@ -75,22 +76,31 @@ public final class DiscoveryServerRestProxy {
/**
* Creates ungrouped rpc service client proxy.
*
* @param interfaceType the interface class
* @param discoveryServer the discoveryServer
* @param restTemplate the restTemplate
* @param retry the retry config
* @param <T> interface type
* @param <D> discovery server type
* @param interfaceType the interface class
* @param localServiceProvider the localServiceProvider
* @param discoveryServer the discoveryServer
* @param restTemplate the restTemplate
* @param retry the retry config
* @param <T> interface type
* @param <D> discovery server type
* @return rpc service client proxy
*/
public static <T, D extends Server> T create(Class<T> interfaceType,
@Nullable T localServiceProvider,
Discovery<D> discoveryServer,
RestTemplate restTemplate,
RetryProperties retry) {
DiscoveryServerRestTemplate<D> template = new DiscoveryServerRestTemplate<>(discoveryServer, restTemplate, retry);
String prefixPath = getMappingPath(AnnotationUtils.findAnnotation(interfaceType, RequestMapping.class));
InvocationHandler ungroupedInvocationHandler = new UngroupedInvocationHandler(template, prefixPath);
return ProxyUtils.create(ungroupedInvocationHandler, interfaceType);
InvocationHandler invocationHandler;
if (localServiceProvider != null) {
// 本地调用:使用动态代理来增加重试能力
invocationHandler = new RetryInvocationHandler(localServiceProvider, retry.getMaxCount(), retry.getBackoffPeriod());
} else {
// 远程调用:通过Discovery<D>来获取目标服务器
DiscoveryServerRestTemplate<D> template = new DiscoveryServerRestTemplate<>(discoveryServer, restTemplate, retry);
String prefixPath = getMappingPath(AnnotationUtils.findAnnotation(interfaceType, RequestMapping.class));
invocationHandler = new UngroupedInvocationHandler(template, prefixPath);
}
return ProxyUtils.create(invocationHandler, interfaceType);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

package cn.ponfee.disjob.supervisor.dao;

import cn.ponfee.disjob.core.base.JobConstants;
import cn.ponfee.disjob.supervisor.base.AbstractDataSourceConfig;
import org.apache.commons.lang3.ClassUtils;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -48,6 +50,7 @@
*
* @author Ponfee
*/
@ConditionalOnProperty(prefix = JobConstants.SUPERVISOR_KEY_PREFIX + ".datasource", name = "custom", havingValue = "false", matchIfMissing = true)
@Configuration
@MapperScan(
basePackages = SupervisorDataSourceConfig.BASE_PACKAGE + ".mapper",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@

package cn.ponfee.disjob.worker;

import cn.ponfee.disjob.common.base.RetryInvocationHandler;
import cn.ponfee.disjob.common.base.Startable;
import cn.ponfee.disjob.common.exception.Throwables.ThrowingRunnable;
import cn.ponfee.disjob.common.util.ProxyUtils;
import cn.ponfee.disjob.core.base.RetryProperties;
import cn.ponfee.disjob.core.base.SupervisorRpcService;
import cn.ponfee.disjob.core.base.Worker;
Expand All @@ -33,7 +31,6 @@
import org.slf4j.LoggerFactory;
import org.springframework.web.client.RestTemplate;

import java.lang.reflect.InvocationHandler;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -68,8 +65,8 @@ private WorkerStartup(Worker.Current currentWorker,
Objects.requireNonNull(taskReceiver, "Task receiver cannot null.");
Objects.requireNonNull(restTemplate, "Rest template cannot null.");

SupervisorRpcService supervisorRpcClient = createProxy(
supervisorRpcService, retryProperties, workerRegistry, restTemplate
SupervisorRpcService supervisorRpcClient = DiscoveryServerRestProxy.create(
SupervisorRpcService.class, supervisorRpcService, workerRegistry, restTemplate, retryProperties
);

this.currentWorker = currentWorker;
Expand Down Expand Up @@ -179,20 +176,4 @@ public WorkerStartup build() {
}
}

// ----------------------------------------------------------------------------------------private methods

private static SupervisorRpcService createProxy(SupervisorRpcService local,
RetryProperties retry,
WorkerRegistry discoverySupervisor,
RestTemplate restTemplate) {
if (local != null) {
// cn.ponfee.disjob.supervisor.provider.rpc.SupervisorRpcProvider
// 此Worker同时也是Supervisor身份,则是本地调用,并使用动态代理增加重试能力
InvocationHandler ih = new RetryInvocationHandler(local, retry.getMaxCount(), retry.getBackoffPeriod());
return ProxyUtils.create(ih, SupervisorRpcService.class);
} else {
return DiscoveryServerRestProxy.create(SupervisorRpcService.class, discoverySupervisor, restTemplate, retry);
}
}

}

0 comments on commit c73afab

Please sign in to comment.