程序员子龙(Java面试 + Java学习) 程序员子龙(Java面试 + Java学习)
首页
学习指南
工具
开源项目
技术书籍

程序员子龙

Java 开发从业者
首页
学习指南
工具
开源项目
技术书籍
  • 基础

  • JVM

  • Spring

  • 并发编程

  • Mybatis

  • 网络编程

  • 数据库

  • 缓存

  • 设计模式

  • 分布式

  • 高并发

  • SpringBoot

  • SpringCloudAlibaba

  • Nginx

  • 面试

  • 生产问题

  • 系统设计

  • 消息中间件

    • 基础

    • Kafka

      • Kafka 介绍
      • 扫盲Kafka,看这一篇就够了!
      • kafka基本使用
        • 安装kafka
          • 安装zookeeper
          • 下载安装包
          • 修改配置
          • 启动服务
          • kafka的两个配置listeners和advertised.listeners
          • listeners:
          • advertised.listeners
          • 创建主题
          • 发送消息
          • 消费消息
        • kafka集群实战
        • 副本的ACK机制
        • Spring Boot整合Kafka
        • 分区策略
          • 1. 轮询分区策略
          • 2. 按key分区分配策略
          • 3. 随机分区策略
          • 4. 自定义分区策略
          • 消费者分区策略
      • kafka中的@KafkaListener如何动态获得topic
      • spring boot中 设置kafka手动提交OFFSET
      • 面试官:聊聊kafka线上使用会有哪些问题
      • 阿里二面:Kafka中如何保证消息的顺序性?这周被问到两次了
      • 面试官:kafka 分布式的情况下,如何保证消息的顺序消费?
    • RabbitMQ

    • RocketMQ

  • Java
  • 消息中间件
  • Kafka
程序员子龙
2024-03-11
目录

kafka基本使用

# 安装kafka

# 安装zookeeper

kafka依赖zookeeper,需要先安装zookeeper

wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz       
tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
cd  apache-zookeeper-3.5.8-bin
cp conf/zoo_sample.cfg conf/zoo.cfg

# 启动zookeeper
bin/zkServer.sh start
bin/zkCli.sh 
1
2
3
4
5
6
7
8

# 下载安装包

wget https://archive.apache.org/dist/kafka/2.4.1/kafka_2.11-2.4.1.tgz
tar -xzf kafka_2.11-2.4.1.tgz
cd kafka_2.11-2.4.1
1
2
3

# 修改配置

修改配置文件config/server.properties:

#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://ip:9092   
#kafka的消息存储文件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=ip:2181
1
2
3
4
5
6
7
8

# 启动服务

#-daemon表示以后台进程运行
kafka-server-start.sh [-daemon] server.properties

#或者用
bin/kafka-server-start.sh config/server.properties &
1
2
3
4
5

server.properties核心配置详解:

Property Default Description
broker.id 0 每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯一的即可。
log.dirs /tmp/kafka-logs kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。
listeners PLAINTEXT://192.168.123.10:9092 server接受客户端连接的端口,ip配置kafka本机ip即可
zookeeper.connect localhost:2181 zooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接方式为 hostname1:port1, hostname2:port2, hostname3:port3
log.retention.hours 168 每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样。
num.partitions 1 创建topic的默认分区数
default.replication.factor 1 自动创建topic的默认副本数量,建议设置为大于等于2
min.insync.replicas 1 当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常
delete.topic.enable false 是否允许删除主题

# kafka的两个配置listeners和advertised.listeners

# listeners:

kafka监听的网卡的ip,假设你机器上有两张网卡,内网192.168.0.213和外网101.89.163.1 如下配置

#kafka只监听内网网卡,即只接收内网网卡的数据
listeners=PLAINTEXT://192.168.0.213:9092

