Skip to content

Commit

Permalink
close asyncDelayedExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Feb 23, 2024
1 parent 3d323b2 commit 63f2718
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,29 @@ public final class AsyncDelayedExecutor<E> extends Thread {

private static final Logger LOG = LoggerFactory.getLogger(AsyncDelayedExecutor.class);

private final Consumer<E> processor; // 数据处理器
private final ThreadPoolExecutor asyncExecutor; // 异步执行器
/**
* 数据处理器
*/
private final Consumer<E> dataProcessor;

/**
* 异步执行器
*/
private final ThreadPoolExecutor asyncExecutor;

private final DelayQueue<DelayedData<E>> queue = new DelayQueue<>();
private final AtomicBoolean stopped = new AtomicBoolean(false);

public AsyncDelayedExecutor(Consumer<E> processor) {
this(1, processor);
public AsyncDelayedExecutor(Consumer<E> dataProcessor) {
this(1, dataProcessor);
}

/**
* @param maximumPoolSize the maximumPoolSize
* @param processor the data processor
* @param dataProcessor the data processor
*/
public AsyncDelayedExecutor(int maximumPoolSize,
Consumer<E> processor) {
this.processor = processor;
public AsyncDelayedExecutor(int maximumPoolSize, Consumer<E> dataProcessor) {
this.dataProcessor = dataProcessor;

ThreadPoolExecutor executor = null;
if (maximumPoolSize > 1) {
Expand All @@ -73,13 +79,13 @@ public AsyncDelayedExecutor(int maximumPoolSize,
.maximumPoolSize(maximumPoolSize)
.workQueue(new SynchronousQueue<>())
.keepAliveTimeSeconds(300)
.threadFactory(NamedThreadFactory.builder().prefix("async_delayed_worker").uncaughtExceptionHandler(LOG).build())
.threadFactory(NamedThreadFactory.builder().prefix("async_delayed_executor").uncaughtExceptionHandler(LOG).build())
.rejectedHandler(ThreadPoolExecutors.CALLER_RUNS)
.build();
}
this.asyncExecutor = executor;

super.setName("async_delayed_executor-" + Integer.toHexString(hashCode()));
super.setName("async_delayed_boss-" + Integer.toHexString(hashCode()));
super.setDaemon(false);
super.setUncaughtExceptionHandler(new LoggedUncaughtExceptionHandler(LOG));
super.start();
Expand All @@ -103,7 +109,7 @@ public boolean toStop() {

public void doStop() {
toStop();
Threads.stopThread(this, 1000);
Threads.stopThread(this, 3000);
}

@Override
Expand All @@ -115,24 +121,24 @@ public void run() {
}
DelayedData<E> delayed;
try {
delayed = queue.poll(3000, TimeUnit.MILLISECONDS);
delayed = queue.poll(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.error("Delayed queue pool interrupted.", e);
toStop();
Thread.currentThread().interrupt();
break;
}

if (delayed != null) {
E data = delayed.getData();
if (asyncExecutor != null) {
asyncExecutor.submit(ThrowingRunnable.toCaught(() -> processor.accept(data)));
asyncExecutor.submit(ThrowingRunnable.toCaught(() -> dataProcessor.accept(data)));
} else {
ThrowingRunnable.doCaught(() -> processor.accept(data));
ThrowingRunnable.doCaught(() -> dataProcessor.accept(data));
}
}
}

toStop();
if (asyncExecutor != null) {
// destroy the async executor
ThreadPoolExecutors.shutdown(asyncExecutor, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package cn.ponfee.disjob.common.spring;

import cn.ponfee.disjob.common.util.ClassUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
Expand Down Expand Up @@ -91,6 +92,24 @@ public static <T> T getBean(String beanName, Class<T> beanType) {
return applicationContext.getBean(beanName, beanType);
}

public static <T> T getPrototypeBean(String beanName) throws IllegalStateException {
if (applicationContext == null) {
return null;
}

T bean;
try {
bean = (T) applicationContext.getBean(beanName);
} catch (BeansException ignored) {
return null;
}

if (applicationContext.isPrototype(beanName)) {
return bean;
}
throw new IllegalStateException("Bean name is not a prototype bean: " + beanName);
}

/**
* Gets spring bean by bean name and type, if not defined bean then return null
*
Expand Down Expand Up @@ -143,6 +162,76 @@ public static <T> T getPrototypeBean(Class<T> beanType) throws IllegalStateExcep
throw new IllegalStateException("Bean type is not a prototype bean: " + beanType);
}

public static <T> T getSingletonBean(String beanName) throws IllegalStateException {
if (applicationContext == null) {
return null;
}

T bean;
try {
bean = (T) applicationContext.getBean(beanName);
} catch (BeansException ignored) {
return null;
}

if (applicationContext.isSingleton(beanName)) {
return bean;
}
throw new IllegalStateException("Bean name is not a prototype bean: " + beanName);
}

/**
* Gets spring bean by bean name and type, if not defined bean then return null
*
* @param beanName the bean name
* @param beanType the bean type
* @return spring bean
* @throws IllegalStateException if not singleton bean
*/
public static <T> T getSingletonBean(String beanName, Class<T> beanType) throws IllegalStateException {
if (applicationContext == null) {
return null;
}

T bean;
try {
bean = applicationContext.getBean(beanName, beanType);
} catch (BeansException ignored) {
return null;
}

if (applicationContext.isSingleton(beanName)) {
return bean;
}
throw new IllegalStateException("Bean name is not a singleton bean: " + beanName + ", " + beanType);
}

/**
* Gets spring bean by bean type, if not defined bean then return null
*
* @param beanType the bean type
* @return spring bean
* @throws IllegalStateException if not singleton bean
*/
public static <T> T getSingletonBean(Class<T> beanType) throws IllegalStateException {
if (applicationContext == null) {
return null;
}

T bean;
try {
bean = applicationContext.getBean(beanType);
} catch (BeansException ignored) {
return null;
}

String[] beanNames = applicationContext.getBeanNamesForType(beanType);
if (Arrays.stream(beanNames).allMatch(applicationContext::isSingleton)) {
return bean;
}
throw new IllegalStateException("Bean type is not a singleton bean: " + beanType);
}

/**
* Returns spring container contains specified bean name.
*
Expand Down Expand Up @@ -189,7 +278,7 @@ public static String[] getAliases(String beanName) {
*/
public static void removeBean(String beanName) {
if (!(applicationContext instanceof ConfigurableApplicationContext)) {
return;
throw new UnsupportedOperationException("Remove bean failed: " + ClassUtils.getClassName(applicationContext));
}

ConfigurableApplicationContext cac = (ConfigurableApplicationContext) applicationContext;
Expand All @@ -208,7 +297,7 @@ public static void removeBean(String beanName) {
*/
public static <T> T registerBean(String beanName, Class<T> beanType, Object... args) {
if (!(applicationContext instanceof ConfigurableApplicationContext)) {
throw new BeanDefinitionValidationException("Register bean failed: not ConfigurableApplicationContext.");
throw new BeanDefinitionValidationException("Register bean failed: " + ClassUtils.getClassName(applicationContext));
}

BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(beanType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,19 +217,19 @@ public static Field getStaticField(Class<?> clazz, String staticFieldName) {
*/
public static String getName(Class<?> clazz) {
String name = clazz.getCanonicalName();
if (name == null) {
name = clazz.getName();
}
return name != null ? name : clazz.getName();
}

return name;
public static String getClassName(Object obj) {
return obj == null ? null : getName(obj.getClass());
}

/**
* 包名称转目录路径名<p>
* getPackagePath("cn.ponfee.commons.reflect") -> cn/ponfee/commons/reflect
*
* @param packageName the package name
* @return
* @return package name
* @see org.springframework.util.ClassUtils#convertClassNameToResourcePath
*/
public static String getPackagePath(String packageName) {
Expand Down Expand Up @@ -311,10 +311,10 @@ public static <T> T newInstance(Class<T> type) {
* ClassUtils.newInstance(Tuple3.class, new Object[]{1, 2, 3}) <p>
* ClassUtils.newInstance(Tuple2.class, new Object[]{new String[]{"a", "b"}, new Integer[]{1, 2}}) <p>
*
* @param type the type
* @param type the class
* @param args the args
* @param <T>
* @return
* @param <T> class type
* @return instance
*/
public static <T> T newInstance(Class<T> type, Object[] args) {
checkObjectArray(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void start() {
*/
@Override
public void stop() {
// No-op
asyncDelayedExecutor.doStop();
}

// ------------------------------------------------------------private methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class JdkAtomicCounter extends AtomicCounter {
private final AtomicLong counter;

public JdkAtomicCounter() {
this(1);
this(0);
}

public JdkAtomicCounter(long initialValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@

<!-- ********************page query******************** -->

<!-- COUNT(*)是SQL 92定义的标准统计行数的语法,优先走普通索引进行统计 -->
<select id="queryPageCount" resultType="_long">
SELECT COUNT(*) cnt
FROM <include refid="Table_Name" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public void close() {

// 2、do close
// 2.1、stop this boss thread
ThrowingRunnable.doCaught(() -> Threads.stopThread(this, 200));
ThrowingRunnable.doCaught(() -> Threads.stopThread(this, 1000));

// 2.2、stop idle pool thread
idlePool.forEach(e -> ThrowingRunnable.doCaught(() -> stopWorkerThread(e, true)));
Expand Down Expand Up @@ -705,15 +705,16 @@ private void execute(ExecuteTaskParam task) throws InterruptedException {
private void toStop() {
if (stopped.compareAndSet(false, true)) {
threadPool.workerThreadCounter.decrementAndGet();
super.interrupt();
ExecuteTaskParam task = currentTask();
if (task != null) {
task.stop();
}
}
}

private void doStop() {
toStop();
ExecuteTaskParam task = currentTask();
if (task != null) {
task.stop();
}
Threads.stopThread(this, 2000);
}

Expand Down

0 comments on commit 63f2718

Please sign in to comment.