code学习

SpringBoot:Event实现发布/订阅模式

作者:拔土豆的程序员
SpringBoot:Event实现发布/订阅模式

如图所示支付业务中,用户支付成功之后,后续还有很多的业务流程,但是对于用户来讲是透明的,所以为了提高接口的响应速率,提高用户体验,后续操作都会选择异步执行。

异步执行方式

SpringBoot:Event实现发布/订阅模式

异步执行主体

@Service
public class OrderService {
   public void orderSuccess(){

       // 订单完成异步任务开启 可以再统一封装
       Order order = new Order();
       order.setOrderNo(String.valueOf(System.currentTimeMillis()));
       Map<String, OrderSuccessService> orderSuccessServiceMap = SpringContextUtil.getBeansOfType(OrderSuccessService.class);
       orderSuccessServiceMap.values().forEach(service -> {
           service.orderSuccess(order);
      });
  }
}           

异步执行接口

public interface OrderSuccessService {
   /**
    * 订单支付成功
    * @param order
    */
   public CompletableFuture<Boolean> orderSuccess(Order order);
}           
@Slf4j
@Service
public class MerchantNoticeServiceImpl implements OrderSuccessService {
   @Override
   @Async("taskExecutor")
   public CompletableFuture<Boolean> orderSuccess(Order order) {
       log.info("{}商户通知:{}",Thread.currentThread(),order);
       // 返回异步调用的结果
       return CompletableFuture.completedFuture(true);
  }
}           
@Slf4j
@Service
public class MerchantNoticeServiceImpl implements OrderSuccessService {
   @Override
   @Async("taskExecutor")
   public CompletableFuture<Boolean> orderSuccess(Order order) {
       log.info("{}商户通知:{}",Thread.currentThread(),order);
       // 返回异步调用的结果
       return CompletableFuture.completedFuture(true);
  }
}           
@Slf4j
@Service
public class MerchantNoticeServiceImpl implements OrderSuccessService {
   @Override
   @Async("taskExecutor")
   public CompletableFuture<Boolean> orderSuccess(Order order) {
       log.info("{}商户通知:{}",Thread.currentThread(),order);
       // 返回异步调用的结果
       return CompletableFuture.completedFuture(true);
  }
}           

自定义线程池,线程池隔离,开启异步任务执行

@Configuration // 配置类
@EnableAsync // @Async注解能够生效
public class TaskConfiguration {
   @Bean("taskExecutor")
   public Executor taskExecutor() {
       ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
       // 线程池创建时候初始化的线程数
       executor.setCorePoolSize(5);
       // 线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程
       executor.setMaxPoolSize(10);
       // 用来缓冲执行任务的队列
       executor.setQueueCapacity(200);
       // 当超过了核心线程之外的线程,在空闲时间到达之后会被销毁
       executor.setKeepAliveSeconds(60);
       // 可以用于定位处理任务所在的线程池
       executor.setThreadNamePrefix("taskExecutor-orderSuccess-");
       // 这里采用CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;
       // 如果执行程序已关闭,则会丢弃该任务
       executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
       // 设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean,
       // 这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。
       executor.setWaitForTasksToCompleteOnShutdown(true);
       // 该方法用来设置线程池中 任务的等待时间,如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
       executor.setAwaitTerminationSeconds(60);
       return executor;
  }
}           

Spring Event实现发布/订阅模式

SpringBoot:Event实现发布/订阅模式

自定义事件:通过继承ApplicationEvent,并重写构造函数,实现事件扩展。

public class OrderApplicationEvent extends ApplicationEvent {
   public OrderApplicationEvent(OrderData orderData) {
       super(orderData);
  }
}           

定义事件的消息体

@Data
public class OrderData {
   /**
    * 订单号
    */
   private String orderNo;
}           

事件监听

@Slf4j
@Service
public class MerchantNoticeListener {
   @Async("asyncEventTaskExecutor")
   @EventListener
   public CompletableFuture<Boolean> orderSuccess(OrderApplicationEvent event) {
       log.info("{}商户通知:{}",Thread.currentThread(),event);
       // 返回异步调用的结果
       return CompletableFuture.completedFuture(true);
  }
}           
@Slf4j
@Service
public class UserNoticeListener implements ApplicationListener<OrderApplicationEvent> {
   @Override
   @Async("asyncEventTaskExecutor")
   public void onApplicationEvent(OrderApplicationEvent event) {
       log.info("{}用户通知:{}",Thread.currentThread(),event);
  }
}           
@Slf4j
@Service
public class UserNoticeListener implements ApplicationListener<OrderApplicationEvent> {
   @Override
   @Async("asyncEventTaskExecutor")
   public void onApplicationEvent(OrderApplicationEvent event) {
       log.info("{}用户通知:{}",Thread.currentThread(),event);
  }
}           
@Slf4j
@Service
public class UserNoticeListener implements ApplicationListener<OrderApplicationEvent> {
   @Override
   @Async("asyncEventTaskExecutor")
   public void onApplicationEvent(OrderApplicationEvent event) {
       log.info("{}用户通知:{}",Thread.currentThread(),event);
  }
}           

自定义线程池

@Configuration
@Slf4j
@EnableAsync // @Async注解能够生效
public class AsyncConfiguration implements AsyncConfigurer {
   @Bean("asyncEventTaskExecutor")
   public ThreadPoolTaskExecutor executor() {
       //Spring封装的一个线程池
       ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
       executor.setCorePoolSize(5);
       executor.setMaxPoolSize(50);
       executor.setQueueCapacity(30);
       executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
       executor.setThreadNamePrefix("asyncEventTaskExecutor--orderSuccess-");
       executor.initialize();
       return executor;
  }

   @Override
   public Executor getAsyncExecutor() {
       return executor();
  }

   /**
    * 异常处理
    * @return
    */
   @Override
   public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
       return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
  }
}           

事件发布

@Service
@Slf4j
public class OrderEventService {
   private final ApplicationEventPublisher applicationEventPublisher;

   public OrderEventService(ApplicationEventPublisher applicationEventPublisher) {
       this.applicationEventPublisher = applicationEventPublisher;
  }
   public void success(){
       OrderData orderData = new OrderData();
       orderData.setOrderNo(String.valueOf(System.currentTimeMillis()));
       // 消息
       OrderApplicationEvent orderApplicationEvent = new OrderApplicationEvent(orderData);
       // 发布事件
       applicationEventPublisher.publishEvent(orderApplicationEvent);
  }
}           

写在最后:不管是否基于spring boot 的发布订阅模型,最终都是开启了线程执行任务,和使用第三方的MQ消息组件,问题在于重启服务器或者未知原因崩溃的时候,消息的恢复机制要自行处理。

建议使用在一些边缘业务,比如记录日志,这些要求没有那么高的业务。

继续阅读