程序员子龙(Java面试 + Java学习) 程序员子龙(Java面试 + Java学习)
首页
学习指南
工具
开源项目
技术书籍

程序员子龙

Java 开发从业者
首页
学习指南
工具
开源项目
技术书籍
  • 基础

  • JVM

  • Spring

    • 动态代理-CGLIB
    • Hibernate Validator 参数校验优雅实战
    • Jackson序列化json时null转成0或空串
    • 别自己瞎写工具类了!SpringBoot中自带工具类,开发效率增加一倍
    • Spring @Autowired Map
    • SpringBoot 缓存之 @Cacheable 详细介绍与失效时间TTL
    • Spring Security 入门
    • Spring Security原理
    • Spring项目整合MybatisPlus出现org.mybatis.logging.LoggerFactory Not Found 异常
    • Spring在代码中获取bean
    • 别再乱写了,Controller 层代码这样写才足够规范!
    • 非静态变量给静态变量赋值
    • 过滤器与拦截器区别、使用场景
    • 接口重试机制 Spring-Retry
    • 利用cglib动态创建对象或在原对象新增属性
    • 聊聊spring事务失效的场景
    • Spring Event 事件解耦
      • 最全的Spring依赖注入方式
      • Spring初始化之ApplicationRunner、InitializingBean、@PostConstruct 使用详解
      • 为啥不建议用 BeanUtils.copyProperties 拷贝数据
    • 并发编程

    • Mybatis

    • 网络编程

    • 数据库

    • 缓存

    • 设计模式

    • 分布式

    • 高并发

    • SpringBoot

    • SpringCloudAlibaba

    • Nginx

    • 面试

    • 生产问题

    • 系统设计

    • 消息中间件

    • Java
    • Spring
    程序员子龙
    2024-01-29
    目录

    Spring Event 事件解耦

    在业务开发过程中,业务逻辑可能非常复杂,核心业务 + 多个子业务,比如,下单之后,发送通知、监控埋点、记录日志……,如果都放在一起,代码会很臃肿。而且有两个问题,一个是业务耦合,一个是串行耗时。

    还有一些业务场景不需要在一次请求中同步完成,比如邮件发送、短信发送等。对于这样的场景可以使用 MQ ,使用 MQ会增加系统设计的复杂性,还要考虑消息丢失、消息重复等问题。所以,一般在开发的时候,都会把这些操作抽象成观察者模式,也就是发布/订阅模式,下面给大家介绍一下 Spring Event。

    # 简介

    Spring Event(Application Event)其实就是一个观察者模式。观察者模式,含有主题(针对该主题的事件),发布者(发布主题或事件),订阅者(监听主题的人)。有三个部分组成,事件(ApplicationEvent)、监听器(ApplicationListener)和事件发布操作。

    在一个完整的事件体系中存在以下的角色:

    事件:描述发生了什么事情、比如说请求处理完成、Spring 容器刷新完毕 事件源:事件的产生者、任何一个事件都必须有一个事件源。比如请求处理完成的事件源就是 DispatcherServlet 、Spring 容器刷新完毕的事件源就是 ApplicationContext 事件广播器:事件和事件监听器的桥梁、负责把事件通知给事件监听器 事件监听器:监听事件的发生、可以在监听器中做一些处理

    # 事件实现方式

    实现Spring事件机制主要有4个类:

    ApplicationEvent:事件,每个实现类表示一类事件,可携带数据。 ApplicationListener:事件监听器,用于接收事件处理时间。 ApplicationEventMulticaster:事件管理者,用于事件监听器的注册和事件的广播。 ApplicationEventPublisher:事件发布者,委托ApplicationEventMulticaster完成事件发布。

    # 事件

    public abstract class ApplicationEvent extends EventObject {
    
       /** use serialVersionUID from Spring 1.2 for interoperability */
       private static final long serialVersionUID = 7099057708183571937L;
    
       /** System time when the event happened */
       private final long timestamp;
    
    
       /**
        * Create a new ApplicationEvent.
        * @param source the object on which the event initially occurred (never {@code null})
        */
       public ApplicationEvent(Object source) {
          super(source);
          this.timestamp = System.currentTimeMillis();
       }
    
       /**
        * Return the system time in milliseconds when the event happened.
        */
       public final long getTimestamp() {
          return this.timestamp;
       }
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26

    ApplicationEvent:应用事件携带一个 Objecgt 对象,可以被发布,source表示事件源。

    可以继承ApplicationEvent自定义事件。

    public class TestEvent extends ApplicationEvent {
        private String message;
    
        public TestEvent(String message) {
            super(message);
            this.message = message;
        }
    
        public String getMessage() {
            return message;
        }
    
        public void setMessage(String message) {
            this.message = message;
        }
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17

    # 事件监听器

    事件监听器,有两种实现方式,一种是实现 ApplicationListener 接口,另一种是使用@EventListener 注解。

    ApplicationListener:事件监听器,职责为处理事件广播器发布的事件。

    @FunctionalInterface
    public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
    
    	/**
    	 * Handle an application event.
    	 * @param event the event to respond to
    	 */
    	void onApplicationEvent(E event);
    
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

    ApplicationListener 有两个实现接口:SmartApplicationListener和GenericApplicationListener

    例如:

    @Slf4j
    @Service
    public class OrderLogListener implements ApplicationListener<OrderEvent> {
    
        @Override
        public void onApplicationEvent(PlaceOrderEvent event) {
            log.info("[afterPlaceOrder] log.");
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Log4j2
    @Component
    public class AEventListener implements ApplicationListener<TestEvent> {
    
        @Async
        @EventListener
        public void listener(TestEvent event) throws InterruptedException {
    
            log.info("监听到数据:{}", event.getMessage());
        }
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    # 事件广播器

    事件广播器,将EventPubsher(事件发布者)发布的event 广播给事件EventListener(事件监听器)。

    Spring提供了默认的实现SimpleApplicationEventMulticaster,如果用户没有配置自定义事件广播器, 则会默认使用SimpleApplicationEventMulticaster作为事件广播器。在容器刷新的过程中会实例化、初始化事件广播器。

    ApplicationContext 本来就实现了ApplicationEventPublisher接口,因此应用上下文本来就是一个事件发布者,在AbstractApplicationContext中实现了事件发布的业务。

    • 注入ApplicationContext 发布事件
    @Autowired
    private ApplicationContext applicationContext;
    
    applicationContext.publishEvent();
    
    
    1
    2
    3
    4
    5
    • 注入ApplicationEventPublisher发布事件
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    applicationEventPublisher.publishEvent(myEvent)
    
    
    1
    2
    3
    4
    • 实现ApplicationEventPublisherAware 发布事件
    @Service
    public class PublishEvent implements ApplicationEventPublisherAware {
        public static ApplicationEventPublisher eventPublisher = null;
        
        @Override
        public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
            eventPublisher.publishEvent("123");
        }
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    # Spring Event 同步模式

    事件监听器默认是同步阻塞的,

    # 定义事件

    public class TestEvent extends ApplicationEvent {
        private String message;
    
        public TestEvent(String message) {
            super(message);
            this.message = message;
        }
    
        public String getMessage() {
            return message;
        }
    
        public void setMessage(String message) {
            this.message = message;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16

    # 监听器

    @Component
    @Slf4j
    public class ListenerService {
    
        @EventListener
        public void listener(TestEvent event)  throws InterruptedException {
             Thread.sleep(5000);
    
            log.info("监听到数据:{}", event.getMessage());
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

    # 发布事件

    @Service
    public class PublishService {
    
        @Autowired
        private ApplicationEventPublisher applicationEventPublisher;
    
        public void publish(String message) {
            applicationEventPublisher.publishEvent(new TestEvent(message));
        }
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

    测试:

    @Autowired
    PublishService publishService;
    
    @RequestMapping("publishMsg")
    public void publishMsg() {
        for (int i = 0; i < 5; i++) {
            publishService.publish("消息" + (i + 1));
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9

    输出结果:

    2022-12-07 15:45:12.825 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息1 2022-12-07 15:45:17.838 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息2 2022-12-07 15:45:22.842 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息3 2022-12-07 15:45:27.857 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息4 2022-12-07 15:45:32.870 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息5

    从结果可以看出,只有处理完一个事件后才会处理下一个事件,这就是同步模式

    # Spring Event 异步模式

    将上面的例子进行改造。

    启动类增加 @EnableAsync 注解

    @EnableAsync
    @SpringBootApplication
    public class Application {
    
        public static void main(String[] args) {
    
            SpringApplication.run(Application.class,args);
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9

    Listener 类需要开启异步的方法增加 @Async 注解:

    @EventListener
    @Async
    public void listener(TestEvent event) throws InterruptedException {
        Thread.sleep(5000);
        log.info("监听到数据:{}", event.getMessage());
    }
    
    1
    2
    3
    4
    5
    6

    输出结果:

    2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-5] com.test.service.ListenerService : 监听到数据:消息5 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-4] com.test.service.ListenerService : 监听到数据:消息4 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-1] com.test.service.ListenerService : 监听到数据:消息1 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-2] com.test.service.ListenerService : 监听到数据:消息2 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-3] com.test.service.ListenerService : 监听到数据:消息3

    可以看到,事件监听器不会阻塞,多个事件可以同时进行。

    # 自定义线程池

    @Async默认线程池为 SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。

    自定义线程池实现方式:

    • 实现接口 AsyncConfigurer
    • 继承 AsyncConfigurerSupport
    • 配置由自定义的 TaskExecutor 替代内置的任务执行器

    配置线程池:

    @Configuration
    public class TaskPoolConfig {
    
        @Bean(name = "asyncExecutor")
        public Executor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(20);
            executor.setQueueCapacity(200);
            executor.setKeepAliveSeconds(60);
            executor.setThreadNamePrefix("asyncExecutor-");
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            return executor;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15

    使用线程池:

       @EventListener
        @Async("asyncExecutor")
        public void listener(TestEvent event) throws InterruptedException {
            Thread.sleep(5000);
            log.info("监听到数据:{}", event.getMessage());
        }
    
    1
    2
    3
    4
    5
    6

    # 总结

    使用 Spring Event 实现发布/订阅模式,可以对一些业务进行解耦。

    上次更新: 2024/01/30, 15:08:57
    聊聊spring事务失效的场景
    最全的Spring依赖注入方式

    ← 聊聊spring事务失效的场景 最全的Spring依赖注入方式→

    最近更新
    01
    一个注解,优雅的实现接口幂等性
    11-17
    02
    MySQL事务(超详细!!!)
    10-14
    03
    阿里二面:Kafka中如何保证消息的顺序性?这周被问到两次了
    10-09
    更多文章>
    Theme by Vdoing | Copyright © 2024-2024

        辽ICP备2023001503号-2

    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式