
Spring Master Series: Notes on Async Execution

武飞扬
newProxyInstance

Reference link 1

EnableAsync

Source code

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

	Class<? extends Annotation> annotation() default Annotation.class;	
	
	boolean proxyTargetClass() default false;
	
	AdviceMode mode() default AdviceMode.PROXY;
	
	int order() default Ordered.LOWEST_PRECEDENCE;
}

Applicable target: types.

AdviceMode

public enum AdviceMode {

	/**
	 * JDK proxy-based advice.
	 */
	PROXY,

	/**
	 * AspectJ weaving-based advice.
	 */
	ASPECTJ

}

AsyncConfigurationSelector

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
			"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

	@Override
	@Nullable
	public String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			case PROXY:
				return new String[] {ProxyAsyncConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}
}

Async

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
	String value() default "";
}

Applicable targets: types and methods.

Case 1: no return value

@Component
public class LogService {
    @Async
    public void log(String msg) throws InterruptedException {
        System.out.println(Thread.currentThread() + "开始记录日志," + System.currentTimeMillis());
        // Simulate 2 seconds of work
        TimeUnit.SECONDS.sleep(2);
        System.out.println(Thread.currentThread() + "日志记录完毕," + System.currentTimeMillis());
    }
}
@ComponentScan
@EnableAsync
public class Client {
    public static void main(String[] args) throws InterruptedException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        LogService logService = context.getBean(LogService.class);
        System.out.println(Thread.currentThread() + " logService.log start," + System.currentTimeMillis());
        logService.log("异步执行方法!");
        System.out.println(Thread.currentThread() + " logService.log end," + System.currentTimeMillis());
        TimeUnit.SECONDS.sleep(3);
    }
}
Thread[main,5,main] logService.log start,1667887578200
14:06:18.209 [main] DEBUG org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor - Could not find default TaskExecutor bean
org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.core.task.TaskExecutor' available
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:351)
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:342)
	at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.getDefaultExecutor(AsyncExecutionAspectSupport.java:233)
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.getDefaultExecutor(AsyncExecutionInterceptor.java:157)
	at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.lambda$configure$2(AsyncExecutionAspectSupport.java:119)
	at org.springframework.util.function.SingletonSupplier.get(SingletonSupplier.java:100)
	at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.determineAsyncExecutor(AsyncExecutionAspectSupport.java:172)
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:107)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
	at com.example.lurenjia.spring.c37.d1.LogService$$EnhancerBySpringCGLIB$$ecabcff7.log(<generated>)
	at com.example.lurenjia.spring.c37.d1.Client.main(Client.java:23)
14:06:18.210 [main] INFO org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor - No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
Thread[main,5,main] logService.log end,1667887578212
Thread[SimpleAsyncTaskExecutor-1,5,main]开始记录日志,1667887578220
Thread[SimpleAsyncTaskExecutor-1,5,main]日志记录完毕,1667887580234

Process finished with exit code 0


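The NoSuchBeanDefinitionException above is only logged at DEBUG level, and the call still completes. Here is why.

The AsyncExecutionInterceptor class overrides getDefaultExecutor: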

	@Override
	@Nullable
	protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
		Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
		return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
	}

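It first looks in the container for a bean of type TaskExecutor (or, as the log message above shows, a bean named 'taskExecutor') and uses it if one is found. Otherwise it creates a SimpleAsyncTaskExecutor.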
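The lookup-then-fallback rule can be modelled in plain Java without Spring. This is only a sketch under stated assumptions: ExecutorLookupSketch, TaskExec, ThreadPerTaskExec, resolve and the Map that plays the role of the bean factory are all hypothetical names, and only the order of the steps follows what the log message reports (a bean of the task-executor type, then a bean named 'taskExecutor', then a freshly created fallback).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

class ExecutorLookupSketch {

    /** Hypothetical marker type standing in for Spring's TaskExecutor. */
    interface TaskExec extends Executor {}

    /** Stand-in for the fallback executor: starts one new thread per task. */
    static final class ThreadPerTaskExec implements TaskExec {
        @Override
        public void execute(Runnable task) {
            new Thread(task).start();
        }
    }

    /** Resolves an executor from a toy "bean factory" (bean name -> bean). */
    static Executor resolve(Map<String, Executor> beans) {
        List<Executor> byType = beans.values().stream()
                .filter(TaskExec.class::isInstance)
                .collect(Collectors.toList());
        if (byType.size() == 1) {
            return byType.get(0);          // unique match by type, whatever its name
        }
        Executor named = beans.get("taskExecutor");
        if (named != null) {
            return named;                  // none or several by type: try the well-known name
        }
        return new ThreadPerTaskExec();    // nothing usable: create a fallback
    }

    public static void main(String[] args) {
        Map<String, Executor> beans = new HashMap<>();
        // Empty "container": the fallback is created.
        System.out.println(resolve(beans).getClass().getSimpleName());

        // One executor under an arbitrary name: it is found by type.
        TaskExec custom = new ThreadPerTaskExec();
        beans.put("anyName", custom);
        System.out.println(resolve(beans) == custom);
    }
}
```

In this model the bean name only matters when the lookup by type is ambiguous or finds nothing.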

To get rid of the logged exception, define a thread pool of your own:

@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("my-thread-");
        return executor;
    }
}

Output after the fix

14:27:03.298 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'taskExecutor'
Thread[main,5,main] logService.log start,1667888823308
Thread[main,5,main] logService.log end,1667888823310
Thread[my-thread-1,5,main]开始记录日志,1667888823316
Thread[my-thread-1,5,main]日志记录完毕,1667888825316

Case 2: with a return value

@Async
@Component
public class GoodsService {
    public Future<String> getGoodsInfo(long goodsId) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(500);
        return AsyncResult.forValue(String.format("商品%s基本信息!", goodsId));
    }

    public Future<String> getGoodsDesc(long goodsId) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(500);
        return AsyncResult.forValue(String.format("商品%s描述信息!", goodsId));
    }

    public Future<List<String>> getGoodsComments(long goodsId) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(500);
        List<String> comments = Arrays.asList("评论1", "评论2");
        return AsyncResult.forValue(comments);
    }
}
@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("d2-thread-");
        return executor;
    }
}
@ComponentScan
@EnableAsync
public class Client {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        GoodsService goodsService = context.getBean(GoodsService.class);

        long startTime = System.currentTimeMillis();
        System.out.println("开始获取商品的各种信息");

        long goodsId = 1L;
        Future<String> goodsInfoFuture = goodsService.getGoodsInfo(goodsId);
        Future<String> goodsDescFuture = goodsService.getGoodsDesc(goodsId);
        Future<List<String>> goodsCommentsFuture = goodsService.getGoodsComments(goodsId);

        System.out.println(goodsInfoFuture.get());
        System.out.println(goodsDescFuture.get());
        System.out.println(goodsCommentsFuture.get());

        System.out.println("商品信息获取完毕,总耗时(ms):" + (System.currentTimeMillis() - startTime));

        // Pause briefly before main returns
        TimeUnit.SECONDS.sleep(3);
    }
}
15:00:41.668 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'taskExecutor'
开始获取商品的各种信息
商品1基本信息!
商品1描述信息!
[评论1, 评论2]
商品信息获取完毕,总耗时(ms):521
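The total of roughly 500 ms, rather than 1500 ms, is what you would expect when the three 500 ms calls run concurrently. The same effect can be reproduced with a plain ExecutorService. This is a minimal sketch, not the article's Spring code: ParallelFetchSketch, slowCall and fetchAll are hypothetical names, and the labels returned are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ParallelFetchSketch {

    /** Simulates one slow call that takes delayMs milliseconds. */
    private static String slowCall(String label, long delayMs) {
        try {
            TimeUnit.MILLISECONDS.sleep(delayMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return label;
    }

    /** Submits three slow calls at once, then collects their results in order. */
    static List<String> fetchAll(long goodsId, long delayMs) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Future<String>> futures = new ArrayList<>();
            futures.add(pool.submit(() -> slowCall("info-" + goodsId, delayMs)));
            futures.add(pool.submit(() -> slowCall("desc-" + goodsId, delayMs)));
            futures.add(pool.submit(() -> slowCall("comments-" + goodsId, delayMs)));

            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // blocks until that task is done
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        System.out.println(fetchAll(1L, 500));
        System.out.println("elapsed ms: " + (System.currentTimeMillis() - start));
    }
}
```

All three tasks are submitted before the first get() is called, so the waits overlap instead of adding up.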

Custom thread pool

Approach 1

@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("d2-thread-");
        return executor;
    }
}

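The original article says the bean must be named taskExecutor. That is not accurate: when the container holds exactly one TaskExecutor bean, it is found by type, so you can rename the bean to anything and run the test again to confirm. The name taskExecutor only comes into play when the lookup by type does not yield a single bean.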

Exception handling

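From the earlier J.U.C. articles we know that an exception thrown by a Thread's Runnable can only be handled through an UncaughtExceptionHandler, whereas an exception from a Future can be caught by wrapping the get() call in try-catch. The same reasoning applies here.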
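As a reminder of those two JDK paths, here is a small Spring-free sketch. ExceptionPathsSketch and its two methods are hypothetical names chosen for this illustration. The first method lets a Runnable fail on a raw Thread and observes the failure in an UncaughtExceptionHandler; the second submits a failing Callable and unwraps the cause from the ExecutionException thrown by get().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

class ExceptionPathsSketch {

    /** Runs a failing Runnable on a raw Thread; returns the message seen by the handler. */
    static String viaUncaughtHandler() {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread thread = new Thread(() -> {
            throw new IllegalArgumentException("void task failed");
        });
        // The handler runs on the failing thread just before it terminates.
        thread.setUncaughtExceptionHandler((t, ex) -> seen.set(ex.getMessage()));
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    /** Submits a failing Callable; returns the message of the cause unwrapped from get(). */
    static String viaFutureGet() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> failing = () -> {
                throw new IllegalArgumentException("future task failed");
            };
            Future<String> future = pool.submit(failing);
            try {
                return future.get();
            } catch (ExecutionException e) {
                return e.getCause().getMessage(); // the original exception is the cause
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaUncaughtHandler());
        System.out.println(viaFutureGet());
    }
}
```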

With a return value

@Service
public class LogService {
    
    @Async
    public Future<String> mockException() {
        // Simulate throwing an exception
        throw new IllegalArgumentException("参数有误!");
    }

    @Async
    public void mockNoReturnException() {
        // Simulate throwing an exception
        throw new IllegalArgumentException("无返回值的异常!");
    }
}
@ComponentScan(basePackageClasses = {TaskExecutorConfig.class, Client.class, LogService.class})
@EnableAsync
public class Client {

    private static final Logger logger = LoggerFactory.getLogger(Client.class);

    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        LogService logService = context.getBean(LogService.class);
        try {
            Future<String> future = logService.mockException();
            System.out.println(future.get());
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }
}

No return value

@ComponentScan(basePackageClasses = {TaskExecutorConfig.class, Client.class, LogService.class})
@EnableAsync
public class Client {

    private static final Logger logger = LoggerFactory.getLogger(Client.class);

    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        LogService logService = context.getBean(LogService.class);
        logService.mockNoReturnException();
    }
}
ERROR org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler - Unexpected exception occurred invoking async method: public void com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException()
java.lang.IllegalArgumentException: 无返回值的异常!
	at com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException(LogService.java:25)
	at com.example.lurenjia.spring.c37.service.LogService$$FastClassBySpringCGLIB$$e4f93305.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

The output above is what gets printed before we have added any handler of our own. Note the class named in the log: SimpleAsyncUncaughtExceptionHandler.

public class SimpleAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {

	private static final Log logger = LogFactory.getLog(SimpleAsyncUncaughtExceptionHandler.class);


	@Override
	public void handleUncaughtException(Throwable ex, Method method, Object... params) {
		if (logger.isErrorEnabled()) {
			logger.error("Unexpected exception occurred invoking async method: " + method, ex);
		}
	}

}

Adding a custom handler

    @Bean
    public AsyncConfigurer asyncConfigurer() {
        return new AsyncConfigurer() {

            @Nullable
            @Override
            public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
                return (ex, method, params) -> {
                    String msg = String.format("方法[%s],参数[%s],发送异常了,异常详细信息:", method, Arrays.asList(params));
                    System.out.println(msg);
                    ex.printStackTrace();
                };
            }
        };
    }

Output

方法[public void com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException()],参数[[]],发送异常了,异常详细信息:
java.lang.IllegalArgumentException: 无返回值的异常!
	at com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException(LogService.java:25)
	at com.example.lurenjia.spring.c37.service.LogService$$FastClassBySpringCGLIB$$e4f93305.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

How it works: AbstractAsyncConfiguration has an autowired setter that collects the AsyncConfigurer beans

	@Autowired(required = false)
	void setConfigurers(Collection<AsyncConfigurer> configurers) {
		if (CollectionUtils.isEmpty(configurers)) {
			return;
		}
		if (configurers.size() > 1) {
			throw new IllegalStateException("Only one AsyncConfigurer may exist");
		}
		AsyncConfigurer configurer = configurers.iterator().next();
		this.executor = configurer::getAsyncExecutor;
		this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
	}
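The rule in this setter (no configurer keeps the defaults, exactly one is used, more than one is rejected) and the way a chosen handler ends up receiving failures of void tasks can be modelled in a few lines of plain Java. Everything here is a hypothetical stand-in for illustration: ConfigurerRuleSketch, FailureHandler, choose and guarded are not Spring types or methods.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

class ConfigurerRuleSketch {

    /** Hypothetical handler shape: receives the failure and the failing method's name. */
    interface FailureHandler {
        void handle(Throwable ex, String methodName);
    }

    /** Zero candidates: keep the default. One: use it. More than one: refuse to guess. */
    static FailureHandler choose(Collection<FailureHandler> configured, FailureHandler defaultHandler) {
        if (configured.isEmpty()) {
            return defaultHandler;
        }
        if (configured.size() > 1) {
            throw new IllegalStateException("Only one handler source may exist");
        }
        return configured.iterator().next();
    }

    /** Wraps a void task so that a runtime exception is passed to the handler instead of being lost. */
    static Runnable guarded(String methodName, Runnable body, FailureHandler handler) {
        return () -> {
            try {
                body.run();
            } catch (RuntimeException ex) {
                handler.handle(ex, methodName);
            }
        };
    }

    public static void main(String[] args) {
        FailureHandler fallback = (ex, m) -> System.out.println("default handler: " + m);
        FailureHandler custom = (ex, m) -> System.out.println("custom handler: " + m + " -> " + ex.getMessage());

        List<FailureHandler> configured = new ArrayList<>();
        configured.add(custom);

        FailureHandler chosen = choose(configured, fallback);
        guarded("mockNoReturnException", () -> {
            throw new IllegalArgumentException("boom");
        }, chosen).run();

        // With nothing configured, the default handler is kept.
        guarded("other", () -> {
            throw new IllegalStateException("oops");
        }, choose(Collections.emptyList(), fallback)).run();
    }
}
```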

Thread pool isolation

Use @Async("executor") to specify the name of the thread pool bean that should run the method.

@Service
public class InsulateService {

    @Async("executor")
    public void a() {
        System.out.println(Thread.currentThread().getName());
    }

    @Async("executor2")
    public void b() {
        System.out.println(Thread.currentThread().getName());
    }

    @Async("executor3")
    public void c() {
        System.out.println(Thread.currentThread().getName());
    }

}
@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("executor-thread-");
        return executor;
    }

    @Bean("executor2")
    public ThreadPoolTaskExecutor executor2() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("executor2-thread-");
        return executor;
    }

    @Bean("executor3")
    public ThreadPoolTaskExecutor executor3() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("executor3-thread-");
        return executor;
    }


}
@ComponentScan(basePackageClasses = {TaskExecutorConfig.class, com.example.lurenjia.spring.c37.d4.Client.class, LogService.class})
@EnableAsync
public class Client {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        InsulateService service = context.getBean(InsulateService.class);
        service.a();
        service.b();
        service.c();
    }
}
executor2-thread-1
executor3-thread-1
executor-thread-1
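The idea behind this isolation, looking an executor up by name so that each group of tasks has its own threads, can be shown without Spring. The sketch below is only an illustration under assumptions: PoolIsolationSketch, namedPool and runOn are hypothetical names, and a Map stands in for the container's lookup of a bean by name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class PoolIsolationSketch {

    /** Creates a small pool whose threads are named "<name>-thread-N". */
    static ExecutorService namedPool(String name) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(2,
                task -> new Thread(task, name + "-thread-" + counter.incrementAndGet()));
    }

    /** Runs one task on the pool registered under poolName and returns the worker thread's name. */
    static String runOn(Map<String, ExecutorService> pools, String poolName) {
        ExecutorService pool = pools.get(poolName);
        if (pool == null) {
            throw new IllegalArgumentException("No executor named " + poolName);
        }
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            return pool.submit(whoAmI).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        Map<String, ExecutorService> pools = new LinkedHashMap<>();
        for (String name : new String[] {"executor", "executor2", "executor3"}) {
            pools.put(name, namedPool(name));
        }
        try {
            for (String name : pools.keySet()) {
                System.out.println(runOn(pools, name));
            }
        } finally {
            pools.values().forEach(ExecutorService::shutdown);
        }
    }
}
```

Because each name maps to its own pool, a backlog in one pool does not take threads away from the others.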

How it works

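Internally this is implemented with AOP. @EnableAsync brings in a bean post-processor, AsyncAnnotationBeanPostProcessor, and registers it with the Spring container. While each bean is being created, this post-processor checks whether the bean's class carries @Async or whether any of its methods is annotated with @Async. If so, it creates a proxy for the bean through AOP and adds an advisor to that proxy: org.springframework.scheduling.annotation.AsyncAnnotationAdvisor. The advisor in turn contributes an interceptor, AnnotationAsyncExecutionInterceptor. The key code for invoking the method asynchronously lives in this interceptor's invoke method, which is worth reading.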
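To make the proxy-plus-interceptor idea concrete, here is a heavily simplified sketch that uses a JDK dynamic proxy instead of Spring AOP. Note that the stack traces earlier in this article show a CGLIB proxy, so this is a swapped-in technique for illustration only. AsyncProxySketch, Notifier, asyncProxy and demo are hypothetical names, and the handler supports void methods only.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncProxySketch {

    /** Hypothetical service interface; JDK proxies require an interface. */
    interface Notifier {
        void send(String msg);
    }

    /** Wraps target in a proxy that hands every call to the executor and returns at once. */
    @SuppressWarnings("unchecked")
    static <T> T asyncProxy(Class<T> type, T target, Executor executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            executor.execute(() -> {
                try {
                    method.invoke(target, args); // the real call runs on a pool thread
                } catch (ReflectiveOperationException e) {
                    // A real interceptor would route this to an exception handler.
                    e.printStackTrace();
                }
            });
            return null; // only valid for void methods; Object methods are not special-cased
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    /** Calls send() through the proxy and returns the name of the thread it actually ran on. */
    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-worker"));
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        Notifier real = msg -> ranOn.complete(Thread.currentThread().getName());
        Notifier proxied = asyncProxy(Notifier.class, real, pool);
        try {
            proxied.send("hello"); // returns immediately on the caller's thread
            return ranOn.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("target ran on: " + demo());
    }
}
```

The caller only ever talks to the proxy, which is why a method invoked directly on the target object (for example via this inside the same class) would not be dispatched to the executor in this model.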

AsyncAnnotationAdvisor
