程序员子龙(Java面试 + Java学习) 程序员子龙(Java面试 + Java学习)
首页
学习指南
工具
开源项目
技术书籍

程序员子龙

Java 开发从业者
首页
学习指南
工具
开源项目
技术书籍
  • 基础

  • JVM

  • Spring

  • 并发编程

  • Mybatis

  • 网络编程

  • 数据库

  • 缓存

  • 设计模式

  • 分布式

  • 高并发

  • SpringBoot

  • SpringCloudAlibaba

  • Nginx

  • 面试

  • 生产问题

  • 系统设计

  • 消息中间件

    • 基础

    • Kafka

      • Kafka 介绍
      • 扫盲Kafka,看这一篇就够了!
      • kafka基本使用
      • kafka中的@KafkaListener如何动态获得topic
      • spring boot中 设置kafka手动提交OFFSET
        • 面试官:聊聊kafka线上使用会有哪些问题
        • 阿里二面:Kafka中如何保证消息的顺序性?这周被问到两次了
        • 面试官:kafka 分布式的情况下,如何保证消息的顺序消费?
      • RabbitMQ

      • RocketMQ

    • Java
    • 消息中间件
    • Kafka
    程序员子龙
    2024-01-29
    目录

    spring boot中 设置kafka手动提交OFFSET

    # spring boot中 设置kafka手动提交OFFSET

    1、enable.auto.commit参数设置成了false

    2、org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode

     /**
    
       * The offset commit behavior enumeration.
         /
      public enum AckMode { 
        /**
         * 每处理一条commit一次
         */
        RECORD,
     
        /**
         * 每次poll的时候批量提交一次,频率取决于每次poll的调用频率
         */
        BATCH,
     
        /**
         * 每次间隔ackTime的时间去commit
         */
        TIME,
     
        /**
         * 累积达到ackCount次的ack去commit
         */
        COUNT,
     
        /**
         *ackTime或ackCount哪个条件先满足,就commit
         */
        COUNT_TIME,
     
        /**
         *  listener负责ack,但是实际上也是批量上去
         */
        MANUAL,
     
        /**
         
         * listner负责ack,每调用一次,就立即commit
         /
        MANUAL_IMMEDIATE;
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40

    # MANUAL COMMIT

    @KafkaListener(topics = "k010")
    public void listen(ConsumerRecord<?, ?> cr,Acknowledgment ack) throws Exception {
    
            LOGGER.info(cr.toString());
    
            ack.acknowledge();
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8

    方法参数里头传递Acknowledgment,然后手工ack

    如果只添加上面语句会报错:

    the listener container must have a MANUAL Ackmode to populate the Acknowledgment
    
    1

    我们要配置AckMode为MANUAL Ackmode

    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    
    1

    在spring boot 可以直接配置

    spring:
    
      kafka:
      
        # kafka服务地址与端口
        bootstrap-servers: 127.0.0.1:9092
        key.serializer: org.apache.kafka.common.serialization.StringSerializer
        value.serializer: org.apache.kafka.common.serialization.StringSerializer
    
        consumer:
          group-id: bbb
          topic: test
          auto-offset-reset: earliest
      
          max-poll-records: 100
          enable-auto-commit: false
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
      listener:
        ack-mode: manual
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20

    # 设置了手动提交,消息重复消费原因

    kafka 有个offset的概念,当每个消息被写进去后,都有一个offset,代表他的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了。

    max.poll.interval.ms

    两次poll操作允许的最大时间间隔。单位毫秒。默认值300000(5分钟)。

    两次poll超过此时间间隔,Kafka服务端会进行rebalance操作,导致客户端连接失效,无法提交offset信息,从而引发重复消费。可以适当调大这个参数避免重复消费。

    session.timeout.ms

    比如一条消息处理需要5分钟,session.timeout.ms = 3000ms,等消费者处理完消息,消费组早就将消费者移除消费者了,那么就会re-balance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。

    上次更新: 2024/03/11, 15:54:57
    kafka中的@KafkaListener如何动态获得topic
    面试官:聊聊kafka线上使用会有哪些问题

    ← kafka中的@KafkaListener如何动态获得topic 面试官:聊聊kafka线上使用会有哪些问题→

    最近更新
    01
    一个注解,优雅的实现接口幂等性
    11-17
    02
    MySQL事务(超详细!!!)
    10-14
    03
    阿里二面:Kafka中如何保证消息的顺序性?这周被问到两次了
    10-09
    更多文章>
    Theme by Vdoing | Copyright © 2024-2024

        辽ICP备2023001503号-2

    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式