程序员子龙(Java面试 + Java学习) 程序员子龙(Java面试 + Java学习)
首页
学习指南
工具
开源项目
技术书籍

程序员子龙

Java 开发从业者
首页
学习指南
工具
开源项目
技术书籍
  • 基础

  • JVM

  • Spring

  • 并发编程

  • Mybatis

  • 网络编程

  • 数据库

  • 缓存

  • 设计模式

  • 分布式

  • 高并发

  • SpringBoot

  • SpringCloudAlibaba

  • Nginx

  • 面试

  • 生产问题

  • 系统设计

  • 消息中间件

    • 基础

    • Kafka

    • RabbitMQ

      • RabbitMQ入门教程
      • Spring Boot + RabbitMQ实现订单超时自动取消功能
        • 前言
        • 实现方式
        • 死信队列
        • 代码实现
        • 总结
    • RocketMQ

  • Java
  • 消息中间件
  • RabbitMQ
xugaoyi
2024-01-29
目录

Spring Boot + RabbitMQ实现订单超时自动取消功能

# 前言

在电商项目中,我们经常会遇到这样的需求:客户下单成功后,在一定的时间内未能按时(具体时间由业务规则决定)支付,需要将订单自动取消,释放占用的商品库存。类似于这样的需求(延时任务),我们该怎样解决呢?

# 实现方式

本文将以rabbitmq死信队列实现。

使用定时任务的方式,是有点问题的,原本业务系统希望10分钟后,如果订单未支付,就马上取消订单,并释放商品库存。但是一旦数据量大的话,就会加长获取未支付订单数据的时间,部分订单就做不到10分钟后取消了,可能是15分钟,20分钟之类的。这样的话,库存就无法及时得到释放,也就会影响成单数。

下单投放消息到A交换机(过期时间30分钟),消息到aa队列(绑定死信交换机),不设置aa队列的消费者(故此消息一直未消费). 30分钟后,过期消息投递到死信交换机,死信队列,由死信消费者消费,判断订单id是否支付,执行业务逻辑,未支付->关闭订单,返还库存.

# 死信队列

DLX(dead-letter-exchange),死信队列也是一般的队列,当消息变成死信时,消息会投递到死信队列中,经过死信队列进行消费的一种形式,对应的交换机叫死信交换机DLX。

# 代码实现

声明订单和死信交换机、 订单和死信队列 、关键步骤订单队列绑定订单交换机和死信交换机 、死信队列绑定死信交换机

