Kafka入门:从初识到Spring Boot实战

回顾完RabbitMQ,再跟我一起回顾下Kafka ~

一、Kafka介绍

1. 什么是Kafka?

Kafka是由Apache软件基金会开发的分布式流处理平台,最初由LinkedIn公司设计,现已成为大数据领域核心的消息中间件。它能处理实时数据流,支持高吞吐、低延迟、可扩展的消息传递,广泛用于日志收集、实时分析、事件驱动架构等场景。

2. 核心特点

  • 高吞吐:单机可支持百万级消息/秒,通过分区并行处理实现。
  • 持久化:消息持久化到磁盘,支持TB级数据存储(默认保留7天)。
  • 分布式:集群由多个Broker(服务器)组成,支持水平扩展。
  • 多订阅者:一个Topic的消息可被多个消费者组独立消费(广播/负载均衡)。

二、Kafka架构与核心组件

1. 核心组件

组件 作用
Broker Kafka服务器节点,存储Topic数据,每个Broker有唯一ID(broker.id)。
Topic 消息的逻辑分类(如order-topic),类似“消息频道”,包含多个Partition。
Partition Topic的物理分片(有序日志文件),分布式存储的基本单位,每个Partition有Leader和Follower副本。
Producer 发送消息到Topic的客户端(如订单服务)。
Consumer 从Topic订阅消息的客户端(如库存服务)。
Consumer Group 消费者组,组内多个消费者负载均衡消费Partition,组间独立消费(广播)。

2. 架构图(Mermaid)

graph TD subgraph Kafka Cluster Broker1[Broker 1<br/>broker.id=0<br/>• TopicA-Partition0 Leader<br/>• TopicB-Partition1 Follower] Broker2[Broker 2<br/>broker.id=1<br/>• TopicA-Partition1 Leader<br/>• TopicB-Partition0 Leader] Broker3[Broker 3<br/>broker.id=2<br/>• TopicA-Partition0 Follower] end ZK[ZooKeeper<br/>集群协调<br/>存储元数据] Producer[Producer<br/>发送消息到Topic] ConsumerGroup[Consumer Group<br/>组内负载均衡消费] TopicA[TopicA<br/>• Partition0<br/>• Partition1] Producer -->|发送消息| TopicA TopicA -->|分区存储| Broker1 TopicA -->|分区存储| Broker2 Broker1 -->|同步数据| Broker3 TopicA -->|负载均衡消费| ConsumerGroup ZK -.->|协调| Broker1 ZK -.->|协调| Broker2 ZK -.->|协调| Broker3 ZK -.->|管理消费者组| ConsumerGroup

三、消息流转完整路径(生产者→消费者)

1. 流转步骤

  1. 生产者发送消息:生产者指定Topic和Key(可选),通过分区器将消息分配到Partition(默认按Key哈希)。
  2. Broker存储消息:Leader副本接收消息并写入磁盘(Segment文件),Follower副本同步数据。
  3. 消费者组分配Partition:消费者组启动时,协调者(Coordinator)将Topic的Partition分配给组内消费者(一个Partition仅被一个消费者消费)。
  4. 消费者拉取消息:消费者定期拉取(Poll)分配到的Partition消息,处理后提交偏移量(Offset)。

2. 消息流转图示(Mermaid)

sequenceDiagram participant P as Producer participant B as Broker (Leader) participant F as Broker (Follower) participant C as Consumer (Group) Note over P,B: 1. 生产者发送消息 P->>B: 发送消息到Topic-Partition0 (Key: order-1) B->>B: 写入本地日志 (LEO=100) B->>F: 同步消息 (LEO=100) F->>B: 确认同步 (LEO=100) B->>P: 返回ACK (消息提交成功) Note over B,C: 2. 消费者拉取消息 C->>B: Poll请求 (获取Partition0消息) B->>C: 返回消息 (Offset=99, Value=订单数据) C->>C: 处理消息 (扣减库存) C->>B: 提交偏移量 (Offset=100)

