开心一刻
今天小学女同学给我发消息
她:你现在是毕业了吗
我:嗯,今年刚毕业
她给我发了一张照片,怀里抱着一只大橘猫
她:我的眯眯长这么大了,好看吗
我:你把猫挪开点,它挡住了,我看不到
她:你是 sb 吗,滚
我解释道:你说的是猫呀
可消息刚发出,就出现了红色感叹号,并提示:消息已发出,但被对方拒收了

今天小学女同学给我发消息
她:你现在是毕业了吗
我:嗯,今年刚毕业
她给我发了一张照片,怀里抱着一只大橘猫
她:我的眯眯长这么大了,好看吗
我:你把猫挪开点,它挡住了,我看不到
她:你是 sb 吗,滚
我解释道:你说的是猫呀
可消息刚发出,就出现了红色感叹号,并提示:消息已发出,但被对方拒收了

出于简单考虑,基于 docker 搭建一个 kafka 节点;因为一些原因,国内的 Docker Hub 镜像加速器都不可用了,目前比较靠谱的做法是搭建个人镜像仓库,可参考:Docker无法拉取镜像解决办法,我已经试过了,是可行的,但还是想补充几点
sync-image-example.yml 只需要修改最后的镜像拷贝,其他内容不需要改

支持一次配置多个镜像的拷贝
镜像拷贝
docker 镜像拷贝命令的格式
skopeo copy docker://docker.io/命名空间/镜像名:TAG docker://阿里云镜像地址/命名空间/镜像名:TAG
我们以 kafka 为例,去 Docker Hub 一搜,好家伙,搜出来上万个

我们将搜索条件精确化一些,搜 wurstmeister/kafka

点进去,它在 Docker Hub 的地址是:
那它的 docker 地址就是
docker://docker.io/wurstmeister/kafka
其他的镜像用类似的方式去找,所以最终的拷贝命令类似如下:
skopeo copy docker://docker.io/wurstmeister/kafka:latest docker://registry.cn-hangzhou.aliyuncs.com/qingshilu/wurstmeister_kafka:latest
如果一切顺利,那么在我们的阿里云个人镜像仓库就能看到我们拷贝的镜像了

如何 pull
在个人仓库点镜像名,会看到 操作指南

我们只关注前两步,就可以将镜像 pull 下来

镜像获取到之后,就可以搭建 kafka 了;因为依赖 zookeeper,我们先启动它
docker run -d --name zookeeper-test -p 2181:2181 --env ZOO_MY_ID=1 -v zookeeper_vol:/data -v zookeeper_vol:/datalog -v zookeeper_vol:/logs registry.cn-hangzhou.aliyuncs.com/qingshilu/wurstmeister_zookeeper
然后启动 kafka
docker run -d --name kafka-test -p 9092:9092 --env KAFKA_ZOOKEEPER_CONNECT=192.168.2.118:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.2.118 --env KAFKA_ADVERTISED_PORT=9092 --env KAFKA_LOG_DIRS=/kafka/logs -v kafka_vol:/kafka registry.cn-hangzhou.aliyuncs.com/qingshilu/wurstmeister_kafka
不出意外的话,都启动成功

如果出意外了,大家也别慌,用 docker log 去查看日志,然后找对应的解决方案
# 1.先找到启动失败的容器id docker ps -a # 2.用 docker log 查看容器启动日志 docker log 容器id
如果需要开启 kafka 的 SASL 认证,可参考:Docker-Compose搭建带SASL用户密码验证的Kafka 来搭建
详情可查看:kafka可视化客户端工具(Kafka Tool)的基本使用

创建 Topic:test-topic,并发送一条消息

此时 test-topic 中有 1 条消息
代码很简单
/** * @author: 青石路 */ public class MsgConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(MsgConsumer.class); public static void main(String[] args) { Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.2.118:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_group"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 如果在kafka中找不到当前消费者的偏移量,则设置为最旧的 props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props); // 订阅主题 consumer.subscribe(Collections.singleton("test-topic")); ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); LOGGER.info("records count = {}", records.count()); records.forEach(record -> LOGGER.info("{} - {} - {}", record.offset(), record.key(), record.value())); // consumer.commitAsync(); consumer.close(); } }
我们执行下,输出日志如下

竟然 poll 不到消息,为什么呀?

我们调整下代码,循环 poll
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); LOGGER.info("records count = {}", records.count()); records.forEach(record -> LOGGER.info("{} - {} - {}", record.offset(), record.key(), record.value())); }
我们再执行下,输出日志如下

消费者 poll 的过程中会先判断当前消费者是否在 消费者组 中,如果不在,会先加入消费者组,在加入过程中,ConsumerCoordinator 会对这个消费者组 Rebalance,整个过程中该消费者组内的所有消费者都不能工作,而 poll 又配置了超时时间(100 毫秒),如果在超时时间内,当前消费者还未正常加入消费者组中,那么 poll 肯定是拉取不到数据的;根据日志可以看出,第 3 次 poll 的时候,消费者已经正常加入消费者组中,那么就能 poll 到数据了
很多小伙伴可能可能会有这样的疑问
平时在项目中使用的时候,从来没有感受到这样的问题,为什么呢
原因有以下几点
所以你们感受不到这样;但如果某些场景下,比如 DataX 从 kafka 读数据
消费者要不断新建,那么 poll 不到数据的异常情况的占比就会上来了,那就需要通过一些机制来降低其所造成的的影响了,比如说重试机制
评论已关闭。