@Component
public class DeadLetterMQConfig {
    /**
     * 订单交换机
     */
    @Value("${mayikt.order.exchange}")
    private String orderExchange;
    /**
     * 订单队列
     */
    @Value("${mayikt.order.queue}")
    private String orderQueue;
    /**
     * 订单路由key
     */
    @Value("${mayikt.order.routingKey}")
    private String orderRoutingKey;
    /**
     * 死信交换机
     */
    @Value("${mayikt.dlx.exchange}")
    private String dlxExchange;
    /**
     * 死信队列
     */
    @Value("${mayikt.dlx.queue}")
    private String dlxQueue;
    /**
     * 死信路由
     */
    @Value("${mayikt.dlx.routingKey}")
    private String dlxRoutingKey;
    /**
     * 声明死信交换机
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }
    /**
     * 声明死信队列
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }
    /**
     * 声明订单业务交换机
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchange);
    }
    /**
     * 声明订单队列 核心操作一
     */
    @Bean
    public Queue orderQueue() {
        Map<String, Object> arguments = new HashMap<>(2);
        // 绑定我们的死信交换机
        arguments.put("x-dead-letter-exchange", dlxExchange);
        // 绑定我们的路由key
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(orderQueue, true, false, false, arguments);
    }
    /**
     * 绑定订单队列到订单交换机
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(orderRoutingKey);
    }
    /**
     * 绑定死信队列到死信交换机
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(dlxRoutingKey);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80

application.properties配置文件

server.port=8082
spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
 
#开启驼峰命名  譬如数据库create_time 自动映射pojo属性createTime
mybatis.configuration.map-underscore-to-camel-case=true
 
#配置virtual-host虚拟主机
spring.rabbitmq.virtual-host=/zhang_rabbit
#ip地址
spring.rabbitmq.host=127.0.0.1
#用户名  密码
spring.rabbitmq.username=xxx
spring.rabbitmq.password=xxx
#连接端口号
spring.rabbitmq.port=5672
#spring.rabbitmq.publisher-confirm-type=
 
#模拟演示死信队列
mayikt.dlx.exchange=mayikt_order_dlx_exchange
mayikt.dlx.queue=mayikt_order_dlx_queue
mayikt.dlx.routingKey=dlx
 
##备胎交换机
mayikt.order.exchange=mayikt_order_exchange
mayikt.order.queue=mayikt_order_queue
mayikt.order.routingKey=mayikt.order
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

创建producer生产者.暂时设置消息10秒过期,验证消息是否加入死信队列

@RestController
public class OrderController {
 
    @Autowired
    private OrderMapper orderMapper;
 
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Value("${mayikt.order.exchange}")
    private String orderExchange; //订单交换机
 
    @Value("${mayikt.order.routingKey}")
    private String orderRoutingKey; //订单路由key
 
    @GetMapping("/addOrder")
    public String addOrder(){
        String orderId=System.currentTimeMillis()+"";
        OrderEntity orderEntity=new OrderEntity("订单30分钟过期",orderId,0);
        //订单入库
        int result= orderMapper.addOrder(orderEntity);
        if(result<=0){
            return "fail";
        }
        //rabbit投递消息
    rabbitTemplate.convertAndSend(orderExchange,orderRoutingKey,orderId,messagePostProcessor());
        return "success";
    }
 
    //处理待发送消息
    private MessagePostProcessor messagePostProcessor(){
        return  new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //设置有效期30分钟
                //message.getMessageProperties().setExpiration("1800000");
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        };
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

死信消费者消费过期消息

@Component //死信队列
public class OrderDlxConsumer {
    
    @Autowired
    private OrderMapper orderMapper;
 
    /**
     * 监听我们的死信队列
     */
    @RabbitListener(queues = "mayikt_order_dlx_queue")
    public void orderConsumer(String orderId) {
        System.out.println("死信队列获取消息:" + orderId);
        if (StringUtils.isEmpty(orderId)) {
            return;
        }
        //根据id查询
        OrderEntity orderEntity = orderMapper.getOrder(orderId);
        if (null == orderEntity) {
            return;
        }
        //获取状态
        Integer orderStatus=orderEntity.getOrderStatus();
        //判断未支付 , 关闭订单
        if(0==orderStatus){
            //库存返还
            orderMapper.updateStatus(orderId,2);
            
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

下单测试 http://localhost:8082/addOrder.订单入库成功.状态0未支付

10秒后,死信消费者处理 状态0未支付 变为2 已关闭

# 总结

除了基于RabbitMQ实现延时任务,还有其它很多种包括像定时任务、JDK延迟队列、redis、RocketMQ等等来实现,至于选哪种方式,还是应该由具体的业务来决定(适合最重要)。

来源:https://blog.csdn.net/zhangshengqiang168/article/details/104718979

技术虐我千百遍,我待技术如初恋,我们下期再见!

大家的点赞、收藏和评论对子龙非常重要,如文章对你有帮助还请转发支持下,谢谢!

上次更新: 2024/03/11, 15:54:57
RabbitMQ入门教程
RocketMQ基础知识

← RabbitMQ入门教程 RocketMQ基础知识→

最近更新
01
一个注解,优雅的实现接口幂等性
11-17
02
MySQL事务(超详细!!!)
10-14
03
阿里二面:Kafka中如何保证消息的顺序性?这周被问到两次了
10-09
更多文章>
Theme by Vdoing | Copyright © 2024-2024

    辽ICP备2023001503号-2

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式