四、Kafka安装(ZooKeeper传统模式,CentOS 7)

1. 环境准备

  • CentOS 7系统,关闭防火墙(或开放端口2181、9092):
    systemctl stop firewalld && systemctl disable firewalld 
  • 安装JDK 8+:
    yum install java-1.8.0-openjdk-devel -y 

2. 安装ZooKeeper(Kafka依赖)

步骤1:下载并解压

wget https://archive.apache.org/dist/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /opt/ mv /opt/apache-zookeeper-3.7.1-bin /opt/zookeeper 

步骤2:配置ZooKeeper

cd /opt/zookeeper/conf cp zoo_sample.cfg zoo.cfg vim zoo.cfg  # 修改以下配置 
dataDir=/var/lib/zookeeper  # 数据存储目录 clientPort=2181             # 客户端端口 

步骤3:启动ZooKeeper

mkdir -p /var/lib/zookeeper /opt/zookeeper/bin/zkServer.sh start  # 启动 /opt/zookeeper/bin/zkServer.sh status  # 查看状态(显示Mode: standalone为成功) 

3. 安装Kafka Broker

步骤1:下载并解压

wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz tar -zxvf kafka_2.13-3.6.0.tgz -C /opt/ mv /opt/kafka_2.13-3.6.0 /opt/kafka 

步骤2:配置Kafka

cd /opt/kafka/config vim server.properties  # 修改以下配置 
# 核心配置 broker.id=0                  # 当前Broker唯一ID(集群中不可重复) listeners=PLAINTEXT://localhost:9092  # 监听地址(本地测试用localhost) log.dirs=/var/lib/kafka/logs  # 消息存储目录 zookeeper.connect=localhost:2181/kafka  # 连接ZooKeeper(/kafka为根节点) 

步骤3:启动Kafka

mkdir -p /var/lib/kafka/logs /opt/kafka/bin/kafka-server-start.sh -daemon config/server.properties  # 后台启动 jps  # 查看进程(显示Kafka为成功) 

4. 创建Topic(测试用)

/opt/kafka/bin/kafka-topics.sh --create    --topic order-topic           # 主题名称   --bootstrap-server localhost:9092   # Kafka地址   --partitions 3                # 分区数(建议≥3)   --replication-factor 1         # 副本数(单节点只能设1) 

五、Spring Boot保姆级案例(生产者+消费者)

1. 项目结构

src/main/java/com/example/kafkademo/ ├── KafkaDemoApplication.java  # 启动类 ├── model/Order.java           # 订单实体类 ├── producer/OrderProducer.java # 生产者服务 ├── consumer/OrderConsumer.java # 消费者服务 └── controller/OrderController.java # 测试接口 src/main/resources/ └── application.yml            # 配置文件 

2. pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">     <modelVersion>4.0.0</modelVersion>     <parent>         <groupId>org.springframework.boot</groupId>         <artifactId>spring-boot-starter-parent</artifactId>         <version>2.7.15</version> <!-- Spring Boot 2.7.x稳定版 -->         <relativePath/>     </parent>      <groupId>com.example</groupId>     <artifactId>kafka-demo</artifactId>     <version>0.0.1-SNAPSHOT</version>     <name>kafka-demo</name>      <dependencies>         <!-- Web依赖(提供HTTP接口) -->         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-web</artifactId>         </dependency>          <!-- Kafka依赖 -->         <dependency>             <groupId>org.springframework.kafka</groupId>             <artifactId>spring-kafka</artifactId>         </dependency>          <!-- Lombok(简化实体类) -->         <dependency>             <groupId>org.projectlombok</groupId>             <artifactId>lombok</artifactId>             <optional>true</optional>         </dependency>     </dependencies>      <build>         <plugins>             <plugin>                 <groupId>org.springframework.boot</groupId>                 <artifactId>spring-boot-maven-plugin</artifactId>             </plugin>         </plugins>     </build> </project> 

3. application.yml配置