#kafka只监听外网网卡,即只接收外网网卡的数据
listeners=PLAINTEXT://101.89.163.1:9092
1
2
3
4
5
# advertised.listeners
listeners=PLAINTEXT://192.168.0.213:9092
advertised.listeners=PLAINTEXT://101.89.163.1:9092
1
2

kafka 节点启动后,会向 zookeeper 注册自己,同时告诉 zookeeper 自身的通信地址,这个地址就是配置文件中的 advertised.listeners,如果没有配置 advertised.listeners,就会使用listeners。同时从 zookeeper 中获取兄弟节点的这个地址,以便与兄弟节点通信。即 kafka 节点是从 zookeeper 获取的其他节点的通信地址。 我们使用客户端以一个 ip 地址首次连接 kafka 节点后,节点返回给客户端的 kafka 集群地址就是从 zookeeper 中获得的这些地址,也就是各个节点配置的 advertised.listeners,包括当前连接的节点。所以可能客户端后续访问当前节点的 ip 地址有可能和首次连接的 ip 地址并不一样。

# 创建主题

现在我们来创建一个名字为“test”的Topic,这个topic只有一个partition,并且备份因子也设置为1:

bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test  
1

创建多个分区的主题:

bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 2 --topic test1           
1

# 发送消息

kafka自带了一个producer命令客户端

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
1

# 消费消息

kafka默认有一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息:

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test   
1

指定消费组

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --consumer-property group.id=testGroup --topic test 
1

同一条消息只能被同一个消费组下的某一个消费者消费,要实现多播只要保证这些消费者属于不同的消费组即可。

# kafka集群实战

多个kafka节点组成kafka集群。本次演示同一台机器安装3个实例。

建立另外2个broker的配置文件:

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
1
2

config/server-1.properties:

#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.123.10:9093   
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=127.0.0.1:2181
1
2
3
4
5
6
7

config/server-2.properties:

broker.id=2
listeners=PLAINTEXT://192.168.123.10:9094
log.dir=/usr/local/data/kafka-logs-2
zookeeper.connect=127.0.0.1:2181        
1
2
3
4

启动2个broker实例即可:

bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties
1
2

# 副本的ACK机制

producer是不断地往Kafka中写入数据,写入数据会有一个返回结果,表示是否写入成功。这里对应有一个ACKs的配置。

  • acks = 0:生产者只管写入,不管是否写入成功,可能会数据丢失。性能是最好的

  • acks = 1:生产者会等到leader分区写入成功后,返回成功,接着发送下一条

  • acks = -1/all:确保消息写入到leader分区、还确保消息写入到对应副本都成功后,接着发送下一条,性能是最差的

# Spring Boot整合Kafka

引入spring boot kafka依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
1
2
3
4

application.yml配置:

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 8.140.246.47:9092
    producer: # 生产者
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送

      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        retry.backoff.ms: 100 #重试时间间隔,默认100
        linger.ms: 0 #默认为0,表示批量发送消息之前等待更多消息加入batch的时间
        max.request.size: 1048576 #默认1MB,表示发送消息最大值
        connections.max.idle.ms: 540000 #默认9分钟,表示多久后关闭限制的连接
        receive.buffer.bytes: 32768 #默认32KB,表示socket接收消息缓冲区的大小,为-1时使用操作系统默认值
        send.buffer.bytes: 131072 #默认128KB,表示socket发送消息缓冲区大小,为-1时使用操作系统默认值
#        request.timeout.ms: 10000 #默认30000ms,表示等待请求响应的最长时间
#        max.block.ms: 1000

    consumer:
      auto-commit-interval: 5000 #自动提交消费位移时间隔时间
      max-poll-records: 500 #批量消费每次最多消费多少条消息
      client-id: kafka.consumer.client.id #消费者客户端ID
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      fetch-max-wait: 400 #最大等待时间
      fetch-min-size: 1 #最小消费字节数
      heartbeat-interval: 3000 #分组管理时心跳到消费者协调器之间的预计时间
      isolation-level: read_committed

    listener:
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
      # COUNT
      # TIME | COUNT 有一个条件满足时提交
      # COUNT_TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
      # MANUAL_IMMEDIATE
      ack-mode: manual_immediate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

