Kafka部署安装

一、环境准备

1、jdk 8+

2、zookeeper 

3、kafka

说明:在kafka较新版本中已经集成了zookeeper,所以不用单独安装zookeeper,只需要在kafka文件目录中启动zookeeper即可

二、下载地址

https://kafka.apache.org/downloads

Kafka部署安装

 

 

三、部署

1、启动zookeeper

-- 启动 ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties  -- 查看是否启动成功 ps -ef | grep zoo

2、进入解压的kafka目录,修改/config/kafka-server的配置文件

vi config/server.properties

-- 重点配置节点说明  listeners=PLAINTEXT://localhost:9092   -- 将localhost修改为主机ip  log.dirs=/bigdata/kafka/logs-1  -- 默认不修改也可

Kafka部署安装

 

 3、使用kafka-server-start.sh,启动kafka服务

./bin/kafka-server-start.sh config/server.properties

四、使用客户端kafka tools连接kafka

客户端下载地址:https://www.kafkatool.com/download.html

关于客户端如何使用可查看:https://www.cnblogs.com/frankdeng/p/9452982.html

Kafka部署安装

 

 

 

 

Kafka部署安装

 

 

 

 

五、kafka实战简单使用

 

NuGet:Confluent.Kafka

 1、新建.Net Core控制台项目,代码如下:

 

 

static void Main(string[] args)         {             // 发送消息             var producerConfig = new ProducerConfig             {                 BootstrapServers = "192.168.140.131:9092",                 MessageTimeoutMs = 50000             };             var builder = new ProducerBuilder<string, string>(producerConfig);             using (var producer = builder.Build())             {                 var data = new { key = "1", value = "001" };                 var json = JsonConvert.SerializeObject(data);                 var dr = producer.ProduceAsync("order", new Message<string, string> { Key = "order", Value = json }).GetAwaiter().GetResult();                 Console.WriteLine($"发送事件{dr.Value}到{dr.TopicPartitionOffset}成功");             }              // 消费消息             var consumerConfig = new ConsumerConfig {                 BootstrapServers = "192.168.140.131:9092",                 AutoOffsetReset=AutoOffsetReset.Earliest,                 GroupId="1111", // 自定义                 EnableAutoCommit=true             };             var consumerBuilder = new ConsumerBuilder<string, string>(consumerConfig);               using (var consumer = consumerBuilder.Build())             {                 // 1、订阅                 consumer.Subscribe("order");                 while (true)                 {                     try                     {                         // 2、消费(自动确认)                         var result = consumer.Consume();                          // 3、业务逻辑                         string key = result.Key;                         string value = result.Value;                          Console.WriteLine($"创建商品:Key:{key}");                         Console.WriteLine($"创建商品:Order:{value}");                         consumer.Commit(result);                      }                     catch (Exception e)                     {                         Console.WriteLine($"异常:Order:{e}");                     }                 }             }         }

 

2、使用kafka客户端查看消息投递

Kafka部署安装

 

发表评论

相关文章