server:   port: 8080  # 应用端口  spring:   application:     name: kafka-demo  # 应用名称   kafka:     bootstrap-servers: localhost:9092  # Kafka集群地址(多个用逗号分隔)          # 生产者配置     producer:       key-serializer: org.apache.kafka.common.serialization.StringSerializer  # Key序列化器(字符串)       value-serializer: org.springframework.kafka.support.serialization.JsonSerializer  # Value序列化器(JSON)       acks: all  # 消息确认级别:all=所有ISR副本确认(最高可靠性)       retries: 3  # 发送失败重试次数       enable-idempotence: true  # 启用幂等性(防重复消息)          # 消费者配置     consumer:       group-id: order-group  # 消费者组ID(同一组内负载均衡)       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # Key反序列化器       value-deserializer: org.springframework.kafka.support.serialization.JsonDeserializer  # Value反序列化器       auto-offset-reset: earliest  # 无偏移量时策略:earliest=从头消费       enable-auto-commit: false  # 关闭自动提交偏移量(手动控制)       properties:         spring.json.trusted.packages: "com.example.kafkademo.model"  # 信任的实体类包(JSON反序列化用)          # 监听器配置(消费者)     listener:       ack-mode: manual_immediate  # 手动立即提交偏移量(处理完一条提交一条)       concurrency: 3  # 并发消费者数(建议=Topic分区数,此处3分区) 

4. 实体类(Order.java)

package com.example.kafkademo.model;  import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.math.BigDecimal;  /**  * 订单实体类(消息载体)  */ @Data  // Lombok注解:自动生成getter/setter/toString等 @NoArgsConstructor  // 无参构造 @AllArgsConstructor  // 全参构造 public class Order {     private String orderId;  // 订单ID     private String productName;  // 商品名称     private BigDecimal amount;  // 订单金额     private String status;  // 订单状态(CREATED/PAID/SHIPPED) } 

5. 生产者服务(OrderProducer.java)

package com.example.kafkademo.producer;  import com.example.kafkademo.model.Order; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback;  import java.math.BigDecimal; import java.util.UUID;  /**  * 订单生产者服务:发送订单消息到Kafka  */ @Service  // 标记为Spring服务组件 @Slf4j  // Lombok日志注解 public class OrderProducer {      // 注入KafkaTemplate(Spring Boot自动配置,用于发送消息)     @Autowired     private KafkaTemplate<String, Order> kafkaTemplate;      private static final String TOPIC_NAME = "order-topic";  // 目标Topic名称(需与消费者一致)      /**      * 发送订单消息      * @param order 订单对象(若为null则自动生成测试订单)      */     public void sendOrder(Order order) {         // 1. 若订单ID为空,生成UUID作为订单ID         if (order == null) {             order = new Order();             order.setOrderId(UUID.randomUUID().toString());  // 随机生成订单ID             order.setProductName("测试商品");  // 测试商品名称             order.setAmount(new BigDecimal("99.99"));  // 测试金额             order.setStatus("CREATED");  // 初始状态:已创建         }          // 2. 发送消息到Kafka(Key=订单ID,确保同一订单进入同一Partition)         ListenableFuture<SendResult<String, Order>> future =              kafkaTemplate.send(TOPIC_NAME, order.getOrderId(), order);          // 3. 异步回调:处理发送结果(成功/失败)         future.addCallback(new ListenableFutureCallback<SendResult<String, Order>>() {             @Override             public void onSuccess(SendResult<String, Order> result) {                 // 发送成功:打印消息元数据(Topic、分区、偏移量)                 log.info("订单发送成功:orderId={}, 分区={}, 偏移量={}",                         order.getOrderId(),                         result.getRecordMetadata().partition(),                         result.getRecordMetadata().offset());             }              @Override             public void onFailure(Throwable ex) {                 // 发送失败:打印错误信息                 log.error("订单发送失败:orderId={}, 原因={}", order.getOrderId(), ex.getMessage());             }         });     } } 

