Springboot 事件驱动(发布/订阅模式)


Spring的事件监听(也叫事件驱动)是观察者模式的一种实现,只要是观察者模式,就含有主题(针对该主题的事件),发布者(发布主题或事件),订阅者(监听主题的人)。有三个部分组成,事件(ApplicationEvent)、监听器(ApplicationListener)和事件发布操作。

作用:

使用事件机制我们可以将相互耦合的代码解耦,从而方便功能拓展和调整。

1. 观察者模式简介

当对象间存在一对多关系时,则使用观察者模式(Observer Pattern)。

比如,当一个对象被修改时,则会自动通知它的依赖对象。观察者模式属于行为型模式。

img

2. 观察者模式角色

  • Subject:抽象主题(抽象被观察者),抽象主题角色把所有观察者对象保存在一个集合里,每个主题都可以有任意数量的观察者,抽象主题提供一个接口,可以增加和删除观察者对象。
  • ConcreteSubject:具体主题(具体被观察者),该角色将有关状态存入具体观察者对象,在具体主题的内部状态发生改变时,给所有注册过的观察者发送通知。
  • Observer:抽象观察者,是观察者者的抽象类,它定义了一个更新接口,使得在得到主题更改通知时更新自己。
  • ConcrereObserver:具体观察者,实现抽象观察者定义的更新接口,以便在得到主题更改通知时更新自身的状态。

3. 事件机制实现方式

实现Spring事件机制主要有4个类:

  1. ApplicationEvent:事件,每个实现类表示一类事件,可携带数据。
  2. ApplicationListener:事件监听器,用于接收事件处理时间。
  3. ApplicationEventMulticaster:事件管理者,用于事件监听器的注册和事件的广播。
  4. ApplicationEventPublisher:事件发布者,委托ApplicationEventMulticaster完成事件发布。
3.1 ApplicationEvent

ApplicationEvent:应用事件,职责为定义业务

Spring 提供了一个继承于java.util.EventObject 类的ApplicationEvent的的抽象类,并提供了应用上线文事件的抽象实现ApplicationContextEvent 下面的容器关闭、刷新、启动、停止等容器事件 以及RequestHandledEvent(http 请求处理完成事件),可自定义
事件(只需要实现ApplicationEvent 抽象类定义有参构造函数即可,source表示事件源( 我用于事件的回调)

  1. public class TestEvent extends ApplicationEvent {
  2. private String message;
  3. // constructor getter setter
  4. }
3.2 ApplicationListener

ApplicationListener:事件监听器,职责为处理事件广播器发布的事件。

Spring提供了继承于java.util.EventListener接口的应用监听器接口, ApplicationListener,此接口源码:

  1. @FunctionalInterface
  2. public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
  3. void onApplicationEvent(E event);
  4. }

并提供了两个实现:SmartApplicationListener和GenericApplicationListener接口

事件机制监听的方式有两种:

  1. 实现ApplicationListener接口
  2. EventListener注解形式

代码实例:

  1. // 接口方式一
  2. @Component
  3. public class AEventListener implements ApplicationListener<TestEvent> {
  4. @Override
  5. public void onApplicationEvent(TestEvent event) {
  6. //逻辑处理
  7. }
  8. }
  9. // 注解方式二,推荐
  10. @Component
  11. public class AEventListener {
  12. @EventListener
  13. public void listener(TestEvent event) {
  14. //逻辑处理
  15. }
  16. }
3.3 ApplicationEventMulticaster

事件广播器,职责为将EventPubsher(事件发布者)发布的event 广播给事件EventListener(事件监听器)。

Spring提供了默认的实现SimpleApplicationEventMulticaster,如果用户没有配置自定义事件广播器,
则会默认使用SimpleApplicationEventMulticaster作为事件广播器。在容器刷新的过程中会实例化、初始化事件广播器。

3.4 ApplicationEventPublisher

事件发布者,职责为发布事件。
spring的ApplicationContext 本来就实现了ApplicationEventPublisher接口,因此应用上下文本来就是
一个事件发布者,在AbstractApplicationContext中实现了事件发布的业务。

如何发布事件呢

  1. // 1. 直接注入ApplicationContext:
  2. @Component
  3. public class MyEventDoHandle {
  4. @Autowired
  5. private ApplicationContext applicationContext;
  6. public void doSomething(TestEvent obj){
  7. applicationContext.publishEvent(obj);
  8. }
  9. }
  10. // 2. 直接注入ApplicationEventPublisher: 推荐
  11. @Component
  12. public class MyEventDoHandle {
  13. @Autowired
  14. private ApplicationEventPublisher applicationEventPublisher;
  15. public void doSomething(TestEvent obj){
  16. applicationEventPublisher.publishEvent(obj);
  17. }
  18. }

