From c73afabefa0b708da7d74f2b25b25ce0d36dddc7 Mon Sep 17 00:00:00 2001 From: Ponfee Date: Mon, 4 Mar 2024 22:24:26 +0800 Subject: [PATCH] supported custom supervisor datasource --- .../common/spring/SpringContextHolder.java | 47 +------------------ .../disjob/core/handle/JobHandlerUtils.java | 43 ++++++++++------- .../rpc/DiscoveryServerRestProxy.java | 30 ++++++++---- .../dao/SupervisorDataSourceConfig.java | 3 ++ .../ponfee/disjob/worker/WorkerStartup.java | 23 +-------- 5 files changed, 52 insertions(+), 94 deletions(-) diff --git a/disjob-common/src/main/java/cn/ponfee/disjob/common/spring/SpringContextHolder.java b/disjob-common/src/main/java/cn/ponfee/disjob/common/spring/SpringContextHolder.java index f8e0bbea9..4e4a2c72c 100644 --- a/disjob-common/src/main/java/cn/ponfee/disjob/common/spring/SpringContextHolder.java +++ b/disjob-common/src/main/java/cn/ponfee/disjob/common/spring/SpringContextHolder.java @@ -60,9 +60,6 @@ public static ApplicationContext applicationContext() { * @return spring bean */ public static Object getBean(String beanName) { - if (applicationContext == null) { - return null; - } return applicationContext.getBean(beanName); } @@ -73,9 +70,6 @@ public static Object getBean(String beanName) { * @return spring bean */ public static T getBean(Class beanType) { - if (applicationContext == null) { - return null; - } return applicationContext.getBean(beanType); } @@ -87,19 +81,12 @@ public static T getBean(Class beanType) { * @return spring bean */ public static T getBean(String beanName, Class beanType) { - if (applicationContext == null) { - return null; - } return applicationContext.getBean(beanName, beanType); } // -----------------------------------------------------------------------getPrototypeBean public static T getPrototypeBean(String beanName) throws IllegalStateException { - if (applicationContext == null) { - return null; - } - T bean; try { bean = (T) applicationContext.getBean(beanName); @@ -122,10 +109,6 @@ public static T getPrototypeBean(String beanName) throws IllegalStateExcepti * @throws IllegalStateException if not prototype bean */ public static T getPrototypeBean(Class beanType) throws IllegalStateException { - if (applicationContext == null) { - return null; - } - T bean; try { bean = applicationContext.getBean(beanType); @@ -151,10 +134,6 @@ public static T getPrototypeBean(Class beanType) throws IllegalStateExcep * @throws IllegalStateException if not prototype bean */ public static T getPrototypeBean(String beanName, Class beanType) throws IllegalStateException { - if (applicationContext == null) { - return null; - } - T bean; try { bean = applicationContext.getBean(beanName, beanType); @@ -172,10 +151,6 @@ public static T getPrototypeBean(String beanName, Class beanType) throws // -----------------------------------------------------------------------getSingletonBean public static T getSingletonBean(String beanName) throws IllegalStateException { - if (applicationContext == null) { - return null; - } - T bean; try { bean = (T) applicationContext.getBean(beanName); @@ -198,10 +173,6 @@ public static T getSingletonBean(String beanName) throws IllegalStateExcepti * @throws IllegalStateException if not singleton bean */ public static T getSingletonBean(Class beanType) throws IllegalStateException { - if (applicationContext == null) { - return null; - } - T bean; try { bean = applicationContext.getBean(beanType); @@ -226,10 +197,6 @@ public static T getSingletonBean(Class beanType) throws IllegalStateExcep * @throws IllegalStateException if not singleton bean */ public static T getSingletonBean(String beanName, Class beanType) throws IllegalStateException { - if (applicationContext == null) { - return null; - } - T bean; try { bean = applicationContext.getBean(beanName, beanType); @@ -244,7 +211,7 @@ public static T getSingletonBean(String beanName, Class beanType) throws return bean; } - // ----------------------------------------------------------------------- + // -----------------------------------------------------------------------other methods /** * Returns spring container contains specified bean name. @@ -253,9 +220,6 @@ public static T getSingletonBean(String beanName, Class beanType) throws * @return {@code true} if contains bean */ public static boolean containsBean(String beanName) { - if (applicationContext == null) { - return false; - } return applicationContext.containsBean(beanName); } @@ -266,9 +230,6 @@ public static boolean containsBean(String beanName) { * @return bean name type */ public static Class getType(String beanName) { - if (applicationContext == null) { - return null; - } return applicationContext.getType(beanName); } @@ -279,9 +240,6 @@ public static Class getType(String beanName) { * @return other alias name */ public static String[] getAliases(String beanName) { - if (applicationContext == null) { - return null; - } return applicationContext.getAliases(beanName); } @@ -332,9 +290,6 @@ public static T registerBean(String beanName, Class beanType, Object... a * @param bean the spring bean */ public static void autowire(Object bean) { - if (applicationContext == null) { - return; - } applicationContext.getAutowireCapableBeanFactory().autowireBean(bean); } diff --git a/disjob-core/src/main/java/cn/ponfee/disjob/core/handle/JobHandlerUtils.java b/disjob-core/src/main/java/cn/ponfee/disjob/core/handle/JobHandlerUtils.java index 9c19b7aed..76de1b02e 100644 --- a/disjob-core/src/main/java/cn/ponfee/disjob/core/handle/JobHandlerUtils.java +++ b/disjob-core/src/main/java/cn/ponfee/disjob/core/handle/JobHandlerUtils.java @@ -102,38 +102,47 @@ public static List split(JobHandlerParam param) throws JobException { } /** - * Load jobHandler instance, String parameter can be qualified class name or source code + * Load jobHandler instance, String parameter can be spring bean name or qualified class name or source code * - * @param text qualified class name or source code + * @param text spring bean name or qualified class name or source code * @return JobHandler instance object * @throws JobException if new instance failed */ public static JobHandler load(String text) throws JobException { - JobHandler handler = SpringContextHolder.getPrototypeBean(text, JobHandler.class); - if (handler != null) { + if (SpringContextHolder.applicationContext() != null) { // must be annotated with @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) + // get by spring bean name + JobHandler handler = SpringContextHolder.getPrototypeBean(text, JobHandler.class); + if (handler != null) { + return handler; + } + + Class jobHandlerClass = getJobHandlerClass(text); + handler = SpringContextHolder.getPrototypeBean(jobHandlerClass); + if (handler != null) { + return handler; + } + + handler = ClassUtils.newInstance(jobHandlerClass); + SpringContextHolder.autowire(handler); return handler; + } else { + Class jobHandlerClass = getJobHandlerClass(text); + return ClassUtils.newInstance(jobHandlerClass); } + } - Class type = ClassUtils.getClass(text); + private static Class getJobHandlerClass(String text) throws JobException { + Class type = ClassUtils.getClass(text); if (type == null) { - throw new JobException(JobCodeMsg.LOAD_HANDLER_ERROR, "Illegal job handler: " + text); + throw new JobException(JobCodeMsg.LOAD_HANDLER_ERROR, "Illegal job handler class: " + text); } // interface type: Modifier.isAbstract(type.getModifiers()) -> true if (!JobHandler.class.isAssignableFrom(type) || Modifier.isAbstract(type.getModifiers())) { - throw new JobException(JobCodeMsg.LOAD_HANDLER_ERROR, "Invalid job handler: " + ClassUtils.getName(type) + ", " + text); + throw new JobException(JobCodeMsg.LOAD_HANDLER_ERROR, "Invalid job handler class '" + ClassUtils.getName(type) + "': " + text); } - - // must be annotated with @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) - handler = SpringContextHolder.getPrototypeBean(type); - if (handler != null) { - return handler; - } - - handler = ClassUtils.newInstance(type); - SpringContextHolder.autowire(handler); - return handler; + return type; } } diff --git a/disjob-registry/disjob-registry-api/src/main/java/cn/ponfee/disjob/registry/rpc/DiscoveryServerRestProxy.java b/disjob-registry/disjob-registry-api/src/main/java/cn/ponfee/disjob/registry/rpc/DiscoveryServerRestProxy.java index 4e34165d1..1190b873a 100644 --- a/disjob-registry/disjob-registry-api/src/main/java/cn/ponfee/disjob/registry/rpc/DiscoveryServerRestProxy.java +++ b/disjob-registry/disjob-registry-api/src/main/java/cn/ponfee/disjob/registry/rpc/DiscoveryServerRestProxy.java @@ -16,6 +16,7 @@ package cn.ponfee.disjob.registry.rpc; +import cn.ponfee.disjob.common.base.RetryInvocationHandler; import cn.ponfee.disjob.common.base.Symbol.Str; import cn.ponfee.disjob.common.collect.Collects; import cn.ponfee.disjob.common.exception.Throwables.ThrowingConsumer; @@ -75,22 +76,31 @@ public final class DiscoveryServerRestProxy { /** * Creates ungrouped rpc service client proxy. * - * @param interfaceType the interface class - * @param discoveryServer the discoveryServer - * @param restTemplate the restTemplate - * @param retry the retry config - * @param interface type - * @param discovery server type + * @param interfaceType the interface class + * @param localServiceProvider the localServiceProvider + * @param discoveryServer the discoveryServer + * @param restTemplate the restTemplate + * @param retry the retry config + * @param interface type + * @param discovery server type * @return rpc service client proxy */ public static T create(Class interfaceType, + @Nullable T localServiceProvider, Discovery discoveryServer, RestTemplate restTemplate, RetryProperties retry) { - DiscoveryServerRestTemplate template = new DiscoveryServerRestTemplate<>(discoveryServer, restTemplate, retry); - String prefixPath = getMappingPath(AnnotationUtils.findAnnotation(interfaceType, RequestMapping.class)); - InvocationHandler ungroupedInvocationHandler = new UngroupedInvocationHandler(template, prefixPath); - return ProxyUtils.create(ungroupedInvocationHandler, interfaceType); + InvocationHandler invocationHandler; + if (localServiceProvider != null) { + // 本地调用:使用动态代理来增加重试能力 + invocationHandler = new RetryInvocationHandler(localServiceProvider, retry.getMaxCount(), retry.getBackoffPeriod()); + } else { + // 远程调用:通过Discovery来获取目标服务器 + DiscoveryServerRestTemplate template = new DiscoveryServerRestTemplate<>(discoveryServer, restTemplate, retry); + String prefixPath = getMappingPath(AnnotationUtils.findAnnotation(interfaceType, RequestMapping.class)); + invocationHandler = new UngroupedInvocationHandler(template, prefixPath); + } + return ProxyUtils.create(invocationHandler, interfaceType); } /** diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/dao/SupervisorDataSourceConfig.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/dao/SupervisorDataSourceConfig.java index 5cf371506..edfab1f8e 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/dao/SupervisorDataSourceConfig.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/dao/SupervisorDataSourceConfig.java @@ -16,11 +16,13 @@ package cn.ponfee.disjob.supervisor.dao; +import cn.ponfee.disjob.core.base.JobConstants; import cn.ponfee.disjob.supervisor.base.AbstractDataSourceConfig; import org.apache.commons.lang3.ClassUtils; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionTemplate; import org.mybatis.spring.annotation.MapperScan; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.jdbc.DataSourceBuilder; import org.springframework.context.annotation.Bean; @@ -48,6 +50,7 @@ * * @author Ponfee */ +@ConditionalOnProperty(prefix = JobConstants.SUPERVISOR_KEY_PREFIX + ".datasource", name = "custom", havingValue = "false", matchIfMissing = true) @Configuration @MapperScan( basePackages = SupervisorDataSourceConfig.BASE_PACKAGE + ".mapper", diff --git a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/WorkerStartup.java b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/WorkerStartup.java index 690e99fed..ee85b7ccc 100644 --- a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/WorkerStartup.java +++ b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/WorkerStartup.java @@ -16,10 +16,8 @@ package cn.ponfee.disjob.worker; -import cn.ponfee.disjob.common.base.RetryInvocationHandler; import cn.ponfee.disjob.common.base.Startable; import cn.ponfee.disjob.common.exception.Throwables.ThrowingRunnable; -import cn.ponfee.disjob.common.util.ProxyUtils; import cn.ponfee.disjob.core.base.RetryProperties; import cn.ponfee.disjob.core.base.SupervisorRpcService; import cn.ponfee.disjob.core.base.Worker; @@ -33,7 +31,6 @@ import org.slf4j.LoggerFactory; import org.springframework.web.client.RestTemplate; -import java.lang.reflect.InvocationHandler; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; @@ -68,8 +65,8 @@ private WorkerStartup(Worker.Current currentWorker, Objects.requireNonNull(taskReceiver, "Task receiver cannot null."); Objects.requireNonNull(restTemplate, "Rest template cannot null."); - SupervisorRpcService supervisorRpcClient = createProxy( - supervisorRpcService, retryProperties, workerRegistry, restTemplate + SupervisorRpcService supervisorRpcClient = DiscoveryServerRestProxy.create( + SupervisorRpcService.class, supervisorRpcService, workerRegistry, restTemplate, retryProperties ); this.currentWorker = currentWorker; @@ -179,20 +176,4 @@ public WorkerStartup build() { } } - // ----------------------------------------------------------------------------------------private methods - - private static SupervisorRpcService createProxy(SupervisorRpcService local, - RetryProperties retry, - WorkerRegistry discoverySupervisor, - RestTemplate restTemplate) { - if (local != null) { - // cn.ponfee.disjob.supervisor.provider.rpc.SupervisorRpcProvider - // 此Worker同时也是Supervisor身份,则是本地调用,并使用动态代理增加重试能力 - InvocationHandler ih = new RetryInvocationHandler(local, retry.getMaxCount(), retry.getBackoffPeriod()); - return ProxyUtils.create(ih, SupervisorRpcService.class); - } else { - return DiscoveryServerRestProxy.create(SupervisorRpcService.class, discoverySupervisor, restTemplate, retry); - } - } - }