Kafka 从 2.6.0 开始,默认使用 Java 11 , 3.0.0 开始,不再支持 Java 8,详见:https://kafka.apache.org/downloads

- Producer:消息生产者,就是向 kafka broker 发消息的客户端:
- Consumer:消息消费者,向 kafka broker 取消息的客户端;
- ConsumerGroup:消费者组,由多个consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
不同的组可以消费同一个消息,且只能被消费组内的一个消费者消费 - Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
- Topic :可以理解为一个队列,生产者和消费者面向的都是一个 topic;
- Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;
- Replica: 副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个 Leader 和 若干个 follower
- leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
- follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。
安装 JDK 11
[root@kafka1host ~]# mkdir /usr/local/java # 解压JDK [root@kafka1host ~]# tar -zxvf jdk-11.0.17_linux-x64_bin.tar.gz -C /usr/local/java # 注意 由于jdk1.8版本之后无 jre. 需要进入jdk根目录用以下命令生成jre文件夹不然在运行tomcat时会报缺失jre的错误 ./bin/jlink --module-path jmods --add-modules java.desktop --output jre # 配置环境变量 [root@localhost~]# vi /etc/profile export JAVA_HOME=/usr/local/java/jdk-11.0.17 export JRE_HOME=${JAVA_HOME} export PATH=$PATH:${JAVA_HOME}/bin export CLASSPATH=./:${JAVA_HOME}/lib:${JAVA_HOME}/lib # 让环境变更生效 [root@localhost~]# source /etc/profile # 此时 java -version 仍然显示旧版本 【1. 删除旧版本、2. 切换Java版本】 [root@kafka1host kafka]# java -version openjdk version "1.8.0_102" OpenJDK Runtime Environment (build 1.8.0_102-b14) OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode) [root@kafka1host kafka]#
切换 JAVA 版本,(如需要卸载旧版本,点击此处)
# 查看已安装的Java版本及其路径,/etc/alternatives/java 是当前Java版本的符号链接。 [root@kafka1host ~]# ls -l /usr/bin/java lrwxrwxrwx. 1 root root 22 Apr 27 2021 /usr/bin/java -> /etc/alternatives/java # 查看当前Java版本的可执行文件路径: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.102-4.b14.el7.x86_64/jre/bin/java 是当前Java版本的可执行文件路径。 [root@kafka1host ~]# ls -l /etc/alternatives/java lrwxrwxrwx. 1 root root 71 Apr 27 2021 /etc/alternatives/java -> /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.102-4.b14.el7.x86_64/jre/bin/java # alternatives 注册 Java 版本信息 [root@kafka1host ~]# alternatives --install /usr/bin/java java /usr/local/java/jdk-11.0.17/bin/java 1 # alternatives --install <link> <name> <path> <priority> # install 表示安装 # link 是符号链接 # name 则是标识符 # path 是执行文件的路径 # priority 则表示优先级 # 选择Java配置版本 [root@kafka1host ~]# sudo alternatives --config java There are 3 programs which provide 'java'. Selection Command ----------------------------------------------- *+ 1 java-1.8.0-openjdk.x86_64 (/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.102-4.b14.el7.x86_64/jre/bin/java) 2 java-1.7.0-openjdk.x86_64 (/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.111-2.6.7.8.el7.x86_64/jre/bin/java) 3 /usr/local/java/jdk-11.0.17/bin/java # 输入对应版本的编号,然后按Enter键 Enter to keep the current selection[+], or type selection number: 3 # 查看Java版本 [root@kafka1host ~]# java -version java version "11.0.17" 2022-10-18 LTS Java(TM) SE Runtime Environment 18.9 (build 11.0.17+10-LTS-269) Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.17+10-LTS-269, mixed mode) [root@kafka1host ~]#
注意: 由于jdk1.8版本之后无 jre. 如果需要运行 tomcat ,需要进入jdk根目录用以下命令生成jre文件夹不然在运行tomcat时会报缺失jre的错误
./bin/jlink --module-path jmods --add-modules java.desktop --output jre
安装 Kafka
下载 Kafka 2.8.2
https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz

防火墙
配置防火墙规则
# 查看防火墙状态 [root@kafka1host ~]# firewall-cmd --state running # 查看默认作用域 -- 一般不需要查看 [root@kafka1host ~]# firewall-cmd --get-default-zone public # 添加防火墙规则,允许访问 2181、9092 端口 [root@kafka1host ~]# firewall-cmd --permanent --zone=public --add-port=2181/tcp # –permanent : 表示使设置永久生效,不加的话机器重启之后失效, # –add-port=2181/tcp : 表示添加一个端口和协议的规则, # --zone=public: 作用域(默认为 public 可不加) [root@kafka1host ~]# firewall-cmd --permanent --add-port=9092/tcp # 更新防火墙规则 [root@kafka1host ~]# firewall-cmd --reload # 查看所有打开的端口 [root@kafka1host ~]# firewall-cmd --list-port 2181/tcp 9092/tcp
修改配置
修改配置 server.properties
注意配置中的“=”前后不能有空格
# kafka broker 实际监听的地址和端口,集群间配置使用,如果不配置会使用 hostname 导致程序无法访问,如报:无法连接:kafka1host:9092 listeners=PLAINTEXT://172.0.30.100:9092 #允许删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除 delete.topic.enable=true # 消息日志,默认日志会存放在 /tmp 目录下,有人喜欢放程序目录下 log.dirs=/tmp/kafka-logs # 以下可不配置-------更多详细配置百度 # 对外提供的地址,它会注册到 zookeeper上,如果不配置,会使用上面 listeners 的值 # advertised.listeners=PLAINTEXT://10.100.25.230:9092
运行测试
一般 zookeeper 单独部署。这边单节点部署,为了省事,直接使用 kafka 包中内置的 Zookeeper。
启动 zookeeper 窗口A
# 先启动 zookeeper [root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/zookeeper-server-start.sh /usr/local/kafka_2.13-2.8.2/config/zookeeper.properties
启动 Kafka , 新开一个命令行窗口B
# 再启动 Kafka [root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.8.2/config/server.properties
创建 topic 再开一个命令行窗口C测试
# 创建 topic [root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-topics.sh --create --topic kafka-vipsoft --replication-factor 1 --partitions 1 --zookeeper localhost:2181 Created topic kafka-vipsoft. # 查看 topic [root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-topics.sh --list --zookeeper localhost:2181 __consumer_offsets kafka-vipsoft # 删除 topic,如果 delete.topic.enable=true 没设的话,在kafka重启后才会生效 [root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-topics.sh --delete --topic kafka-vipsoft --zookeeper localhost:2181 Topic kafka-vipsoft is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
启动生产者 窗口C
# 启动生产者, 不能使用 localhost:9092 需要和 配置中的 listeners 保持一致 [root@kafka1host ~]# /usr/local/kafka_2.13-2.8.2/bin/kafka-console-producer.sh --broker-list 172.0.30.100:9092 --topic kafka-vipsoft > Hello 123
启动消费者,再开一个命令行窗口D
#启动消费者 [root@kafka1host ~]# /usr/local/kafka-console-consumer.sh --bootstrap-server 172.16.3.203:9092 --topic kafka-vipsoft --from-beginning Hello 123
自启动
创建启动命令
vi /usr/local/kafka_2.13-2.8.2/kafka_start.sh
#!/bin/sh /usr/local/kafka_2.13-2.8.2/bin/zookeeper-server-start.sh /usr/local/kafka_2.13-2.8.2/config/zookeeper.properties & sleep 5 #先启动 zookeeper,等5秒后启动 kafka #启动kafka /usr/local/kafka_2.13-2.8.2/bin/kafka-server-start.sh /usr/local/kafka_2.13-2.8.2/config/server.properties &
创建停止命令
vi /usr/local/kafka_2.13-2.8.2/kafka_stop.sh
#!/bin/sh /usr/local/kafka_2.13-2.8.2/bin/kafka-server-stop.sh & sleep 3 #先停Kafka 再停 zookeeper 否则 kafka 停不掉 /usr/local/kafka_2.13-2.8.2/bin/zookeeper-server-stop.sh
修改脚本执行权限
chmod 775 kafka_start.sh chmod 775 kafka_stop.sh
验证脚本
sh /usr/local/kafka_2.13-2.8.2/kafka_start.sh
设置开机启动
vi /etc/rc.d/rc.local
# 添加如下脚本 sh /usr/local/kafka_2.13-2.8.2/kafka_start.sh &
验证端口
查看多端口状态
netstat -ntpl | grep ':2181|:9020' [root@kafka1host ~]# netstat -ntpl | grep ':2181|:9092' tcp6 0 0 172.16.3.203:9092 :::* LISTEN 5419/java tcp6 0 0 :::2181 :::* LISTEN 4182/java