6. 消费者服务(OrderConsumer.java)

package com.example.kafkademo.consumer;  import com.example.kafkademo.model.Order; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service;  /**  * 订单消费者服务:从Kafka接收并处理订单消息  */ @Service  // 标记为Spring服务组件 @Slf4j  // Lombok日志注解 public class OrderConsumer {      /**      * 监听order-topic主题,消费订单消息      * @param record 消息记录(包含Topic、分区、偏移量、消息体等)      * @param ack 手动提交偏移量的工具类      */     @KafkaListener(topics = "order-topic", groupId = "order-group")  // 监听的Topic和消费者组ID     public void consumeOrder(ConsumerRecord<String, Order> record, Acknowledgment ack) {         // 1. 解析消息体(Order对象)         Order order = record.value();         log.info("收到订单消息:orderId={}, productName={}, amount={}, status={}, 分区={}, 偏移量={}",                 order.getOrderId(),                 order.getProductName(),                 order.getAmount(),                 order.getStatus(),                 record.partition(),                 record.offset());          try {             // 2. 模拟业务处理(如扣减库存、更新订单状态)             log.info("处理订单:orderId={},开始扣减库存...", order.getOrderId());             order.setStatus("PAID");  // 更新状态为“已支付”             Thread.sleep(500);  // 模拟处理耗时             log.info("订单处理完成:orderId={},状态更新为{}", order.getOrderId(), order.getStatus());              // 3. 手动提交偏移量(确认消息已处理,Kafka不再重复投递)             ack.acknowledge();             log.info("偏移量提交成功:orderId={},分区={},偏移量={}",                     order.getOrderId(), record.partition(), record.offset());         } catch (Exception e) {             log.error("订单处理失败:orderId={}, 原因={}", order.getOrderId(), e.getMessage());             // 处理失败时抛出异常,触发重试(需配置重试策略,此处简化)         }     } } 

7. 测试控制器(OrderController.java)

package com.example.kafkademo.controller;  import com.example.kafkademo.model.Order; import com.example.kafkademo.producer.OrderProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController;  import java.math.BigDecimal;  /**  * 测试接口:通过HTTP发送订单消息  */ @RestController  // 标记为REST接口控制器 public class OrderController {      @Autowired  // 注入生产者服务     private OrderProducer orderProducer;      /**      * 创建订单接口(发送消息到Kafka)      * @param order 订单对象(JSON格式)      * @return 响应消息      */     @PostMapping("/create-order")     public String createOrder(@RequestBody(required = false) Order order) {         // 调用生产者发送订单(若order为null,生产者自动生成测试订单)         orderProducer.sendOrder(order);         return "订单消息已发送,请查看控制台日志!";     } } 

8. 启动类(KafkaDemoApplication.java)

package com.example.kafkademo;  import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;  @SpringBootApplication  // Spring Boot启动类注解 public class KafkaDemoApplication {     public static void main(String[] args) {         SpringApplication.run(KafkaDemoApplication.class, args);  // 启动应用     } } 

六、测试步骤

  1. 启动Kafka集群:确保ZooKeeper和Kafka Broker已启动(参考“四、Kafka安装”)。
  2. 创建Topic:若未创建,执行命令:
    /opt/kafka/bin/kafka-topics.sh --create --topic order-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 
  3. 运行Spring Boot应用:启动KafkaDemoApplication,访问http://localhost:8080/create-order(POST请求,可传JSON订单或用Postman测试)。
  4. 查看日志
    • 生产者日志:显示“订单发送成功”及分区、偏移量。
    • 消费者日志:显示“收到订单消息”“处理订单”“偏移量提交成功”。

总结

本文从Kafka架构、安装到Spring Boot实战,覆盖了入门级核心内容。通过“生产者发送订单→消费者消费订单”的简单案例,快速上手Kafka的基本使用。后续可深入学习分区策略、数据可靠性、事务等高级特性。

发表评论

评论已关闭。

相关文章