准备工作
安装解压缩工具 tar
# 检查是否安装了解压缩工具 tar yum list tar # 如未安装 tar yum install tar -y
安装必备的 java
# 检查是否安装了 java-openjdk,这里选择 java-1.8.0.openjdk 版 yum list java-1.8.0.openjdk # 如未安装 java-openjdk yum install java-1.8.0.openjdk -y
安装 kafka
官网下载地址:https://kafka.apache.org/downloads,这里下载 kafka_2.13-3.2.3.tgz 版。
# 下载 kafka 安装包文件 curl -O https://downloads.apache.org/kafka/3.2.3/kafka_2.13-3.2.3.tgz # 解压安装包 tar -xzvf kafka_2.13-3.2.3.tgz -C /usr/local
运行
在 2.8 之前,kafka 依赖与 zookeeper;2.8 之后,zookeeper 的替代品 karft。
以下命令均在解压包(kafka)的根目录进行。
基于 zookeeper 方式
# 启动 # # 切换到解压后的 kafka 根目录 cd /usr/local/kafka_2.13-3.2.3 # 启动 zookeeper bin/zookeeper-server-start.sh -daemon config/zookeeper.properties # 启动 kafka bin/kafka-server-start.sh -daemon config/server.properties # # 验证启动;zookeeper 默认端口2181,kafka 默认端口为9092 ss -plnts # 端口列是否列出了 2181、9092,表示启动正常 # # 停止 kafka bin/kafka-server-stop.sh # 停止 zookeeper bin/zookeeper-server-stop.sh
基于 kraft 方式
# 启动 # # 切换到解压后的 kafka 根目录 cd /usr/local/kafka_2.13-3.2.3 # 准备 uuid bin/kafka-storage.sh random-uuid # 配置 uuid bin/kafka-storage.sh format -t {uuid} -c config/kraft/server.properties # 通过 kraft 配置 后台启动 kafka bin/kafka-server-start.sh -daemon config/kraft/server.properties # # 验证启动;kafka 默认端口为9092 ss -plnts # 端口列是否列出了 9092,表示启动正常 # # 停止 kafka bin/kafka-server-stop.sh
基本命令
开放端口:确保每个服务器所应用到的端口(2181 和 9092等)开放(参考firewall)。
Topic
# 查看已有的 Topic bin/kafka-topics.sh --bootstrap-server localhost:9092 --list # 创建一个新的 Topic,分区数1个,副本数3份 bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --create --partitions 1 --replication-factor 1 # 查看一个 Topic 详细 bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --describe # 修改一个 Topic 的属性:分区变更为2个(分区只能加,不能减少,对于消费者分不清数据所属) bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic {name} --alter --partitions 2
Consumer
# 消费者订阅测试;新打开一个终端,连接到服务器,消费者订阅一个topic,接收消息测试 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {name} --from-beginning
Producer
# 生产者发送消息;新开一个终端窗体,连接到服务器,用来生产者发送消息(回车后输入要发送的消息内容) bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic {name}
切换到 消费者终端窗口,查看接收到订阅的 topic 消息。
集群部署
这里用三台机组成集群模式
- 把解压后的 /usr/local/kafka_2.13-3.2.3 文件夹分发到集群中的每个服务器
- 配置文件 config/server.properties | config/kraft/server.properties 的配置
- 启动每台服务器的 kafka
安装包分发到每台服务器
# 把解压后的 kafka 根目录文件夹分发到集群中的每台服务器 scp -r /usr/local/kafka_2.13-3.2.3 root@{服务器IP} /usr/local
zookeeper 方式
以 config/server.properties 文件为主的配置及启动
zookeeper 使用的端口:2181;kafka 使用的端口:9092。
# 新终端窗口登录到每台服务器,配置每台服务器的 config/server.properties # # # 切换到每台服务器的 kafka 根目录 cd /usr/local/kafka_2.13-3.2.3 # # 编辑每台配置文件 config/server.properties vi config/server.properties # 以下两点必须的变更 # 1、确保每台 config/server.properties 中的 broker.id 唯一(如:0,1,2) # 2、确保每台 config/server.properties 中的 zookeeper.connect 都相同 # 示例:zookeeper.connect={serverA}:2181,{serverB}:2181,{serverB}:2181 # # # 启动每台服务器的 zookeeper,kafka bin/zookeeper-server-start.sh -daemon config/zookeeper.properties bin/kafka-server-start.sh -daemon config/server.properties
kraft 方式
以 config/kraft/server.properties 文件为主的配置及启动
kraft 使用的端口:9093;kafka 使用的端口:9092。
# 新终端窗口登录到每台服务器,配置每台服务器的 config/kraft/server.properties # # # 切换到每台服务器的 kafka 根目录 cd /usr/local/kafka_2.13-3.2.3 # # 编辑每台配置文件 config/kraft/server.properties vi config/kraft/server.properties #以下三点必须的变更 # # 1、确保每台 config/kraft/server.properties 中的 node.id 唯一(如:1,2,3) # 示例:node.id=1 # # 2、确保每台 config/kraft/server.properties 中的 listeners 配置各自的主机名称或IP # 示例:listeners=PLAINTEXT://{localhost}:9092,CONTROLLER://{localhost}:9093 # # 3、确保每台 config/kraft/server.properties 中的 controller.quorum.voters 都相同 # 格式:controller.quorum.voters={node.id}@{host}:{port} # 示例:controller.quorum.voters=1@{hostnameA}:9093,2@{hostnameB}:9093,3@{hostnameC}:9093 # # # 启动 kafka # 先为集群生成一个uuid bin/kafka-storage.sh random-uuid # 分别为每台服务器配置相同的uuid后启动 bin/kafka-storage.sh format -t {uuid} -c config/kraft/server.properties bin/kafka-server-start.sh -daemon config/kraft/server.properties