Spring Event 事件解耦
在业务开发过程中,业务逻辑可能非常复杂,核心业务 + 多个子业务,比如,下单之后,发送通知、监控埋点、记录日志……,如果都放在一起,代码会很臃肿。而且有两个问题,一个是业务耦合,一个是串行耗时。
还有一些业务场景不需要在一次请求中同步完成,比如邮件发送、短信发送等。对于这样的场景可以使用 MQ ,使用 MQ会增加系统设计的复杂性,还要考虑消息丢失、消息重复等问题。所以,一般在开发的时候,都会把这些操作抽象成观察者模式,也就是发布/订阅模式,下面给大家介绍一下 Spring Event。
# 简介
Spring Event(Application Event)其实就是一个观察者模式。观察者模式,含有主题(针对该主题的事件),发布者(发布主题或事件),订阅者(监听主题的人)。有三个部分组成,事件(ApplicationEvent)、监听器(ApplicationListener)和事件发布操作。
在一个完整的事件体系中存在以下的角色:
事件:描述发生了什么事情、比如说请求处理完成、Spring 容器刷新完毕 事件源:事件的产生者、任何一个事件都必须有一个事件源。比如请求处理完成的事件源就是 DispatcherServlet 、Spring 容器刷新完毕的事件源就是 ApplicationContext 事件广播器:事件和事件监听器的桥梁、负责把事件通知给事件监听器 事件监听器:监听事件的发生、可以在监听器中做一些处理
# 事件实现方式
实现Spring事件机制主要有4个类:
ApplicationEvent:事件,每个实现类表示一类事件,可携带数据。 ApplicationListener:事件监听器,用于接收事件处理时间。 ApplicationEventMulticaster:事件管理者,用于事件监听器的注册和事件的广播。 ApplicationEventPublisher:事件发布者,委托ApplicationEventMulticaster完成事件发布。
# 事件
public abstract class ApplicationEvent extends EventObject {
/** use serialVersionUID from Spring 1.2 for interoperability */
private static final long serialVersionUID = 7099057708183571937L;
/** System time when the event happened */
private final long timestamp;
/**
* Create a new ApplicationEvent.
* @param source the object on which the event initially occurred (never {@code null})
*/
public ApplicationEvent(Object source) {
super(source);
this.timestamp = System.currentTimeMillis();
}
/**
* Return the system time in milliseconds when the event happened.
*/
public final long getTimestamp() {
return this.timestamp;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ApplicationEvent:应用事件携带一个 Objecgt 对象,可以被发布,source表示事件源。
可以继承ApplicationEvent自定义事件。
public class TestEvent extends ApplicationEvent {
private String message;
public TestEvent(String message) {
super(message);
this.message = message;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 事件监听器
事件监听器,有两种实现方式,一种是实现 ApplicationListener 接口,另一种是使用@EventListener 注解。
ApplicationListener:事件监听器,职责为处理事件广播器发布的事件。
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
/**
* Handle an application event.
* @param event the event to respond to
*/
void onApplicationEvent(E event);
}
2
3
4
5
6
7
8
9
10
11
ApplicationListener 有两个实现接口:SmartApplicationListener和GenericApplicationListener
例如:
@Slf4j
@Service
public class OrderLogListener implements ApplicationListener<OrderEvent> {
@Override
public void onApplicationEvent(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] log.");
}
}
2
3
4
5
6
7
8
9
@Log4j2
@Component
public class AEventListener implements ApplicationListener<TestEvent> {
@Async
@EventListener
public void listener(TestEvent event) throws InterruptedException {
log.info("监听到数据:{}", event.getMessage());
}
}
2
3
4
5
6
7
8
9
10
11
12
# 事件广播器
事件广播器,将EventPubsher(事件发布者)发布的event 广播给事件EventListener(事件监听器)。
Spring提供了默认的实现SimpleApplicationEventMulticaster,如果用户没有配置自定义事件广播器, 则会默认使用SimpleApplicationEventMulticaster作为事件广播器。在容器刷新的过程中会实例化、初始化事件广播器。
ApplicationContext 本来就实现了ApplicationEventPublisher接口,因此应用上下文本来就是一个事件发布者,在AbstractApplicationContext中实现了事件发布的业务。
- 注入ApplicationContext 发布事件
@Autowired
private ApplicationContext applicationContext;
applicationContext.publishEvent();
2
3
4
5
- 注入ApplicationEventPublisher发布事件
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
applicationEventPublisher.publishEvent(myEvent)
2
3
4
- 实现ApplicationEventPublisherAware 发布事件
@Service
public class PublishEvent implements ApplicationEventPublisherAware {
public static ApplicationEventPublisher eventPublisher = null;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
eventPublisher.publishEvent("123");
}
}
2
3
4
5
6
7
8
9
10
# Spring Event 同步模式
事件监听器默认是同步阻塞的,
# 定义事件
public class TestEvent extends ApplicationEvent {
private String message;
public TestEvent(String message) {
super(message);
this.message = message;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 监听器
@Component
@Slf4j
public class ListenerService {
@EventListener
public void listener(TestEvent event) throws InterruptedException {
Thread.sleep(5000);
log.info("监听到数据:{}", event.getMessage());
}
}
2
3
4
5
6
7
8
9
10
11
# 发布事件
@Service
public class PublishService {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void publish(String message) {
applicationEventPublisher.publishEvent(new TestEvent(message));
}
}
2
3
4
5
6
7
8
9
10
11
测试:
@Autowired
PublishService publishService;
@RequestMapping("publishMsg")
public void publishMsg() {
for (int i = 0; i < 5; i++) {
publishService.publish("消息" + (i + 1));
}
}
2
3
4
5
6
7
8
9
输出结果:
2022-12-07 15:45:12.825 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息1 2022-12-07 15:45:17.838 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息2 2022-12-07 15:45:22.842 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息3 2022-12-07 15:45:27.857 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息4 2022-12-07 15:45:32.870 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息5
从结果可以看出,只有处理完一个事件后才会处理下一个事件,这就是同步模式
# Spring Event 异步模式
将上面的例子进行改造。
启动类增加 @EnableAsync 注解
@EnableAsync
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class,args);
}
}
2
3
4
5
6
7
8
9
Listener 类需要开启异步的方法增加 @Async 注解:
@EventListener
@Async
public void listener(TestEvent event) throws InterruptedException {
Thread.sleep(5000);
log.info("监听到数据:{}", event.getMessage());
}
2
3
4
5
6
输出结果:
2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-5] com.test.service.ListenerService : 监听到数据:消息5 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-4] com.test.service.ListenerService : 监听到数据:消息4 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-1] com.test.service.ListenerService : 监听到数据:消息1 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-2] com.test.service.ListenerService : 监听到数据:消息2 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-3] com.test.service.ListenerService : 监听到数据:消息3
可以看到,事件监听器不会阻塞,多个事件可以同时进行。
# 自定义线程池
@Async默认线程池为 SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。
自定义线程池实现方式:
- 实现接口 AsyncConfigurer
- 继承 AsyncConfigurerSupport
- 配置由自定义的 TaskExecutor 替代内置的任务执行器
配置线程池:
@Configuration
public class TaskPoolConfig {
@Bean(name = "asyncExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("asyncExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
使用线程池:
@EventListener
@Async("asyncExecutor")
public void listener(TestEvent event) throws InterruptedException {
Thread.sleep(5000);
log.info("监听到数据:{}", event.getMessage());
}
2
3
4
5
6
# 总结
使用 Spring Event 实现发布/订阅模式,可以对一些业务进行解耦。