 Spring Event 事件解耦
Spring Event 事件解耦
  在业务开发过程中,业务逻辑可能非常复杂,核心业务 + 多个子业务,比如,下单之后,发送通知、监控埋点、记录日志……,如果都放在一起,代码会很臃肿。而且有两个问题,一个是业务耦合,一个是串行耗时。

还有一些业务场景不需要在一次请求中同步完成,比如邮件发送、短信发送等。对于这样的场景可以使用 MQ ,使用 MQ会增加系统设计的复杂性,还要考虑消息丢失、消息重复等问题。所以,一般在开发的时候,都会把这些操作抽象成观察者模式,也就是发布/订阅模式,下面给大家介绍一下 Spring Event。
# 简介
Spring Event(Application Event)其实就是一个观察者模式。观察者模式,含有主题(针对该主题的事件),发布者(发布主题或事件),订阅者(监听主题的人)。有三个部分组成,事件(ApplicationEvent)、监听器(ApplicationListener)和事件发布操作。
在一个完整的事件体系中存在以下的角色:
事件:描述发生了什么事情、比如说请求处理完成、Spring 容器刷新完毕 事件源:事件的产生者、任何一个事件都必须有一个事件源。比如请求处理完成的事件源就是 DispatcherServlet 、Spring 容器刷新完毕的事件源就是 ApplicationContext 事件广播器:事件和事件监听器的桥梁、负责把事件通知给事件监听器 事件监听器:监听事件的发生、可以在监听器中做一些处理
# 事件实现方式
实现Spring事件机制主要有4个类:
ApplicationEvent:事件,每个实现类表示一类事件,可携带数据。 ApplicationListener:事件监听器,用于接收事件处理时间。 ApplicationEventMulticaster:事件管理者,用于事件监听器的注册和事件的广播。 ApplicationEventPublisher:事件发布者,委托ApplicationEventMulticaster完成事件发布。
# 事件
public abstract class ApplicationEvent extends EventObject {
   /** use serialVersionUID from Spring 1.2 for interoperability */
   private static final long serialVersionUID = 7099057708183571937L;
   /** System time when the event happened */
   private final long timestamp;
   /**
    * Create a new ApplicationEvent.
    * @param source the object on which the event initially occurred (never {@code null})
    */
   public ApplicationEvent(Object source) {
      super(source);
      this.timestamp = System.currentTimeMillis();
   }
   /**
    * Return the system time in milliseconds when the event happened.
    */
   public final long getTimestamp() {
      return this.timestamp;
   }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ApplicationEvent:应用事件携带一个 Objecgt 对象,可以被发布,source表示事件源。
可以继承ApplicationEvent自定义事件。
public class TestEvent extends ApplicationEvent {
    private String message;
    public TestEvent(String message) {
        super(message);
        this.message = message;
    }
    public String getMessage() {
        return message;
    }
    public void setMessage(String message) {
        this.message = message;
    }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 事件监听器
事件监听器,有两种实现方式,一种是实现 ApplicationListener 接口,另一种是使用@EventListener 注解。
ApplicationListener:事件监听器,职责为处理事件广播器发布的事件。
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
	/**
	 * Handle an application event.
	 * @param event the event to respond to
	 */
	void onApplicationEvent(E event);
}
2
3
4
5
6
7
8
9
10
11
ApplicationListener 有两个实现接口:SmartApplicationListener和GenericApplicationListener
例如:
@Slf4j
@Service
public class OrderLogListener implements ApplicationListener<OrderEvent> {
    @Override
    public void onApplicationEvent(PlaceOrderEvent event) {
        log.info("[afterPlaceOrder] log.");
    }
}
2
3
4
5
6
7
8
9
@Log4j2
@Component
public class AEventListener implements ApplicationListener<TestEvent> {
    @Async
    @EventListener
    public void listener(TestEvent event) throws InterruptedException {
        log.info("监听到数据:{}", event.getMessage());
    }
}
2
3
4
5
6
7
8
9
10
11
12
# 事件广播器
事件广播器,将EventPubsher(事件发布者)发布的event 广播给事件EventListener(事件监听器)。
Spring提供了默认的实现SimpleApplicationEventMulticaster,如果用户没有配置自定义事件广播器, 则会默认使用SimpleApplicationEventMulticaster作为事件广播器。在容器刷新的过程中会实例化、初始化事件广播器。
ApplicationContext 本来就实现了ApplicationEventPublisher接口,因此应用上下文本来就是一个事件发布者,在AbstractApplicationContext中实现了事件发布的业务。
- 注入ApplicationContext 发布事件
@Autowired
private ApplicationContext applicationContext;
applicationContext.publishEvent();
2
3
4
5
- 注入ApplicationEventPublisher发布事件
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
applicationEventPublisher.publishEvent(myEvent)
2
3
4
- 实现ApplicationEventPublisherAware 发布事件
@Service
public class PublishEvent implements ApplicationEventPublisherAware {
    public static ApplicationEventPublisher eventPublisher = null;
    
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        eventPublisher.publishEvent("123");
    }
}
2
3
4
5
6
7
8
9
10
# Spring Event 同步模式
事件监听器默认是同步阻塞的,
# 定义事件
public class TestEvent extends ApplicationEvent {
    private String message;
    public TestEvent(String message) {
        super(message);
        this.message = message;
    }
    public String getMessage() {
        return message;
    }
    public void setMessage(String message) {
        this.message = message;
    }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 监听器
@Component
@Slf4j
public class ListenerService {
    @EventListener
    public void listener(TestEvent event)  throws InterruptedException {
         Thread.sleep(5000);
        log.info("监听到数据:{}", event.getMessage());
    }
}
2
3
4
5
6
7
8
9
10
11
# 发布事件
@Service
public class PublishService {
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    public void publish(String message) {
        applicationEventPublisher.publishEvent(new TestEvent(message));
    }
}
2
3
4
5
6
7
8
9
10
11
测试:
@Autowired
PublishService publishService;
@RequestMapping("publishMsg")
public void publishMsg() {
    for (int i = 0; i < 5; i++) {
        publishService.publish("消息" + (i + 1));
    }
}
2
3
4
5
6
7
8
9
输出结果:
2022-12-07 15:45:12.825 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息1 2022-12-07 15:45:17.838 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息2 2022-12-07 15:45:22.842 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息3 2022-12-07 15:45:27.857 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息4 2022-12-07 15:45:32.870 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息5
从结果可以看出,只有处理完一个事件后才会处理下一个事件,这就是同步模式
# Spring Event 异步模式
将上面的例子进行改造。
启动类增加 @EnableAsync 注解
@EnableAsync
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class,args);
    }
}
2
3
4
5
6
7
8
9
Listener 类需要开启异步的方法增加 @Async 注解:
@EventListener
@Async
public void listener(TestEvent event) throws InterruptedException {
    Thread.sleep(5000);
    log.info("监听到数据:{}", event.getMessage());
}
2
3
4
5
6
输出结果:
2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-5] com.test.service.ListenerService : 监听到数据:消息5 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-4] com.test.service.ListenerService : 监听到数据:消息4 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-1] com.test.service.ListenerService : 监听到数据:消息1 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-2] com.test.service.ListenerService : 监听到数据:消息2 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-3] com.test.service.ListenerService : 监听到数据:消息3
可以看到,事件监听器不会阻塞,多个事件可以同时进行。
# 自定义线程池
@Async默认线程池为 SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。
自定义线程池实现方式:
- 实现接口 AsyncConfigurer
- 继承 AsyncConfigurerSupport
- 配置由自定义的 TaskExecutor 替代内置的任务执行器
配置线程池:
@Configuration
public class TaskPoolConfig {
    @Bean(name = "asyncExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(200);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("asyncExecutor-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
使用线程池:
   @EventListener
    @Async("asyncExecutor")
    public void listener(TestEvent event) throws InterruptedException {
        Thread.sleep(5000);
        log.info("监听到数据:{}", event.getMessage());
    }
2
3
4
5
6
# 总结
使用 Spring Event 实现发布/订阅模式,可以对一些业务进行解耦。
- 01
- 保姆级教程 用DeepSeek+飞书,批量写文案、写文章,太高效了06-06
- 03
- 熬夜做PPT?AI一键生成高逼格幻灯片,效率提升10倍!06-06