生产者代码:

package com.demo.service;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.ExecutionException;

@Service
@RequiredArgsConstructor
public class ProducerServiceImpl  {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private static final Logger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);


    public void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException {
        SendResult<String, String> sendResult = kafkaTemplate.send(topic, data).get();
        RecordMetadata recordMetadata = sendResult.getRecordMetadata();
        logger.info("发送同步消息成功!发送的主题为:{}", recordMetadata.topic());
    }


    public void sendMessage(String topic, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
        future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage()));
    }


    public void sendMessage(ProducerRecord<String, String> record) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }

 
    public void sendMessage(Message<String> message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }


    public void sendMessage(String topic, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);
        future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage()));
    }


    public void sendMessage(String topic, Integer partition, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }

    public void sendMessage(String topic, Integer partition, Long timestamp, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, timestamp, key, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110

消费者代码:

@Component
public class MyConsumer {

    /**
     * @KafkaListener(groupId = "testGroup", topicPartitions = {
     *             @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
     *             @TopicPartition(topic = "topic2", partitions = "0",
     *                     partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
     *     },concurrency = "6")
     *  //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数
     * @param record
     */
    @KafkaListener(topics = "my-replicated-topic",groupId = "costomGroup")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        //手动提交offset
        ack.acknowledge();
    }


}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 分区策略

生产者写入消息到topic,Kafka将依据不同的策略将数据分配到不同的分区中:

# 1. 轮询分区策略

即按消息顺序进行分区顺序分配,是默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区;

key为null,则使用轮询算法均衡地分配分区;

# 2. 按key分区分配策略

key不为null,key.hash() % n

但是按照key决定分区有可能会造成数据倾斜

# 3. 随机分区策略

随机分区,不建议使用

# 4. 自定义分区策略

根据业务需要制定以分区策略

# 消费者分区策略

同一时刻,一条消息只能被组中的一个消费者实例消费:

  1. 消费者数=分区数:一个分区对应一个消费者
  2. 消费者数<分区数:一个消费者对应多个分区
  3. 消费者数>分区数:多出来的消费者将不会消费任何消息

分区分配策略:保障每个消费者尽量能够均衡地消费分区的数据,不能出现某个消费者消费分区的数量特别多,某个消费者消费的分区特别少

1. Range分配策略(范围分配策略):Kafka默认的分配策略

计算公式:

n=分区数量/消费者数量

m=分区数量%消费者数量

前m个消费者消费n+1个,剩余消费者消费n个

以上图为例:n=8/3=2m=8%3=2因此前2个消费者消费2+1=3个分区,剩下1个消费者消费2个分区

2. RoundRobin分配策略(轮询分配策略)

消费者挨个分配消费的分区:

如下图,3个消费者共同消费8个分区

第一轮:Consumer0-->A-Partition0;Consumer1-->A-Partition1;Consumer2-->A-Partition2

第二轮:Consumer0-->A-Partition3;Consumer1-->B-Partition0;Consumer2-->B-Partition1

第三轮:Consumer0-->B-Partition2;Consumer1-->B-Partition3

上次更新: 2024/03/18, 15:55:19
扫盲Kafka,看这一篇就够了!
kafka中的@KafkaListener如何动态获得topic

← 扫盲Kafka,看这一篇就够了! kafka中的@KafkaListener如何动态获得topic→

最近更新
01
一个注解,优雅的实现接口幂等性
11-17
02
MySQL事务(超详细!!!)
10-14
03
阿里二面:Kafka中如何保证消息的顺序性?这周被问到两次了
10-09
更多文章>
Theme by Vdoing | Copyright © 2024-2024

    辽ICP备2023001503号-2

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式