4. 代码实例

4.1 创建事件
  1. public class TestEvent extends ApplicationEvent {
  2. private String message;
  3. public TestEvent(String message) {
  4. // 这里需要回调,就把super传个service实例
  5. super(message);
  6. this.message = message;
  7. }
  8. public String getMessage() {
  9. return message;
  10. }
  11. public void setMessage(String message) {
  12. this.message = message;
  13. }
  14. }

如定义一个事件,对一个字符型数据处理。

4.2 创建发布者
  1. @Service
  2. public class PublishEvent {
  3. @Autowired
  4. private ApplicationEventPublisher applicationEventPublisher;
  5. public void publish(String message) {
  6. applicationEventPublisher.publishEvent(new TestEvent(message));
  7. }
  8. }
4.3 创建监听者
  1. @Slf4j
  2. @Component
  3. public class AEventListener {
  4. @EventListener
  5. public void listener(TestEvent event) {
  6. log.info("监听到数据1:{}", event.getMessage());
  7. }
  8. }

调用接口

  1. @RequestMapping("pub")
  2. public void pub() {
  3. for (int i = 0; i < 5; i++) {
  4. publishEvent.publish("你若为我繁华,你好呀:" + (i + 1));
  5. }
  6. }

事件监听器默认是同步阻塞的.

4.4 开启异步:

启动类增加此注解,开启异步支持。

  1. @EnableAsync
  2. public class Application{
  3. public static void main(String[] args) }{
  4. SpringApplocation.run(Application.class, args);
  5. }
  6. }

监听方法增加@Async注解,表示该方法为异步方法。高并发时建议指定线程池

  1. @Slf4j
  2. @Component
  3. public class AEventListener {
  4. @Async
  5. @EventListener
  6. public void listener(TestEvent event) {
  7. log.info("监听到数据1:{}", event.getMessage());
  8. }
  9. }
4.5 开启事务
  1. // 发布事件的地方加上事务注解
  2. @Service
  3. public class PublishEvent {
  4. @Autowired
  5. private ApplicationEventPublisher applicationEventPublisher;
  6. @Transactional
  7. public void publish(String message) {
  8. applicationEventPublisher.publishEvent(new TestEvent(message));
  9. }
  10. }
  1. // 将监听处理加入事务
  2. @Log4j2
  3. @Component
  4. public class AEventListener {
  5. @TransactionalEventListener
  6. public void listener(TestEvent event) throws InterruptedException {
  7. Thread.sleep(2000);
  8. log.info("监听到数据:{}", event.getMessage());
  9. }
  10. }

@EventListener注解原理

4.6 开启泛型
  1. // 事件实现 ResolvableTypeProvider
  2. public class BaseEvent<T> extends ApplicationEvent implements ResolvableTypeProvider {
  3. private T data;
  4. private OperationEnum operation;
  5. @Override
  6. public ResolvableType getResolvableType() {
  7. // 提供运行时返回真正的实例
  8. return ResolvableType.forClassWithGenerics(getClass(), ResolvableType.forClass(getData().getClass()));
  9. }
  10. }
  11. //发送时使用泛型
  12. publisher.publishEvent(new BaseEvent<>(region, INSERT));
  13. //事件监听器,按泛型分开
  14. @EventListener
  15. public void onRegionEvent(BaseEvent<Region> event) {
  16. log.info("receive event: {}", event);
  17. }
  18. //用户事件监听器
  19. @EventListener
  20. public void onUserEvent(BaseEvent<User> event) {
  21. log.info("receive event: {}", event);
  22. }
4.7 条件监听
  1. @Log4j2
  2. @Component
  3. public class MyEventListener {
  4. @Async
  5. @EventListener(condition="#event.message=='xxx'")
  6. public void listener(TestEvent event) throws InterruptedException {
  7. Thread.sleep(2000);
  8. log.info("监听到数据:{}", event.getMessage());
  9. }
  10. }

————————————————

原文链接:https://blog.csdn.net/qq_37758497/article/details/118863308
泛型事件:https://www.jianshu.com/p/fd0c358176b9


寒烟濡雨 2024年1月19日 15:39 收藏文档