[TSDB] InfluxDB Overview: Key Features, Architecture, and Core Principles

0 Preface

  • I have been using and operating the InfluxDB and openGemini time-series databases in production for several years now. While my study of their source code is not yet exhaustive, it is worth distilling and summarizing what I have learned so far.

This article focuses on InfluxDB v1. Unless otherwise noted, the source-code, principle, and architecture analysis is based on InfluxDB v1.7.5.
It can be loosely compared against openGemini v1.2.0 (verified first-hand, though the two are not fully compatible).

  • This article is a work in progress.

1 Overview: InfluxDB

InfluxDB

  • InfluxDB is an open-source, distributed time-series database (TSDB) for event and metrics data, written in Go with no external dependencies. It is designed for high write and query loads and is commonly used to store monitoring data.
  • It serves as the backing store for any use case involving large volumes of timestamped data, such as DevOps monitoring, application metrics, IoT sensor data, and real-time analytics.
  • Objectively speaking, InfluxDB remains the de facto industry standard, and it has consistently ranked first in the DB-Engines time-series DBMS ranking.
  • InfluxDB is developed by InfluxData, a company founded in 2013 with the goal of solving high-performance storage and querying of time-series data.


History


  • Highlights
  • Storage engine: started on LevelDB => defaulted to RocksDB => abandoned the hard-to-advance Facebook RocksDB and BoltDB engines => built the in-house TSM storage engine
  • In-house TSI index
  • In-house functional query language, Flux
  • ...
  • In 2013, InfluxDB shipped its first release, v0.0.1 — objectively still a very early product — initially built on LevelDB; the company closed its angel round the same year. Due to product issues, the v0.8.0 release in 2014 introduced pluggable storage engines, defaulting to RocksDB while still supporting LevelDB.

  • Between 2014 and 2015 (v0.8.0 ~ v0.9.0), InfluxDB approached Facebook's RocksDB team, hoping they would prioritize optimizations such as compaction, but to no avail. So in 2015, after v0.9.0 shipped with BoltDB as the single storage engine, v0.9.5.1 introduced the in-house TSM storage engine. It was not until September 6, 2016 that the v1.0 stable release finally shipped. As the leading time-series database vendor, InfluxDB spent more than three years (2013 to 2016) in continuous trial and error before finding its direction. Shortly after the 1.0 release, InfluxData closed a $16M Series B on September 21, 2016.

  • In 2018, InfluxDB expanded into the EMEA market and launched InfluxDB on AWS; with the product stabilized, expansion accelerated, and a $35M Series C closed in February 2018. In 2019 the company opened a London office as its EMEA headquarters and raised a $60M Series D. In 2020, InfluxDB expanded into APAC and launched cloud offerings on Azure and Google Cloud. 2018-2020 was clearly its period of rapid growth. At the same time, it faced real concerns: series-cardinality explosion, and user attrition caused by keeping the distributed version closed-source — significant shortcomings that were becoming hard to reverse. Hence, on November 20, 2020, InfluxData announced a new product, IOx, to address these problems; after roughly a year and a half of development, however, progress remained unpromising and repeatedly delayed.

  • Lessons for Chinese TSDB vendors.

First, we should objectively acknowledge that InfluxDB is the current industry leader, and its repeated trial and error in storage-engine selection has, in effect, cleared many pitfalls for those who follow; its product thinking is well worth studying.
Second, commercial open-source vendors should choose their business model carefully. Under a feature-based model, it is hard to predict how the open/closed split will affect user attrition — and when InfluxDB spotted the problem, reseller and shareholder interests made it difficult for the company to change course.
Finally, there is also opportunity: InfluxDB keeps reinventing itself. Facing series-cardinality explosion in the IoT era, it set out again to build IOx, while staff attrition amid US inflation slowed IOx's progress. This is the opportunity for Chinese TSDB vendors: facing this sea change, everyone stands at the same starting line, and later entrants can absorb InfluxDB's lessons and leapfrog ahead.

Key Features

  • High-performance data store purpose-built for time-series data
  • The TSM engine provides high-speed reads and writes plus data compression.
  • Time-series-oriented functions such as min, max, sum, count, mean, and median, convenient for statistics
  • Simple, high-performance HTTP API for writes and queries.

Native HTTP support with a built-in HTTP API.

  • A SQL-like query language tailored for time-series data makes it easy to query aggregated data.

  • Tags can be indexed for fast and efficient queries.

  • Retention policies efficiently expire stale data automatically.

  • Few external dependencies: written in Go, compiling to a single binary with no external dependencies.
  • Schemaless: any number of columns is allowed, and fields can be added freely (regular fields)

Most databases do not support this.

  • Extensible
  • Plugins support other ingestion protocols, such as Graphite, collectd, and OpenTSDB.
  • Continuous queries automatically compute aggregates, making frequent queries more efficient.
  • Measurable: compute over large volumes of data in real time
  • Event-based: supports arbitrary event data
  • Built-in admin UI for convenience

Limitations

  • No support for SQL constructs such as UNION, JOIN, and HAVING

Clients

Java

Dependency coordinates

  • Maven
<!--
  influxdb-java
  https://github.com/influxdata/influxdb-java/blob/influxdb-java-2.22/src/main/java/org/influxdb/dto/Point.java#concatenatedFields
  (possible write-performance optimization: for Double / Float / BigDecimal, avoid the format-to-String path)

  {@link com.xxx.sink.MyInfluxDBSinkHistory.invoke }
  ↘ {@link com.xxx.datasource.InfluxDbConnection.insertBatchAfterHealthCheck }
  ↘ {@link com.xxx.datasource.InfluxDbConnection.insertBatch(java.util.Collection<org.influxdb.dto.Point>) }
  ↘ {@link org.influxdb.InfluxDB.write(org.influxdb.dto.BatchPoints) }
  ↘ {@link org.influxdb.impl.InfluxDBImpl.write(org.influxdb.dto.BatchPoints) }
  ↘ {@link org.influxdb.dto.BatchPoints.lineProtocol }
  ↘ {@link org.influxdb.dto.Point.lineProtocol(java.util.concurrent.TimeUnit) }
  ↘ {@link org.influxdb.dto.Point.concatenatedFields }
-->
<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.22</version>
</dependency>

2 How It Works: Architecture Deep Dive

Core Concepts

InfluxDB vs. a Traditional Database

InfluxDB | Traditional-database concept | Difference
database | database |
measurement | table | Most notable difference: there is no separate statement to create a measurement; when a point is written to a measurement that does not exist, the measurement is created on the fly and the point is inserted
point := timestamp + tag + field | record (a row in a table) | In InfluxDB, a point is the data for a given measurement, at a given moment, matching given conditions — i.e. the combination of timestamp + tags + fields

Database

  • One database contains multiple retention policies and measurements.

Retention Policy (RP)

  • Retention Policy: the storage policy that automatically expires stale data.
  • Every newly created InfluxDB database gets a retention policy that governs expiry of aged data; if none is specified, the database uses the default policy, autogen.

Originally, the autogen retention policy defaulted to shardGroupDuration=7d with duration=0h (keep forever).

Later (in the deployment shown), the autogen retention policy used shardGroupDuration=1h with duration=1h.

Measurement (table)

  • A measurement belongs to the database associated with a retention policy.

Shard: partition by [time + measurement]

  • A shard is the concrete implementation of InfluxDB's TSM storage engine; it handles data encoding, storage, and read/write service.

Shards and TSM files

  • A shard contains the actual encoded and compressed data, represented on disk by TSM (Time-Structured Merge tree) files.
  • Time-series data is written into shards in time order; each shard stores a portion of InfluxDB's data as TSM files on physical disk, and every shard that holds data belongs to exactly one shard group (i.e. a single shard group may contain multiple shards).
  • The TSM tree is a data storage format built specifically for InfluxDB.

Compared with existing B+ tree and LSM tree implementations, the TSM tree achieves better compression and higher read/write throughput.

  • Each shard contains a specific set of series.

All points for a given series in a given shard group are stored in the same shard (TSM file) on disk.

  • A shard consists of four main parts: the Cache, the WAL, TSM files, and the Compactor.

shard group

  • A shard group is a logical storage structure/container in InfluxDB that holds multiple shards; it defines the time span of the data stored within it.
  • Each shard group covers a non-overlapping time span; data lands in different shard groups according to its timestamp.
  • Retention policies provide a simple, efficient way to purge expired data from InfluxDB: once data exceeds the retention duration it is removed automatically, with expiry executed at the granularity of the shard group duration.

  • A shard group stores data for a specified time span; this shard duration is specified when creating the RP, as mentioned above. If not specified, the system derives it from the RP's retention duration.

  • Time spans of different shard groups never overlap. Shard groups partition data by time — what is this for?
  • It mitigates write hotspots to some degree
  • It speeds up data deletion
  • Splitting data into time-bounded chunks makes expiry trivial: InfluxDB expires data at the granularity of the shard group, checking whether each shard group as a whole has expired rather than checking record by record.

Figure: a retention policy keeping only one month of data; (Retention Policy) Duration != Shard Group Duration

Shard on-disk statistics

  • Shard statistics from Huawei Cloud GaussDB for InfluxDB (based on openGemini):

select *
from "_internal"."monitor"."shard"
where "database" = 'xxx_dwd' and id = '3467'
order by time desc

Series := unique tag sets

  • Series: the unique combination of retention policy (RP), measurement, and tag set (tag keys + tag values).

  • Number of series := RP × measurement × tag set

  • Intuitively: all data in the database can be charted, and a series is the data in a measurement that draws as one line on the chart — the lines are the combinations produced by the tags.

  • Generally, once the monitored objects stabilize, the set of series is essentially fixed.

InfluxDB keeps series in memory as an index to speed up queries, which means the number of series must not grow too large; otherwise InfluxDB's memory will be exhausted. By default, a single database is limited to fewer than 1,000,000 series.
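The series-count formula above can be sketched as a back-of-the-envelope estimate. This is a rough upper bound that assumes tag keys are independent; real cardinality is usually lower because not every tag combination actually occurs in the data:

```python
from functools import reduce

def estimate_series(n_rps: int, n_measurements: int, tag_cardinalities: list) -> int:
    """Upper-bound series count := RP x measurement x product(tag value cardinalities)."""
    tag_set = reduce(lambda a, b: a * b, tag_cardinalities, 1)
    return n_rps * n_measurements * tag_set

# 1 RP, 10 measurements, tags: host (100 values) x region (5 values)
print(estimate_series(1, 10, [100, 5]))  # → 5000
```

5,000 series is well under the default per-database limit of 1,000,000; a tag holding unique IDs (e.g. request IDs) would blow this number up immediately.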

Point: a record/row

  • Point: a row of time-series data, composed of a timestamp (time), tags, and regular fields (field), as shown below:

point attribute | traditional-database concept
time (timestamp, unit: ns) | the record's time; auto-generated and the database's primary index (like a primary key in a traditional database). Every record has this attribute; when not set explicitly, a default is assigned
field | the recorded values (non-indexed attributes)
tags | indexed attributes

time (the time column)

Concept
  • Every point necessarily has a time attribute; when not set explicitly, a default value is assigned.
UTC time
  • InfluxDB stores time in UTC.

UTC is the world time standard.
A UTC (Unix) timestamp counts the milliseconds elapsed since 1970-01-01T00:00:00, ignoring leap seconds. Timestamps are timezone-free: at any instant, the Unix timestamp is the same in every timezone.

  • Timestamp 0 corresponds to: Thu Jan 01 1970 08:00:00 GMT+0800 (China Standard Time).

  • When a timestamp is converted to a date, the local timezone determines the rendered date value.

  • Dates before 1970 map to negative timestamps; for example, Mon Jan 01 1900 08:00:00 GMT+0805 (China Standard Time) corresponds to the timestamp -2208989143000.
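The statements above can be checked with a few lines of Python (a fixed UTC+8 offset stands in for China Standard Time):

```python
from datetime import datetime, timezone, timedelta

cst = timezone(timedelta(hours=8))  # China Standard Time (UTC+8)

# Timestamp 0 renders differently depending on the timezone used for display:
print(datetime.fromtimestamp(0, timezone.utc).isoformat())  # → 1970-01-01T00:00:00+00:00
print(datetime.fromtimestamp(0, cst).isoformat())           # → 1970-01-01T08:00:00+08:00

# Dates before 1970 map to negative timestamps (seconds here, not ms):
print(datetime(1900, 1, 1, tzinfo=timezone.utc).timestamp())  # → -2208988800.0
```

Note the last value is for midnight UTC; the article's -2208989143000 ms figure reflects the historical +08:05 local-mean-time offset used for pre-1901 Chinese dates.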

Physical time-unit conversions

The following conversions are general physics/calendar facts, unrelated to InfluxDB itself:

  • 1 century = 100 years
  • 1 year = 12 months
  • 1 month ≈ 30 days ≈ 4 weeks
  • 1 week = 7 days
  • 1 day = 24 hours
  • 1 hour = 60 minutes (min)
  • 1 minute (min) = 60 seconds (s)
  • 1 second (s) = 1000 milliseconds (ms)
  • 1 millisecond (ms) = 1000 microseconds (µs)
  • 1 microsecond (µs) = 1000 nanoseconds (ns)
  • 1 nanosecond (ns) = 1000 picoseconds (ps)
Managing time precision
  • As a time-series database, InfluxDB has specific mechanics and conventions for handling time that require attention.

precision [ns, u, ms, s, m, h]

  • When writing to and reading from InfluxDB, timestamps default to nanoseconds. The precision parameter selects another format: rfc3339 (YYYY-MM-DDTHH:MM:SS.nnnnnnnnnZ), h (hours), m (minutes), s (seconds), ms (milliseconds), u (microseconds), or ns (nanoseconds).
Ways to configure time precision
  • Example: specify the timestamp unit via precision when writing a point with curl

When writing via curl, pass precision to set the timestamp unit:

curl -i -XPOST "http://localhost:8086/write?db=mydb&precision=s" --data-binary 'mymeas,mytag=1 myfield=90 1463683075'

The trailing timestamp 1463683075 is then interpreted as seconds.

Without precision, it is parsed as nanoseconds, so you would have to convert 1463683075 to 1463683075000000000.

  • Example: specify precision in the CLI
$ influx -precision ms

All write and query statements executed in that CLI session then use millisecond timestamps.

  • Example: specify precision with -execute
$ influx -execute 'SELECT * FROM "h2o_feet" LIMIT 3' -database="NOAA_water_database" -precision=rfc3339
name: h2o_feet
--------------
time                  level description      location      water_level
2015-08-18T00:00:00Z  below 3 feet           santa_monica  2.064
2015-08-18T00:00:00Z  between 6 and 9 feet   coyote_creek  8.12
2015-08-18T00:06:00Z  between 6 and 9 feet   coyote_creek  8.005
epoch_time and duration
  • In InfluxDB SELECT statements, an epoch value is simply a timestamp, but epoch carries richer semantics and is more flexible to use. now(): the server's current timestamp in nanoseconds.
Epoch
  • epoch 0 (1970-01-01T00:00:00Z) is often used to represent a meaningless timestamp, i.e. null. For example, when a result contains no timestamp, time is set to 0:
> SELECT MEAN("index") FROM "h2o_quality" GROUP BY location,randtag
name: h2o_quality
tags: location=coyote_creek, randtag=1
time                  mean
----                  ----
1970-01-01T00:00:00Z  50.69033760186263

name: h2o_quality
tags: location=coyote_creek, randtag=2
time                  mean
----                  ----
1970-01-01T00:00:00Z  49.661867544220485

name: h2o_quality
tags: location=coyote_creek, randtag=3
time                  mean
----                  ----
1970-01-01T00:00:00Z  49.360939907550076

name: h2o_quality
tags: location=santa_monica, randtag=1
time                  mean
----                  ----
1970-01-01T00:00:00Z  49.132712456344585
Duration
  • Concept

duration = integer + duration unit: specifies a length of time.
duration units = "ns" | "u" | "µ" | "ms" | "s" | "m" | "h" | "d" | "w" .

  • Duration units supported by InfluxDB
Unit | Meaning     | Example
w    | week        | 4w
d    | day         | 7d
h    | hour        | 7200h
m    | minute      | 60m
s    | second      | 1439856720s
ms   | millisecond | 1726799024823ms
u/µ  | microsecond | 1726799024823000u
ns   | nanosecond  | 1531992939634316937ns; 1439856720000000000 (default unit: nanoseconds)
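The unit table above maps directly to nanosecond multipliers. The sketch below is illustrative only — it is not InfluxDB's actual parser, which also handles compound durations like 1h30m:

```python
UNIT_NS = {
    "ns": 1,
    "u": 1_000, "µ": 1_000,
    "ms": 1_000_000,
    "s": 1_000_000_000,
    "m": 60 * 1_000_000_000,
    "h": 3_600 * 1_000_000_000,
    "d": 24 * 3_600 * 1_000_000_000,
    "w": 7 * 24 * 3_600 * 1_000_000_000,
}

def duration_to_ns(literal: str) -> int:
    """Convert a duration literal like '7d' or '60m' to nanoseconds."""
    # Try the longest unit suffixes first so 'ms'/'ns' match before 's'.
    for unit in sorted(UNIT_NS, key=len, reverse=True):
        if literal.endswith(unit):
            return int(literal[:-len(unit)]) * UNIT_NS[unit]
    raise ValueError(f"unknown duration unit in {literal!r}")

print(duration_to_ns("7d"))   # → 604800000000000
print(duration_to_ns("60m"))  # → 3600000000000
```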
  • Usage scenarios

Scenario 1: Retention Policy

Durations appear when configuring a database's retention policy, e.g.:

-- Create a retention policy.
CREATE RETENTION POLICY "10m.events" ON "somedb" DURATION 60m REPLICATION 2

-- Create a retention policy and set it as the DEFAULT.
CREATE RETENTION POLICY "10m.events" ON "somedb" DURATION 60m REPLICATION 2 DEFAULT

-- Create a retention policy and specify the shard group duration.
CREATE RETENTION POLICY "10m.events" ON "somedb" DURATION 60m REPLICATION 2 SHARD DURATION 30m

Scenario 2: SELECT statements

Durations also appear in SELECT statements:

SELECT mean("value") FROM "cpu" GROUP BY region, time(1d) fill(0) tz('America/Chicago')
SELECT MEAN(value) FROM cpu GROUP BY time(10m)
SELECT "water_level" FROM "h2o_feet" WHERE "location" = 'santa_monica' AND time > 24043524m - 6m
epoch_time
  • epoch time is a UTC timestamp, defaulting to nanoseconds; appending a duration unit to the epoch_time value specifies its precision.

epoch_time supports basic arithmetic such as + and -; note in particular that InfluxQL requires whitespace between the operator and the epoch_time.

time format as rfc3339:

> SELECT "water_level" FROM "h2o_feet" WHERE "location" = 'santa_monica' AND time >= '2015-08-18T00:00:00.000000000Z' AND time <= '2015-08-18T00:12:00Z'
name: h2o_feet
time                  water_level
----                  -----------
2015-08-18T00:00:00Z  2.064
2015-08-18T00:06:00Z  2.116
2015-08-18T00:12:00Z  2.028

  • Without a duration unit, nanoseconds are assumed
> SELECT "water_level" FROM "h2o_feet" WHERE "location" = 'santa_monica' AND time >= 1439856000000000000 AND time <= 1439856720000000000
name: h2o_feet
time                  water_level
----                  -----------
2015-08-18T00:00:00Z  2.064
2015-08-18T00:06:00Z  2.116
2015-08-18T00:12:00Z  2.028

Append a duration unit to specify the precision:

> SELECT "water_level" FROM "h2o_feet" WHERE "location" = 'santa_monica' AND time >= 1439856000s AND time <= 1439856720s
name: h2o_feet
time                  water_level
----                  -----------
2015-08-18T00:00:00Z  2.064
2015-08-18T00:06:00Z  2.116
2015-08-18T00:12:00Z  2.028

Simple arithmetic on epoch_time:

> SELECT "water_level" FROM "h2o_feet" WHERE time > 24043524m - 6m
name: h2o_feet
time                  water_level
----                  -----------
2015-09-18T21:24:00Z  5.013
2015-09-18T21:30:00Z  5.01

Use the now() function for relative time:

> SELECT "water_level" FROM "h2o_feet" WHERE time > now() - 1h

tag

  • tag: key-value pairs; within a database, tags together with the measurement form the index and participate in index creation.

Tags are therefore well suited as query filters.

  • Keep tag cardinality modest, and prefer tags with good discriminative power (similar principles to choosing MySQL index columns)
  • Tag values can only be strings
  • Tags are optional — a measurement without tags is valid — but strongly recommended, because tags are indexed; tags are analogous to indexed columns in SQL.

field (regular column)

  • field: the actual data, stored as key-value pairs
Data types
  • Four supported types: integer (long), string, boolean, float

  • In InfluxDB, at least one field is required

Note that fields are not indexed. Filtering on a field scans every matching field value, so performance is worse than filtering on a tag. By analogy, fields are like non-indexed columns in SQL.

Type casting
  • The SELECT clause supports the :: syntax for specifying a field's type and for basic casts.

  • Syntax

SELECT_clause <field_key>::<type> FROM_clause

Syntax description

  • type can be float, integer, string, or boolean

In most cases, if field_key holds no data of the specified type, no data is returned.
See "Casting" below for more detail.

  • Example
> SELECT "water_level"::float FROM "h2o_feet" LIMIT 4
name: h2o_feet
--------------
time                  water_level
2015-08-18T00:00:00Z  8.12
2015-08-18T00:00:00Z  2.064
2015-08-18T00:06:00Z  8.005
2015-08-18T00:06:00Z  2.116

The query returns the water_level field values that are floats.

  • Casting

The :: syntax lets users perform basic type casts in queries. Currently, only the following field-value casts are supported (e.g. in Aliyun TSDB for InfluxDB 2.0.3):

Integer/Long --> Float/Double
Float/Double --> Integer/Long

Any other conversion must be done in application code after the query.

For example, casting a float field value to a string is unsupported:

> SELECT "water_level"::string FROM "h2o_feet" LIMIT 4

Because float-to-string casts are unsupported, this query returns no data.

Notable functions

  • InfluxDB's functions fall into aggregations, selectors, transformations, and predictors.

Besides the basic operators found in ordinary databases, InfluxDB provides some distinctive functions that make statistics convenient; the common ones include:

  • Aggregations: FILL(), INTEGRAL(), SPREAD(), STDDEV(), MEAN(), MEDIAN(), etc.
  • Selectors: SAMPLE(), PERCENTILE(), FIRST(), LAST(), TOP(), BOTTOM(), etc.
  • Transformations: DERIVATIVE(), DIFFERENCE(), etc.
  • Predictors: HOLT_WINTERS().

Relationship: (Retention Policy) Duration != Shard Group Duration; data deletion

  • The retention duration has a minimum of 1 hour ("retention policy duration must be at least 1h0m0s").
  • If set to 0, data is kept forever (the official default RP); otherwise, expired data is purged.
  • InfluxDB offers no direct delete/update API for points; data is removed only via the RP
  • This design also speeds up time-range lookups
  • It realizes time-based partitioning, a crucial time-series property: almost every time-series query filters on time (e.g. the last hour or the last day), so partitioning lets the engine select only the relevant partitions and skip the rest.

Relationship: Shard = Cache + WAL + TSM files + Compactor

  • A shard consists of four main parts: the Cache, the WAL, TSM files, and the Compactor.

Relationship: retention policy, shard, shard group

  • "retention policy duration must be greater than the shard duration"
  • Example: within an RP, if the retention duration is 24 hours and each shard duration is 1 hour (i.e. each shard spans one hour), there are 24 one-hour shards in total; when the RP triggers, the shard covering the oldest time span is deleted.

So the shard groups hold 24 shards; each time the expiry point is reached, the oldest shard is deleted and a new one is created.

  • Explanation: the retention policy is a simple, efficient mechanism for purging expired data from InfluxDB; once a shard's data passes the expiry time, it is removed automatically, at the granularity of the shard group duration.
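The 24-hour RP / 1-hour shard example above can be sketched in Python. This is a simplification: InfluxDB actually keeps one additional, partially expired shard group until its end time falls outside the retention window:

```python
def shard_count(rp_duration_h: int, shard_duration_h: int) -> int:
    """Number of full shard time spans covered by the retention duration."""
    return rp_duration_h // shard_duration_h

def expired(shard_end_h_ago: float, rp_duration_h: int) -> bool:
    """A shard group is dropped once its *end* time is older than the RP duration."""
    return shard_end_h_ago > rp_duration_h

print(shard_count(24, 1))  # → 24
print(expired(25, 24))     # → True  (oldest shard is dropped)
print(expired(23.5, 24))   # → False (still within the retention window)
```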

Relationship: shard vs. measurement

In InfluxDB, shard and measurement are two key concepts.

  • A shard is the smallest unit of data storage in InfluxDB

Each shard is an independent set of files storing time-series data for one time range. InfluxDB automatically assigns incoming data to shards based on its write timestamp.

  • A measurement organizes data, analogous to a table in a relational database.

Think of it as one type of metric or event.
For example, CPU usage, memory usage, and network traffic are different measurements.

  • Cardinality
  • One shard can store data for many measurements.
  • One measurement's data can be spread across many shards.
  • Concretely, InfluxDB assigns points to shards by the following rules:
  • Write timestamp: each shard stores time-series data for one time range.
  • Measurement: points for the same measurement land in the same shard, which helps queries.
  • Roles of shard and measurement
  • Shards improve InfluxDB's storage performance and scalability: by spreading data across shards, InfluxDB can process write and query requests in parallel.
  • Measurements help users organize and manage data: classifying data by measurement makes it easier to locate.
  • Some best practices for shards and measurements in InfluxDB:
  • Use distinct measurements for distinct metrics or event types, for better organization and manageability.
  • For measurements with heavy write volume, consider a larger shard (group) duration; this reduces the number of shards and improves query performance.
  • Clean up expired shards regularly to reclaim storage and keep InfluxDB fast.

Relationship: shard vs. shard group

  • Time-series data is written into shards in time order; each shard stores part of InfluxDB's data as TSM files on physical disk, and every shard that holds data belongs to one shard group.

  • A shard group is the container for shards: all its shards logically belong to it, each shard in the group has an associated time span and expiry, and every shard group has a default time span called the shard group duration.

Architecture

  • httpd: all of InfluxDB's API requests are served externally through the httpd interface.

  • influxql: InfluxDB implements an internal SQL parser module that parses incoming statements during reads and writes.

  • metainfo: records all of InfluxDB's metadata and dumps it to a file; metadata includes database names, retention policies, shard groups, users, and so on.

  • index: the index over tags

  • retention: automatic purging of expired data.

  • tsdb: InfluxDB's core module — the storage-engine layer

The engine layer follows the general idea of an LSM tree; InfluxDB calls its variant the TSM tree. Introductions to LSM trees abound, so no detailed description is given here.

  • shard
  • InfluxDB partitions data storage by time; each partition is called a shard, and each shard has its own independent storage engine.

The authors note in the official docs that this is done to delete expired data quickly: once split into shards, deletion simply removes all of a shard's data files.

  • Deleting expired data is an important time-series-database feature — e.g. keeping only the last month or few months of performance data.

Architecture and Data Layout (Schema Design)

Every InfluxDB use case is unique, and your schema reflects that uniqueness. In general, a schema designed around your queries keeps queries simple and fast. The following design guidelines are recommended for most use cases:

Where to store data (tag or field)

  • Let your queries guide what you store in tags versus fields

  • Store commonly queried and grouped (group() or GROUP BY) metadata in tags.

  • Store data in fields if each data point carries a different value.

  • Store numeric values as fields (tag values only support strings).

Avoid too many series

  • InfluxDB indexes the following data elements to speed up reads
  • measurement
  • tags
  • Tag values are indexed; field values are not.

This means querying by tag is faster than querying by field. However, when too many indexes are created, both writes and reads can begin to slow down.

  • Each unique set of indexed data elements forms a series key
  • Tags containing highly variable information — unique IDs, hashes, random strings — produce a large number of series, also known as high series cardinality
  • High series cardinality is a primary driver of high memory usage in many database workloads.

To reduce memory consumption, consider storing high-cardinality values in field values rather than in tags or field keys.

Use the recommended naming conventions

  • When naming tag and field keys, follow these conventions
  • Avoid reserved keywords in tag and field keys
  • Avoid identical tag and field names
  • Avoid encoding data in measurement names and keys
  • Avoid packing more than one piece of information into a single tag

Avoid reserved keywords in tag and field keys

Not required, but avoiding reserved keywords in tag and field keys simplifies query writing because you do not have to wrap keys in double quotes. See the InfluxQL and Flux keywords to avoid.

Also, if a tag or field key contains characters other than [A-z,_], you must wrap it in double quotes in InfluxQL or use bracket notation in Flux.

Avoid identical tag and field names

  • Avoid using the same name for a tag key and a field key. Doing so typically leads to unexpected behavior when querying the data.

If you accidentally gave a tag and a field the same name, see the FAQ for how to query the data predictably and how to fix the situation.

Avoid encoding data in measurement names and keys

Compare schemas
  • Compare the following valid schemas, written in line protocol.
  • Recommended: the following schema stores metadata in the separate crop, plot, and region tags. The temp field holds the variable numeric data.
Good Measurements schema - Data encoded in tags (recommended)
-------------
weather_sensor,crop=blueberries,plot=1,region=north temp=50.1 1472515200000000000
weather_sensor,crop=blueberries,plot=2,region=midwest temp=49.8 1472515200000000000
  • Not recommended: the following schema concatenates multiple attributes (crop, plot, and region) into the measurement name (blueberries.plot-1.north), similar to Graphite metrics.
Bad Measurements schema - Data encoded in the measurement (not recommended)
-------------
blueberries.plot-1.north temp=50.1 1472515200000000000
blueberries.plot-2.midwest temp=49.8 1472515200000000000
  • Not recommended: the following schema concatenates multiple attributes (crop, plot, and region) into the field key.
Bad Keys schema - Data encoded in field keys (not recommended)
-------------
weather_sensor blueberries.plot-1.north.temp=50.1 1472515200000000000
weather_sensor blueberries.plot-2.midwest.temp=49.8 1472515200000000000
Compare queries

A Flux query computing the average temp for blueberries in the north region:

// Query *Good Measurements*, data stored in separate tag values (recommended)
from(bucket: "<database>/<retention_policy>")
  |> range(start: 2016-08-30T00:00:00Z)
  |> filter(fn: (r) => r._measurement == "weather_sensor" and r.region == "north" and r._field == "temp")
  |> mean()
  • Hard to query: the Bad Measurements schema requires a regular expression to extract plot and region from the measurement, as in the following example.
// Query *Bad Measurements*, data encoded in the measurement (not recommended)
from(bucket: "<database>/<retention_policy>")
  |> range(start: 2016-08-30T00:00:00Z)
  |> filter(fn: (r) => r._measurement =~ /.north$/ and r._field == "temp")
  |> mean()
  • Complex measurement names make some queries impossible.

For example, with the Bad Measurements schema it is impossible to compute the average temperature of both plots.

InfluxQL example queries
# Query *Bad Measurements*, data encoded in the measurement (not recommended)
> SELECT mean("temp") FROM /.north$/

# Query *Good Measurements*, data stored in separate tag values (recommended)
> SELECT mean("temp") FROM "weather_sensor" WHERE "region" = 'north'

Avoid putting more than one piece of information in one tag

  • Splitting a single tag that carries multiple pieces of information into separate tags simplifies your queries and improves performance by reducing the need for regular expressions.

Consider the following schemas, written in line protocol.

Example line-protocol schemas
Schema 1 - Multiple data encoded in a single tag
-------------
weather_sensor,crop=blueberries,location=plot-1.north temp=50.1 1472515200000000000
weather_sensor,crop=blueberries,location=plot-2.midwest temp=49.8 1472515200000000000

Schema 1 encodes multiple separate parameters (plot and region) into one long tag value (plot-1.north). Compare it with the following schema, also in line protocol.

Schema 2 - Data encoded in multiple tags
-------------
weather_sensor,crop=blueberries,plot=1,region=north temp=50.1 1472515200000000000
weather_sensor,crop=blueberries,plot=2,region=midwest temp=49.8 1472515200000000000
  • Compute the average temp for blueberries in the north region with Flux or InfluxQL. Schema 2 is preferable: with multiple tags, no regular expression is needed.
Flux example queries
// Schema 1 - Query for multiple data encoded in a single tag
from(bucket:"<database>/<retention_policy>")
  |> range(start: 2016-08-30T00:00:00Z)
  |> filter(fn: (r) => r._measurement == "weather_sensor" and r.location =~ /.north$/ and r._field == "temp")
  |> mean()

// Schema 2 - Query for data encoded in multiple tags
from(bucket:"<database>/<retention_policy>")
  |> range(start: 2016-08-30T00:00:00Z)
  |> filter(fn: (r) => r._measurement == "weather_sensor" and r.region == "north" and r._field == "temp")
  |> mean()
InfluxQL example queries
# Schema 1 - Query for multiple data encoded in a single tag
> SELECT mean("temp") FROM "weather_sensor" WHERE location =~ /.north$/

# Schema 2 - Query for data encoded in multiple tags
> SELECT mean("temp") FROM "weather_sensor" WHERE region = 'north'

Shard Group Duration Management

Shard group duration overview

  • InfluxDB stores data in shard groups.
  • Shard groups are organized by retention policy (RP) and store data whose timestamps fall within a specific interval called the shard group duration.
  • If no shard group duration is provided, it is determined by the RP duration when the RP is created. The defaults are:
RP duration               | Shard group duration
< 2 days                  | 1 hour
>= 2 days and <= 6 months | 1 day
> 6 months                | 7 days
  • The shard group duration is also configurable per RP; see retention-policy management.
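The defaults table above can be written as a small function (durations in hours; "6 months" is approximated here as 180 days, a simplification of InfluxDB's internal calculation):

```python
HOUR, DAY = 1, 24
MONTH = 30 * DAY  # approximation: 1 month ~ 30 days

def default_shard_group_duration(rp_duration_h: int) -> int:
    """Default shard group duration (hours) derived from the RP duration (hours)."""
    if rp_duration_h < 2 * DAY:
        return 1 * HOUR
    if rp_duration_h <= 6 * MONTH:
        return 1 * DAY
    return 7 * DAY

print(default_shard_group_duration(24))         # → 1   (RP < 2 days -> 1 hour)
print(default_shard_group_duration(30 * DAY))   # → 24  (2 days..6 months -> 1 day)
print(default_shard_group_duration(365 * DAY))  # → 168 (> 6 months -> 7 days)
```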

Shard group duration trade-offs

  • Choosing the optimal shard group duration means balancing:
  • the better overall performance of longer shards
  • the flexibility of shorter shards
Long shard group durations
  • Longer shard group durations let InfluxDB store more data in the same logical location. This reduces data duplication, improves compression efficiency, and in some cases speeds up queries.
Short shard group durations
  • Shorter shard group durations let the system drop data and record incremental backups more efficiently. When InfluxDB enforces an RP, it drops entire shard groups rather than individual points, even when some points are older than the RP duration. A shard group is dropped only once its end time is older than the RP duration.

For example, with an RP duration of one day, InfluxDB drops an hour's worth of data every hour and always keeps 25 shard groups: one for each hour of the day, plus one extra, partially expired group that is not removed until the entire shard group is older than 24 hours.

Note: a special use case to consider: filtering schema data (e.g. tags, series, measurements) by time. For example, if you want to filter schema data within one-hour intervals, the shard group duration must be set to 1h. See "filtering schema data by time" for more information.

Shard group duration recommendations

  • The default shard group durations work well for most cases. However, high-throughput or long-running instances benefit from longer shard group durations. Some recommendations for longer durations:
RP duration              | Shard group duration
<= 1 day                 | 6 hours
> 1 day and <= 7 days    | 1 day
> 7 days and <= 3 months | 7 days
> 3 months               | 30 days
infinite                 | 52 weeks or longer

Note: INF (infinite) is not a valid shard group duration. In extreme cases where data spans decades and is never deleted, a long shard group duration such as 1040w (20 years) is perfectly valid.

  • Other factors to consider before setting a shard group duration
  • A shard group should be twice as long as the longest time range of your most frequent queries
  • Each shard group should contain more than 100,000 points
  • Each shard group should contain more than 1,000 points per series
Shard group duration for backfilling
  • Bulk-inserting historical data covering a large time range immediately triggers the creation of a large number of shards.

The overhead of concurrently writing to hundreds or thousands of shards can quickly lead to degraded performance and memory exhaustion.

  • When writing historical data, we strongly recommend temporarily setting a longer shard group duration so that fewer shards are created

Typically, a 52-week shard group duration works well for backfilling.

2 Building from Source

3 Process Startup Flow

4 Configuration Loading Flow

5 HTTP Write Path

The main/key steps of InfluxDB's write path:

step 1, client: prepare the data

  • The client formats data into a format InfluxDB supports, such as Line Protocol or a Point object.
  • The data must include a measurement name, tags, fields, and a timestamp. For example:
measurement,tag_key=tag_value field_key=field_value timestamp

Or build a Point object through a client API (C# client shown):

var point = PointData.Measurement("temperature")
    .Tag("location", "workshop1")
    .Field("value", 25.6)
    .Timestamp(DateTime.UtcNow, WritePrecision.Ns);
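A hypothetical helper sketching how a line-protocol string is assembled from the parts above (real client libraries also escape commas, spaces, and quotes in names and values; that is omitted here for brevity):

```python
def to_line_protocol(measurement: str, tags: dict, fields: dict, ts_ns: int) -> str:
    """Build one line-protocol record: measurement,tag=... field=... timestamp."""
    tag_str = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))

    def fmt(v):
        # strings are double-quoted; ints get an 'i' suffix; floats stay bare
        if isinstance(v, str):
            return f'"{v}"'
        if isinstance(v, bool):
            return "true" if v else "false"
        if isinstance(v, int):
            return f"{v}i"
        return repr(v)

    field_str = ",".join(f"{k}={fmt(v)}" for k, v in sorted(fields.items()))
    return f"{measurement},{tag_str} {field_str} {ts_ns}"

line = to_line_protocol("temperature", {"location": "workshop1"},
                        {"value": 25.6}, 1472515200000000000)
print(line)  # → temperature,location=workshop1 value=25.6 1472515200000000000
```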

step 2, client/server: connect and authenticate

  • Client: connects to the InfluxDB server over HTTP/UDP etc., provides credentials (such as a token), and establishes a write session.
  • Server:
  • HTTP requests are handled by the httpd service, whose source lives under services/httpd and is implemented with Go's native net/http package.
  • The NewServer() function in cmd/influxd/run/server.go configures the httpd service; the Open function in cmd/influxd/run/server.go calls s.appendHTTPService(s.config.HTTPD) to load and configure the httpd service, then calls service.Open() to start it.
  • In the Handler.serveWrite() function:
  • It calls r.URL.Query().Get("db") to get the name of the target database specified in the write request, then calls h.MetaClient.Database(database) to check that the database exists; if not, an error is returned.
  • If authentication is enabled, it calls h.WriteAuthorizer.AuthorizeWrite(user.ID(), database).
  • It calls models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision")) to parse the time-series records in the HTTP request body.
  • It calls r.URL.Query().Get("consistency") to get the write consistency level specified in the request.
  • Finally, it calls h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points) to perform the actual write (see step 3 onward).

step 3, server: shard routing

  • InfluxDB routes the data to the corresponding shard based on its time range and tags.

The shard is the basic unit of data storage and handles write requests for a specific time range and data partition.

step 4, server: append to the WAL

  • Data is first appended to the WAL (Write-Ahead Log) file to guarantee durability and crash recovery.

The WAL records every write operation; even after a system crash, data not yet persisted can be recovered from the WAL.

step 5, server: write to the cache

  • Data is simultaneously written to the in-memory cache (e.g. the data cache or inverted-index cache).

The cache improves write performance and reduces disk I/O.

step 6, server: persist to disk

  • When the cache reaches a size or time threshold, data is flushed from the cache to disk, forming TSM (Time-Structured Merge Tree) files.

TSM files are InfluxDB's underlying on-disk format for time-series data, using columnar storage and compression to optimize reads and queries.

step 7, server: build the index

  • During the write, the inverted-index engine builds the inverted index that supports multi-dimensional queries.

The index records the mapping from tags to data points, accelerating tag-based filtering and queries.

step 8, server: return the write result

  • After the write completes, InfluxDB returns a success or failure response to the client.

  • Client: uses the response to determine whether the data was written successfully.

Section summary & caveats

  • InfluxDB 3.x introduces a storage/compute-separated architecture, persisting data to object storage (e.g. S3, local disk) or memory.
  • When writing data in bulk, prefer the batch APIs (such as WritePoints) to improve write throughput.
  • Write performance is affected by shard configuration, cache sizes, WAL settings, and other factors; tune them to your workload.

consistency: the write consistency level

  • The consistency parameter, passed by the client in the request, determines when a write to a shard with N replicas counts as successful.

  • If the client omits consistency, the server defaults to ConsistencyLevelOne: success is returned to the client as soon as one replica acknowledges the write.

  • Values of the consistency parameter:

  • all: success once every replica has written the point;
  • quorum: success once a majority of replicas have written it;
  • one: success once any single replica has written it;
  • any: success once any single replica has written it, or it has been buffered in the Hinted Handoff queue;
  • Example: 3 nodes, 2 replicas:
Level  | required
all    | 2
quorum | 2
one    | 1
any    | 1
// cluster/points_writer.go
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
    consistency models.ConsistencyLevel, points []models.Point) error {
    // The required number of writes to achieve the requested consistency level
    required := len(shard.Owners)
    switch consistency {
    case models.ConsistencyLevelAny, models.ConsistencyLevelOne:
        required = 1
    case models.ConsistencyLevelQuorum:
        required = required/2 + 1
    }
    ......
}
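The required-writes logic from writeToShard above, transcribed into Python to reproduce the table (assuming 2 replicas, i.e. len(shard.Owners) == 2):

```python
def required_writes(replicas: int, consistency: str) -> int:
    """Number of successful replica writes needed for the given consistency level."""
    if consistency in ("any", "one"):
        return 1
    if consistency == "quorum":
        return replicas // 2 + 1
    return replicas  # "all": every replica must acknowledge

for level in ("all", "quorum", "one", "any"):
    print(level, required_writes(2, level))
# prints: all 2 / quorum 2 / one 1 / any 1
```

Note that with 2 replicas, quorum (2//2 + 1 = 2) is equivalent to all; the levels only diverge at 3+ replicas.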


6 HTTP Query Path

Z Best Practices: Appendix

Appendix: JDBC-style wrapper | MyInfluxDbConnection

package com.xxx.datasource;

import com.xxx.datasource.entity.DataSource;
import okhttp3.OkHttpClient;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBException;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class InfluxDbConnection {
    private final static Logger log = LoggerFactory.getLogger(InfluxDbConnection.class);

    // database name
    private String database;
    // username
    private String username;
    // password
    private String password;
    // connection URL
    private String url;
    // data retention policy
    private String retentionPolicy;

    private int actions;

    private int flushDuration;

    // connection instance
    private InfluxDB influxDB;

    // time of the most recent health check on this InfluxDB connection
    private Long latestHealthCheckTime = -1L;
    private Boolean latestHealthCheckResult = true; // initial state: healthy

    public InfluxDbConnection(String database, String username, String password, String url, String retentionPolicy, int actions, int flushDuration) {
        this.database = database;
        this.username = username;
        this.password = password;
        this.url = url;
        this.retentionPolicy = retentionPolicy;
        this.actions = actions;
        this.flushDuration = flushDuration;
        createConnection(actions, flushDuration);
    }

    public InfluxDbConnection(String database, String username, String password, String url, String retentionPolicy) {
        this(database, username, password, url, retentionPolicy, DataSource.ACTIONS_DEFAULT, DataSource.FLUSH_DURATION_DEFAULT);
    }

    public InfluxDbConnection(DataSource dataSource) {
        this(
            dataSource.getDatabase(),
            dataSource.getUsername(), dataSource.getPassword(), dataSource.getUrl()
            , dataSource.getRetentionPolicy()
            , dataSource.getActions()
            , dataSource.getFlushDuration()
        );
    }

//    private void createConnection(){
//        influxDB = InfluxDBFactory.connect(url, username, password);
//        influxDB.setRetentionPolicy(retentionPolicy);
//        influxDB.enableBatch(10000, 1000, TimeUnit.MILLISECONDS);
//    }

    /**
     * Create a connection
     * @param actions number of points per batch write
     * @param flushDuration latest flush interval (unit: ms)
     */
    public static InfluxDB createConnection(
        String url, String username, String password
        , String database, String retentionPolicy
        , int actions, int flushDuration
    ){
        InfluxDB influxDB = null;
        //influxDB = InfluxDBFactory.connect(url, username, password);
        OkHttpClient.Builder httpClient = new OkHttpClient.Builder();
        //httpClient.connectTimeout(100, TimeUnit.SECONDS)
        //        .readTimeout(100, TimeUnit.SECONDS)
        //        .writeTimeout(100, TimeUnit.SECONDS);
        influxDB = InfluxDBFactory.connect(url, username, password, httpClient);

        influxDB.setDatabase(database);
        influxDB.setRetentionPolicy(retentionPolicy);

        influxDB.setConsistency(InfluxDB.ConsistencyLevel.ONE);
        influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);

        influxDB.enableBatch(actions, flushDuration, TimeUnit.MILLISECONDS);

        return influxDB;
    }

    private void createConnection(int actions, int flushDuration){
        if(this.influxDB == null){
            this.influxDB = createConnection(url, username, password, database, retentionPolicy, this.actions, this.flushDuration);
            this.health(-1, 1, false); // run a health check right after creation
        }
    }

    private void createConnection(){
        createConnection(this.actions, this.flushDuration);
    }

    // rebuild the connection
    private InfluxDB recreateConnection(){
        if(this.influxDB != null){
            this.influxDB.close(); // try closing the old connection first
            this.influxDB = null;  // clear it
        }
        this.influxDB = createConnection(url, username, password, database, retentionPolicy, this.actions, this.flushDuration);
        return influxDB;
    }

    /**
     * Write data to the database
     * @reference-doc
     *  [1] https://github.com/influxdata/influxdb-java/blob/influxdb-java-2.22/src/main/java/org/influxdb/impl/InfluxDBImpl.java
     *  [2] https://www.influxdata.com/blog/influxdb-java-client-performance-and-functionality-improvements/
     * @note
     *  1. [no batching] write(final String records): does not use InfluxDBImpl#enableBatch's batch-write feature
     *  2. [batching] write(final Point point): uses InfluxDBImpl#enableBatch's batch-write feature (execution may be deferred) [the approach used here]
     *  3. [batching] write(BatchPoints batchPoints): does not use InfluxDBImpl#enableBatch; instead each request carries its own batch of points (executed immediately on request)
     *      BatchPoints.Builder builder = BatchPoints.database(database);
     *      builder.points(points);
     *      BatchPoints batchPoints = builder.build();
     *      influxDB.write(batchPoints);
     *      For this approach see: {@link #insertBatch(Collection) }
     */
    public void insert(Point point) {
        this.influxDB.write(point);
    }

    public void insertBatch(Point point) {
        this.influxDB.write(point);
    }

    /**
     * Batch write
     * @note
     * Point.Builder pointBuilder = Point.measurement( measurement );
     * Point point = pointBuilder.build();
     * Point[] points = new Point[]{ point };
     * @param points
     */
    public void insertBatch(Point[] points){
        insertBatch(Arrays.asList(points));
    }

    public void insertBatch(Collection<Point> points){
        InfluxDB influxDB = this.influxDB.enableGzip();
        BatchPoints.Builder builder = BatchPoints.database(database); // static method; affects the whole batch
        builder.points(points);
        BatchPoints batchPoints = builder.build();
        influxDB.write(batchPoints);
    }

    // batch write in line-protocol format
    public void insertBatchAsLineProtocol(List<String> points){
        InfluxDB influxDBConnection = influxDB.enableGzip();
        influxDBConnection.write(points);
    }

    /**
     * Batch-write points after a health check
     * @param points
     * @param healthCheckMaxRetryCount
     * @param throwExceptionWhenWriteFail whether to throw on write failure
     * @param errorMessageWhenWriteFailTemplate error-log template on write failure (may be null)
     * @param errorMessageWhenWriteFailArgs error-log arguments on write failure (may be null)
     * @return whether the write succeeded (true: success, false: failure)
     */
    public Boolean insertBatchAfterHealthCheck(
        Collection<Point> points
        , Integer healthCheckIntervalMs
        , int healthCheckMaxRetryCount
        , Boolean throwExceptionWhenWriteFail
        , @Nullable String errorMessageWhenWriteFailTemplate
        , @Nullable Object ... errorMessageWhenWriteFailArgs
    ){
        Boolean health = this.health(healthCheckIntervalMs, healthCheckMaxRetryCount, true); // health check; retry up to healthCheckMaxRetryCount times, throwing if it still fails
        String errorMessage = null;
        try {
            this.insertBatch(points);
        } catch (InfluxDBException exception){
            // eg: org.influxdb.InfluxDBException$PointsBeyondRetentionPolicyException: partial write: points beyond retention policy dropped=1
            String errorMessageWhenWriteFail = (errorMessageWhenWriteFailTemplate != null) ? (String.format(errorMessageWhenWriteFailTemplate, errorMessageWhenWriteFailArgs)) : null;
            errorMessage = (errorMessageWhenWriteFail != null ? errorMessageWhenWriteFail + " " : "") + String.format(
                "Fail to write the points(size:%s) to influxdb when batch insert! health:%b, exception.isRetryWorth: %b, exception.message : %s"
                , points == null ? "null" : points.size(), health, exception.isRetryWorth(), exception.getMessage()
            );
            log.error(errorMessage + ", exception:", exception);
            if(throwExceptionWhenWriteFail) {
                throw new RuntimeException(errorMessage, exception);
            }
        }
        return errorMessage == null ? true : false;
    }

    /**
     * Batch-write points after a health check, retrying on write failure
     * @param maxRetryCount maximum retry count
     * @param retryIntervalMs retry interval (unit: ms)
     * @return
     */
    public Boolean insertBatchAfterHealthCheckWithRetry(
        Integer maxRetryCount, Integer retryIntervalMs
        , Collection<Point> points
        , Integer healthCheckIntervalMs
        , int healthCheckMaxRetryCount
        , Boolean throwExceptionWhenWriteFail
        , @Nullable String errorMessageWhenWriteFailTemplate
        , @Nullable Object ... errorMessageWhenWriteFailArgs
    ) throws InterruptedException {
        Boolean executeResult = false; // whether execution completed successfully
        Exception latestException = null;
        String latestErrorMessage = null; // error message of the most recent failure
        Integer retryCount = 0; // attempt counter
        while((!executeResult) && retryCount < maxRetryCount) { // keep trying
            // one attempt
            try {
                executeResult = this.insertBatchAfterHealthCheck(points, healthCheckIntervalMs, healthCheckMaxRetryCount, throwExceptionWhenWriteFail, errorMessageWhenWriteFailTemplate, errorMessageWhenWriteFailArgs);
            } catch (Exception exception) {
                latestErrorMessage = String.format(
                    "Batch write fail!exception.message:%s, retryCount:%d, maxRetryCount:%d, connectionInfo:%s",
                    exception.getMessage(), retryCount, maxRetryCount, toString()
                );
                latestException = exception;
                log.error(latestErrorMessage + ", exception :", exception);

                // keep retrying?
                if(exception.getMessage().contains("point time is expired")) { // expired points: do not retry, rethrow directly | Caused by: org.influxdb.InfluxDBException: partial write: point time is expired, compared with rp duration dropped=3
                    throw new RuntimeException(latestErrorMessage, latestException);
                } else { // retry
                    // Sleeping here is intentional: this method is designed to wait between failed batch writes before retrying.
                    Thread.sleep(retryIntervalMs);
                }
            }
            retryCount++;
        }

        if((!executeResult) && (retryCount >= maxRetryCount)){
            throw new RuntimeException(latestErrorMessage, latestException);
        }

        return executeResult;
    }

    public QueryResult query(String measurement){
        return influxDB.query(new Query("SELECT * FROM " + measurement, database));
    }

    public QueryResult queryResult(String sql){
        return influxDB.query(new Query(sql, database, true));
    }

    public void close(){
        if(this.influxDB != null){
            influxDB.close();
        }
    }

    /**
     * Health check
     * @return
     */
    public boolean health(InfluxDB influxDB){
        Pong ping = influxDB.ping();
        this.latestHealthCheckTime = System.currentTimeMillis();
        this.latestHealthCheckResult = ping.isGood();
        return this.latestHealthCheckResult;
    }

    public boolean health(){
        return health(influxDB);
    }

    /**
     * Health check
     * @note on failure, retry up to maxRetryCount times; if it still fails, throw
     * @param healthCheckIntervalMs health-check interval (unit: ms)
     *   if healthCheckIntervalMs == -1: ignore the interval and run the check immediately on every call
     *   else: honor the interval; if the check is not yet due on this call, return the most recent health-check result directly.
     * @param maxRetryCount maximum retry count
     * @return health-check result
     */
    public Boolean health(Integer healthCheckIntervalMs, Integer maxRetryCount, Boolean
throwExceptionWhenNotHealth){         InfluxDB influxDB = this.influxDB;         Boolean healthCheckResult = false;//检测结果是否为健康         if( healthCheckIntervalMs.equals( -1 )){//禁用 healthCheckIntervalMs 特性             Integer retryCount = 0; //连接检测的计数器             String latestErrorMessage = null;//最近一次健康检查的异常日志             while( (!healthCheckResult) && retryCount < maxRetryCount) {//循环尝试                 //判断连接是否断开,是则新建连接                 try {                     healthCheckResult = this.health(influxDB);                     if ( !healthCheckResult ) {                         influxDB = recreateConnection();//新建连接                         this.influxDB = influxDB; //更新当前 this.influxDB 屬性                     }                 } catch (Exception exception) {                     latestErrorMessage = String.format(                             "Health check fail!exception.message:%s, retryCount:%d, maxRetryCount:%d, connectionInfo:%s",                             exception.getMessage(), retryCount, maxRetryCount, toString()                     );                     log.error( latestErrorMessage + ", exception :", exception );                     if(throwExceptionWhenNotHealth) {                      }                 }                 retryCount++;             }             if( (!healthCheckResult) && throwExceptionWhenNotHealth ){                 throw new RuntimeException( "latestErrorMessage:" + latestErrorMessage );             }         } else {//启用 healthCheckIntervalMs 特性             Long currentTime = System.currentTimeMillis();             if( currentTime >= ( this.latestHealthCheckTime + healthCheckIntervalMs) ) {//达到健康检测的触发条件                 healthCheckResult = this.health( -1, maxRetryCount, throwExceptionWhenNotHealth);//立即进行健康检测             } else {                 healthCheckResult = this.latestHealthCheckResult;//直接返回上一次的健康检查结果             }         }         return healthCheckResult;     }      public Boolean health(Integer maxRetryCount){         
return health(-1, 3, true);     }      public Pong ping(){         Pong ping = influxDB.ping();         return ping;     }      public InfluxDB getInfluxDB() {         return influxDB;     }      public Long getLatestHealthCheckTime() {         return latestHealthCheckTime;     }      public Boolean getLatestHealthCheckResult() {         return latestHealthCheckResult;     }      @Override     public String toString() {         return "InfluxDbConnection{" +                 "database='" + database + ''' +                 ", username='" + username + ''' +                 ", password=[" + (password==null?"null" : password.length()) + "]" +                 ", url='" + url + ''' +                 ", retentionPolicy='" + retentionPolicy + ''' +                 ", actions=" + actions +                 ", flushDuration=" + flushDuration +                 ", influxDB=" + influxDB +                 ", latestHealthCheckResult=" + latestHealthCheckResult +                 ", latestHealthCheckTime=" + latestHealthCheckTime +                 '}';     } } 
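The retry logic in `insertBatchAfterHealthCheckWithRetry` boils down to a generic bounded-retry-with-fixed-delay pattern: try, classify the failure as fatal or transient, sleep, try again, and give up after N attempts. A minimal self-contained sketch of that pattern (class and parameter names here are illustrative, not part of influxdb-java):

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

public class BoundedRetry {
    /**
     * Runs the task up to maxAttempts times, sleeping retryIntervalMs between
     * failed attempts. Fatal errors (like "point time is expired" in the
     * connection helper above) are rethrown immediately instead of retried.
     */
    public static <T> T run(Callable<T> task, int maxAttempts, long retryIntervalMs,
                            Predicate<Exception> isFatal) throws Exception {
        Exception latest = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                latest = e;
                if (isFatal.test(e)) {
                    throw e; // not worth retrying
                }
                Thread.sleep(retryIntervalMs); // wait before the next attempt
            }
        }
        throw latest; // all attempts exhausted
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Fails twice, then succeeds: demonstrates the bounded retry.
        String result = run(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("transient failure");
            }
            return "ok";
        }, 5, 10, e -> e.getMessage().contains("point time is expired"));
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

The fatal-error predicate mirrors the helper's special case: writes rejected because the point is older than the retention policy will fail identically on every retry, so sleeping and retrying only wastes time.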

Appendix: influxdb-java | Point#concatenatedFields

  • Write-performance optimization: override the influxdb-java:2.22 {@link org.influxdb.dto.Point#concatenatedFields} method so that numeric fields no longer go through the format-to-string path (`NUMBER_FORMATTER.get().format(value)`);
package org.influxdb.dto;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.text.NumberFormat;
import java.time.Instant;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.TreeMap;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.influxdb.BuilderException;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;
import org.influxdb.annotation.TimeColumn;
import org.influxdb.impl.Preconditions;

/**
 * Representation of a InfluxDB database Point.
 *
 * @description overrides org/influxdb/dto/Point.java#concatenatedFields
 * @author stefan.majer [at] gmail.com
 */
public class Point {
    private String measurement;
    private Map<String, String> tags;
    private Number time;
    private TimeUnit precision = TimeUnit.NANOSECONDS;
    private Map<String, Object> fields;
    private static final int MAX_FRACTION_DIGITS = 340;
    private static final ThreadLocal<NumberFormat> NUMBER_FORMATTER =
            ThreadLocal.withInitial(() -> {
                NumberFormat numberFormat = NumberFormat.getInstance(Locale.ENGLISH);
                numberFormat.setMaximumFractionDigits(MAX_FRACTION_DIGITS);
                numberFormat.setGroupingUsed(false);
                numberFormat.setMinimumFractionDigits(1);
                return numberFormat;
            });

    private static final int DEFAULT_STRING_BUILDER_SIZE = 1024;
    private static final ThreadLocal<StringBuilder> CACHED_STRINGBUILDERS =
            ThreadLocal.withInitial(() -> new StringBuilder(DEFAULT_STRING_BUILDER_SIZE));

    Point() {
    }

    /**
     * Create a new Point Build build to create a new Point in a fluent manner.
     *
     * @param measurement the name of the measurement.
     * @return the Builder to be able to add further Builder calls.
     */
    public static Builder measurement(final String measurement) {
        return new Builder(measurement);
    }

    /**
     * Create a new Point Build build to create a new Point in a fluent manner from a POJO.
     *
     * @param clazz Class of the POJO
     * @return the Builder instance
     */
    public static Builder measurementByPOJO(final Class<?> clazz) {
        Objects.requireNonNull(clazz, "clazz");
        throwExceptionIfMissingAnnotation(clazz, Measurement.class);
        String measurementName = findMeasurementName(clazz);
        return new Builder(measurementName);
    }

    private static void throwExceptionIfMissingAnnotation(final Class<?> clazz,
                                                          final Class<? extends Annotation> expectedClass) {
        if (!clazz.isAnnotationPresent(expectedClass)) {
            throw new IllegalArgumentException("Class " + clazz.getName() + " is not annotated with @"
                    + Measurement.class.getSimpleName());
        }
    }

    /**
     * Builder for a new Point.
     *
     * @author stefan.majer [at] gmail.com
     */
    public static final class Builder {
        private static final BigInteger NANOSECONDS_PER_SECOND = BigInteger.valueOf(1000000000L);
        private final String measurement;
        private final Map<String, String> tags = new TreeMap<>();
        private Number time;
        private TimeUnit precision;
        private final Map<String, Object> fields = new TreeMap<>();

        /**
         * @param measurement
         */
        Builder(final String measurement) {
            this.measurement = measurement;
        }

        /**
         * Add a tag to this point.
         *
         * @param tagName the tag name
         * @param value the tag value
         * @return the Builder instance.
         */
        public Builder tag(final String tagName, final String value) {
            Objects.requireNonNull(tagName, "tagName");
            Objects.requireNonNull(value, "value");
            if (!tagName.isEmpty() && !value.isEmpty()) {
                tags.put(tagName, value);
            }
            return this;
        }

        /**
         * Add a Map of tags to add to this point.
         *
         * @param tagsToAdd the Map of tags to add
         * @return the Builder instance.
         */
        public Builder tag(final Map<String, String> tagsToAdd) {
            for (Entry<String, String> tag : tagsToAdd.entrySet()) {
                tag(tag.getKey(), tag.getValue());
            }
            return this;
        }

        /**
         * Add a field to this point.
         *
         * @param field the field name
         * @param value the value of this field
         * @return the Builder instance.
         */
        @SuppressWarnings("checkstyle:finalparameters")
        @Deprecated
        public Builder field(final String field, Object value) {
            if (value instanceof Number) {
                if (value instanceof Byte) {
                    value = ((Byte) value).doubleValue();
                } else if (value instanceof Short) {
                    value = ((Short) value).doubleValue();
                } else if (value instanceof Integer) {
                    value = ((Integer) value).doubleValue();
                } else if (value instanceof Long) {
                    value = ((Long) value).doubleValue();
                } else if (value instanceof BigInteger) {
                    value = ((BigInteger) value).doubleValue();
                }
            }
            fields.put(field, value);
            return this;
        }

        public Builder addField(final String field, final boolean value) {
            fields.put(field, value);
            return this;
        }

        public Builder addField(final String field, final long value) {
            fields.put(field, value);
            return this;
        }

        public Builder addField(final String field, final double value) {
            fields.put(field, value);
            return this;
        }

        public Builder addField(final String field, final int value) {
            fields.put(field, value);
            return this;
        }

        public Builder addField(final String field, final float value) {
            fields.put(field, value);
            return this;
        }

        public Builder addField(final String field, final short value) {
            fields.put(field, value);
            return this;
        }

        public Builder addField(final String field, final Number value) {
            fields.put(field, value);
            return this;
        }

        public Builder addField(final String field, final String value) {
            Objects.requireNonNull(value, "value");
            fields.put(field, value);
            return this;
        }

        /**
         * Add a Map of fields to this point.
         *
         * @param fieldsToAdd the fields to add
         * @return the Builder instance.
         */
        public Builder fields(final Map<String, Object> fieldsToAdd) {
            this.fields.putAll(fieldsToAdd);
            return this;
        }

        /**
         * Add a time to this point.
         *
         * @param timeToSet      the time for this point
         * @param precisionToSet the TimeUnit
         * @return the Builder instance.
         */
        public Builder time(final Number timeToSet, final TimeUnit precisionToSet) {
            Objects.requireNonNull(timeToSet, "timeToSet");
            Objects.requireNonNull(precisionToSet, "precisionToSet");
            this.time = timeToSet;
            this.precision = precisionToSet;
            return this;
        }

        /**
         * Add a time to this point as long.
         * only kept for binary compatibility with previous releases.
         *
         * @param timeToSet      the time for this point as long
         * @param precisionToSet the TimeUnit
         * @return the Builder instance.
         */
        public Builder time(final long timeToSet, final TimeUnit precisionToSet) {
            return time((Number) timeToSet, precisionToSet);
        }

        /**
         * Add a time to this point as Long.
         * only kept for binary compatibility with previous releases.
         *
         * @param timeToSet      the time for this point as Long
         * @param precisionToSet the TimeUnit
         * @return the Builder instance.
         */
        public Builder time(final Long timeToSet, final TimeUnit precisionToSet) {
            return time((Number) timeToSet, precisionToSet);
        }

        /**
         * Does this builder contain any fields?
         *
         * @return true, if the builder contains any fields, false otherwise.
         */
        public boolean hasFields() {
            return !fields.isEmpty();
        }

        /**
         * Adds field map from object by reflection using {@link org.influxdb.annotation.Column}
         * annotation.
         *
         * @param pojo POJO Object with annotation {@link org.influxdb.annotation.Column} on fields
         * @return the Builder instance
         */
        public Builder addFieldsFromPOJO(final Object pojo) {
            Class<? extends Object> clazz = pojo.getClass();
            while (clazz != null) {
                for (Field field : clazz.getDeclaredFields()) {
                    Column column = field.getAnnotation(Column.class);
                    if (column == null) {
                        continue;
                    }
                    field.setAccessible(true);
                    String fieldName = column.name();
                    addFieldByAttribute(pojo, field, column, fieldName);
                }
                clazz = clazz.getSuperclass();
            }

            if (this.fields.isEmpty()) {
                throw new BuilderException("Class " + pojo.getClass().getName()
                        + " has no @" + Column.class.getSimpleName() + " annotation");
            }

            return this;
        }

        private void addFieldByAttribute(final Object pojo, final Field field, final Column column,
                                         final String fieldName) {
            try {
                Object fieldValue = field.get(pojo);

                TimeColumn tc = field.getAnnotation(TimeColumn.class);
                if (tc != null && Instant.class.isAssignableFrom(field.getType())) {
                    Optional.ofNullable((Instant) fieldValue).ifPresent(instant -> {
                        TimeUnit timeUnit = tc.timeUnit();
                        if (timeUnit == TimeUnit.NANOSECONDS || timeUnit == TimeUnit.MICROSECONDS) {
                            this.time = BigInteger.valueOf(instant.getEpochSecond())
                                    .multiply(NANOSECONDS_PER_SECOND)
                                    .add(BigInteger.valueOf(instant.getNano()))
                                    .divide(BigInteger.valueOf(TimeUnit.NANOSECONDS.convert(1, timeUnit)));
                        } else {
                            this.time = timeUnit.convert(instant.toEpochMilli(), TimeUnit.MILLISECONDS);
                        }
                        this.precision = timeUnit;
                    });
                    return;
                }

                if (column.tag()) {
                    if (fieldValue != null) {
                        this.tags.put(fieldName, (String) fieldValue);
                    }
                } else {
                    if (fieldValue != null) {
                        this.fields.put(fieldName, fieldValue);
                    }
                }
            } catch (IllegalArgumentException | IllegalAccessException e) {
                // Can not happen since we use metadata got from the object
                throw new BuilderException(
                        "Field " + fieldName + " could not found on class " + pojo.getClass().getSimpleName());
            }
        }

        /**
         * Create a new Point.
         *
         * @return the newly created Point.
         */
        public Point build() {
            Preconditions.checkNonEmptyString(this.measurement, "measurement");
            Preconditions.checkPositiveNumber(this.fields.size(), "fields size");
            Point point = new Point();
            point.setFields(this.fields);
            point.setMeasurement(this.measurement);
            if (this.time != null) {
                point.setTime(this.time);
                point.setPrecision(this.precision);
            }
            point.setTags(this.tags);
            return point;
        }
    }

    /**
     * @param measurement the measurement to set
     */
    void setMeasurement(final String measurement) {
        this.measurement = measurement;
    }

    /**
     * @param time the time to set
     */
    void setTime(final Number time) {
        this.time = time;
    }

    /**
     * @param tags the tags to set
     */
    void setTags(final Map<String, String> tags) {
        this.tags = tags;
    }

    /**
     * @return the tags
     */
    Map<String, String> getTags() {
        return this.tags;
    }

    /**
     * @param precision the precision to set
     */
    void setPrecision(final TimeUnit precision) {
        this.precision = precision;
    }

    /**
     * @return the fields
     */
    Map<String, Object> getFields() {
        return this.fields;
    }

    /**
     * @param fields the fields to set
     */
    void setFields(final Map<String, Object> fields) {
        this.fields = fields;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Point point = (Point) o;
        return Objects.equals(measurement, point.measurement)
                && Objects.equals(tags, point.tags)
                && Objects.equals(time, point.time)
                && precision == point.precision
                && Objects.equals(fields, point.fields);
    }

    @Override
    public int hashCode() {
        return Objects.hash(measurement, tags, time, precision, fields);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Point [name=");
        builder.append(this.measurement);
        if (this.time != null) {
            builder.append(", time=");
            builder.append(this.time);
        }
        builder.append(", tags=");
        builder.append(this.tags);
        if (this.precision != null) {
            builder.append(", precision=");
            builder.append(this.precision);
        }
        builder.append(", fields=");
        builder.append(this.fields);
        builder.append("]");
        return builder.toString();
    }

    /**
     * Calculate the lineprotocol entry for a single Point.
     * <p>
     * NaN and infinity values are silently dropped as they are unsupported:
     * https://github.com/influxdata/influxdb/issues/4089
     *
     * @see <a href="https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_reference/">
     *     InfluxDB line protocol reference</a>
     *
     * @return the String without newLine, empty when there are no fields to write
     */
    public String lineProtocol() {
        return lineProtocol(null);
    }

    /**
     * Calculate the lineprotocol entry for a single point, using a specific {@link TimeUnit} for the timestamp.
     * <p>
     * NaN and infinity values are silently dropped as they are unsupported:
     * https://github.com/influxdata/influxdb/issues/4089
     *
     * @see <a href="https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_reference/">
     *     InfluxDB line protocol reference</a>
     *
     * @param precision the time precision unit for this point
     * @return the String without newLine, empty when there are no fields to write
     */
    public String lineProtocol(final TimeUnit precision) {
        // setLength(0) is used for reusing cached StringBuilder instance per thread
        // it reduces GC activity and performs better then new StringBuilder()
        StringBuilder sb = CACHED_STRINGBUILDERS.get();
        sb.setLength(0);

        escapeKey(sb, measurement);
        concatenatedTags(sb);
        int writtenFields = concatenatedFields(sb);
        if (writtenFields == 0) {
            return "";
        }
        formatedTime(sb, precision);

        return sb.toString();
    }

    private void concatenatedTags(final StringBuilder sb) {
        for (Entry<String, String> tag : this.tags.entrySet()) {
            sb.append(',');
            escapeKey(sb, tag.getKey());
            sb.append('=');
            escapeKey(sb, tag.getValue());
        }
        sb.append(' ');
    }

    /**
     * Build the fields segment.
     * @description
     *    1. Write-performance optimization: overrides {@link org.influxdb.dto.Point#concatenatedFields},
     *       dropping the format-to-string path (`NUMBER_FORMATTER.get().format(value)`) for numeric fields.
     * @param sb
     * @return
     */
    private int concatenatedFields(final StringBuilder sb) {
        int fieldCount = 0;
        for (Entry<String, Object> field : this.fields.entrySet()) {
            Object value = field.getValue();
            if (value == null || isNotFinite(value)) {
                continue;
            }
            escapeKey(sb, field.getKey());
            sb.append('=');
            if (value instanceof Number) {
                if (value instanceof Double || value instanceof Float || value instanceof BigDecimal) {
                    sb.append(value);
                    //sb.append(NUMBER_FORMATTER.get().format(value));
                } else {
                    sb.append(value).append('i');
                }
            } else if (value instanceof String) {
                String stringValue = (String) value;
                sb.append('"');
                escapeField(sb, stringValue);
                sb.append('"');
            } else {
                sb.append(value);
            }

            sb.append(',');

            fieldCount++;
        }

        // efficiently chop off the trailing comma
        int lengthMinusOne = sb.length() - 1;
        if (sb.charAt(lengthMinusOne) == ',') {
            sb.setLength(lengthMinusOne);
        }

        return fieldCount;
    }

    static void escapeKey(final StringBuilder sb, final String key) {
        for (int i = 0; i < key.length(); i++) {
            switch (key.charAt(i)) {
                case ' ':
                case ',':
                case '=':
                    sb.append('\\');
                    // intentional fall-through: append the escaped character itself
                default:
                    sb.append(key.charAt(i));
            }
        }
    }

    static void escapeField(final StringBuilder sb, final String field) {
        for (int i = 0; i < field.length(); i++) {
            switch (field.charAt(i)) {
                case '\\':
                case '"':
                    sb.append('\\');
                    // intentional fall-through: append the escaped character itself
                default:
                    sb.append(field.charAt(i));
            }
        }
    }

    private static boolean isNotFinite(final Object value) {
        return value instanceof Double && !Double.isFinite((Double) value)
                || value instanceof Float && !Float.isFinite((Float) value);
    }

    private void formatedTime(final StringBuilder sb, final TimeUnit precision) {
        if (this.time == null) {
            return;
        }
        TimeUnit converterPrecision = precision;

        if (converterPrecision == null) {
            converterPrecision = TimeUnit.NANOSECONDS;
        }
        if (this.time instanceof BigInteger) {
            BigInteger time = (BigInteger) this.time;
            long conversionFactor = converterPrecision.convert(1, this.precision);
            if (conversionFactor >= 1) {
                time = time.multiply(BigInteger.valueOf(conversionFactor));
            } else {
                conversionFactor = this.precision.convert(1, converterPrecision);
                time = time.divide(BigInteger.valueOf(conversionFactor));
            }
            sb.append(" ").append(time);
        } else if (this.time instanceof BigDecimal) {
            BigDecimal time = (BigDecimal) this.time;
            long conversionFactor = converterPrecision.convert(1, this.precision);
            if (conversionFactor >= 1) {
                time = time.multiply(BigDecimal.valueOf(conversionFactor));
            } else {
                conversionFactor = this.precision.convert(1, converterPrecision);
                time = time.divide(BigDecimal.valueOf(conversionFactor), RoundingMode.HALF_UP);
            }
            sb.append(" ").append(time.toBigInteger());
        } else {
            sb.append(" ").append(converterPrecision.convert(this.time.longValue(), this.precision));
        }
    }

    private static String findMeasurementName(final Class<?> clazz) {
        return clazz.getAnnotation(Measurement.class).name();
    }
}
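The effect of the change in `concatenatedFields` can be observed in isolation: `StringBuilder.append(double)` delegates to `Double.toString`, skipping the `ThreadLocal` `NumberFormat` entirely. Both outputs are valid line-protocol float literals, but they can differ in notation for large magnitudes (expanded vs scientific). A self-contained comparison (the class name and method names are illustrative; the formatter settings match the `NUMBER_FORMATTER` above):

```java
import java.text.NumberFormat;
import java.util.Locale;

public class FieldFormatting {
    // Same configuration as Point.NUMBER_FORMATTER above.
    private static final NumberFormat NUMBER_FORMATTER;
    static {
        NUMBER_FORMATTER = NumberFormat.getInstance(Locale.ENGLISH);
        NUMBER_FORMATTER.setMaximumFractionDigits(340);
        NUMBER_FORMATTER.setGroupingUsed(false);
        NUMBER_FORMATTER.setMinimumFractionDigits(1);
    }

    /** Original path: format-to-string through NumberFormat. */
    public static String viaFormatter(double value) {
        return NUMBER_FORMATTER.format(value);
    }

    /** Optimized path: StringBuilder.append(double), i.e. Double.toString. */
    public static String viaAppend(double value) {
        return new StringBuilder().append(value).toString();
    }

    public static void main(String[] args) {
        System.out.println(viaFormatter(25.5) + " vs " + viaAppend(25.5)); // identical: 25.5
        System.out.println(viaFormatter(1e10) + " vs " + viaAppend(1e10)); // 10000000000.0 vs 1.0E10
    }
}
```

The win is purely on the write path: `Double.toString` avoids the `NumberFormat` state machine and the `ThreadLocal` lookup per field, which adds up at high point rates. InfluxDB's line protocol accepts scientific notation for floats, so the notation difference for large values does not affect correctness.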

Appendix: influxdb-java | BatchProcessor

  • Modified the following influxdb-java:2.22 method to add elapsed-time logging for batch writes:
  • write()
    ==== which is called from:
  • put(final AbstractBatchEntry) : core scenario 1
  • BatchProcessor#constructor(...)#flushRunnable : core scenario 2
  • flush()
  • flushAndShutdown()
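The instrumentation described above amounts to wrapping the batch flush in a wall-clock measurement. A minimal stand-alone sketch of that pattern (names are illustrative; the real change lives inside `BatchProcessor#write()`):

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class TimedWrite {
    /** Runs the given batch-write action and returns the elapsed time in milliseconds. */
    public static <T> long timed(Consumer<List<T>> writeAction, List<T> batch) {
        long start = System.nanoTime();
        writeAction.accept(batch);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        // In the modified BatchProcessor, this value is emitted via the logger
        // together with the batch size, giving per-flush write-latency visibility.
        return elapsedMs;
    }

    public static void main(String[] args) {
        long ms = timed(batch -> {
            // stand-in for influxDB.write(batchPoints)
            try { Thread.sleep(5); } catch (InterruptedException ignored) { }
        }, List.of("p1", "p2"));
        System.out.println("wrote 2 points in " + ms + " ms");
    }
}
```

`System.nanoTime()` is used instead of `System.currentTimeMillis()` because it is monotonic and unaffected by clock adjustments, which matters when measuring short flush intervals.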
package org.influxdb.impl;

import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDB.ConsistencyLevel;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A BatchProcessor can be attached to a InfluxDB Instance to collect single point writes and
 * aggregates them to BatchPoints to get a better write performance.
 *
 * @author stefan.majer [at] gmail.com
 * @note
 *   1. Modified the following method to add elapsed-time logging for batch writes:
 *      write()
 *      ==== which is called from:
 *      put(final AbstractBatchEntry) : core scenario 1
 *      BatchProcessor#constructor(...)#flushRunnable : core scenario 2
 *      flush()
 *      flushAndShutdown()
 */
@Slf4j
public final class BatchProcessor {

    private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName());
    protected final BlockingQueue<AbstractBatchEntry> queue;
    private final ScheduledExecutorService scheduler;
    private final BiConsumer<Iterable<Point>, Throwable> exceptionHandler;
    final InfluxDB influxDB;
    final int actions;
    private final TimeUnit flushIntervalUnit;
    private final int flushInterval;
    private final ConsistencyLevel consistencyLevel;
    private final int jitterInterval;
    private final TimeUnit precision;
    private final BatchWriter batchWriter;
    private boolean dropActionsOnQueueExhaustion;
    Consumer<Point> droppedActionHandler;
    Supplier<Double> randomSupplier;

    /**
     * The Builder to create a BatchProcessor instance.
     */
    public static final class Builder {
        private final InfluxDB influxDB;
        private ThreadFactory threadFactory = Executors.defaultThreadFactory();
        private int actions;
        private TimeUnit flushIntervalUnit;
        private int flushInterval;
        private int jitterInterval;
        // this is a default value if the InfluxDb.enableBatch(BatchOptions) IS NOT used
        // the reason is backward compatibility
        private int bufferLimit = 0;
        private TimeUnit precision;

        private BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (entries, throwable) -> { };
        private ConsistencyLevel consistencyLevel;

        private boolean dropActionsOnQueueExhaustion;
        private Consumer<Point> droppedActionsHandler;

        /**
         * @param threadFactory is optional.
         * @return this Builder to use it fluent
         */
        public Builder threadFactory(final ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
            return this;
        }

        /**
         * @param influxDB is mandatory.
         */
        public Builder(final InfluxDB influxDB) {
            this.influxDB = influxDB;
        }

        /**
         * The number of actions after which a batchwrite must be performed.
         *
         * @param maxActions number of Points written after which a write must happen.
         * @return this Builder to use it fluent
         */
        public Builder actions(final int maxActions) {
            this.actions = maxActions;
            return this;
        }

        /**
         * The interval at which at least should issued a write.
         *
         * @param interval the interval
         * @param unit the TimeUnit of the interval
         * @return this Builder to use it fluent
         */
        public Builder interval(final int interval, final TimeUnit unit) {
            this.flushInterval = interval;
            this.flushIntervalUnit = unit;
            return this;
        }

        /**
         * The interval at which at least should issued a write.
         *
         * @param flushInterval the flush interval
         * @param jitterInterval the flush jitter interval
         * @param unit the TimeUnit of the interval
         * @return this Builder to use it fluent
         */
        public Builder interval(final int flushInterval, final int jitterInterval, final TimeUnit unit) {
            this.flushInterval = flushInterval;
            this.jitterInterval = jitterInterval;
            this.flushIntervalUnit = unit;
            return this;
        }

        /**
         * A buffer for failed writes so that the writes will be retried later on. When the buffer is full and
         * new points are written, oldest entries in the buffer are lost.
         *
         * @param bufferLimit maximum number of points stored in the buffer
         * @return this Builder to use it fluent
         */
        public Builder bufferLimit(final int bufferLimit) {
            this.bufferLimit = bufferLimit;
            return this;
        }

        /**
         * A callback to be used when an error occurs during a batchwrite.
         *
         * @param handler the handler
         * @return this Builder to use it fluent
         */
        public Builder exceptionHandler(final BiConsumer<Iterable<Point>, Throwable> handler) {
            this.exceptionHandler = handler;
            return this;
        }

        /**
         * To define the behaviour when the action queue exhausts. If unspecified, will default to false which means
         * that the {@link InfluxDB#write(Point)} will be blocked till the space in the queue is created.
         * true means that the newer actions being written to the queue will dropped and
         * {@link BatchProcessor#droppedActionHandler} will be called.
         *
         * @param dropActionsOnQueueExhaustion the dropActionsOnQueueExhaustion
         * @return this Builder to use it fluent
         */
        public Builder dropActionsOnQueueExhaustion(final boolean dropActionsOnQueueExhaustion) {
            this.dropActionsOnQueueExhaustion = dropActionsOnQueueExhaustion;
            return this;
        }

        /**
         * A callback to be used when an actions are dropped on action queue exhaustion.
         *
         * @param handler the handler
         * @return this Builder to use it fluent
         */
        public Builder droppedActionHandler(final Consumer<Point> handler) {
            this.droppedActionsHandler = handler;
            return this;
        }

        /**
         * Consistency level for batch write.
         *
         * @param consistencyLevel the consistencyLevel
         * @return this Builder to use it fluent
         */
        public Builder consistencyLevel(final ConsistencyLevel consistencyLevel) {
            this.consistencyLevel = consistencyLevel;
            return this;
        }

        /**
         * Set the time precision to use for the batch.
         *
         * @param precision the precision
         * @return this Builder to use it fluent
         */
        public Builder precision(final TimeUnit precision) {
            this.precision = precision;
            return this;
        }

        /**
         * Create the BatchProcessor.
         *
         * @return the BatchProcessor instance.
*/         public BatchProcessor build() {             Objects.requireNonNull(this.influxDB, "influxDB");             Preconditions.checkPositiveNumber(this.actions, "actions");             Preconditions.checkPositiveNumber(this.flushInterval, "flushInterval");             Preconditions.checkNotNegativeNumber(jitterInterval, "jitterInterval");             Preconditions.checkNotNegativeNumber(bufferLimit, "bufferLimit");             Objects.requireNonNull(this.flushIntervalUnit, "flushIntervalUnit");             Objects.requireNonNull(this.threadFactory, "threadFactory");             Objects.requireNonNull(this.exceptionHandler, "exceptionHandler");             BatchWriter batchWriter;             if (this.bufferLimit > this.actions) {                 batchWriter = new RetryCapableBatchWriter(this.influxDB, this.exceptionHandler, this.bufferLimit, this.actions);             } else {                 batchWriter = new OneShotBatchWriter(this.influxDB);             }             return new BatchProcessor(this.influxDB, batchWriter, this.threadFactory, this.actions, this.flushIntervalUnit,                     this.flushInterval, this.jitterInterval, exceptionHandler, this.consistencyLevel,                     this.precision, this.dropActionsOnQueueExhaustion, this.droppedActionsHandler);         }     }      abstract static class AbstractBatchEntry {         private final Point point;          public AbstractBatchEntry(final Point point) {             this.point = point;         }          public Point getPoint() {             return this.point;         }     }      static class HttpBatchEntry extends AbstractBatchEntry {         private final String db;         private final String rp;          public HttpBatchEntry(final Point point, final String db, final String rp) {             super(point);             this.db = db;             this.rp = rp;         }          public String getDb() {             return this.db;         }          public String getRp() {            
 return this.rp;         }     }      static class UdpBatchEntry extends AbstractBatchEntry {         private final int udpPort;          public UdpBatchEntry(final Point point, final int udpPort) {             super(point);             this.udpPort = udpPort;         }          public int getUdpPort() {             return this.udpPort;         }     }      /**      * Static method to create the Builder for this BatchProcessor.      *      * @param influxDB      *            the influxdb database handle.      * @return the Builder to create the BatchProcessor.      */     public static Builder builder(final InfluxDB influxDB) {         return new Builder(influxDB);     }      BatchProcessor(final InfluxDB influxDB, final BatchWriter batchWriter, final ThreadFactory threadFactory,                    final int actions, final TimeUnit flushIntervalUnit, final int flushInterval, final int jitterInterval,                    final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,                    final ConsistencyLevel consistencyLevel, final TimeUnit precision,                    final boolean dropActionsOnQueueExhaustion, final Consumer<Point> droppedActionHandler) {         super();         this.influxDB = influxDB;         this.batchWriter = batchWriter;         this.actions = actions;         this.flushIntervalUnit = flushIntervalUnit;         this.flushInterval = flushInterval;         this.jitterInterval = jitterInterval;         this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);         this.exceptionHandler = exceptionHandler;         this.consistencyLevel = consistencyLevel;         this.precision = precision;         this.dropActionsOnQueueExhaustion = dropActionsOnQueueExhaustion;         this.droppedActionHandler = droppedActionHandler;         if (actions > 1 && actions < Integer.MAX_VALUE) {             this.queue = new LinkedBlockingQueue<>(actions);         } else {             this.queue = new LinkedBlockingQueue<>();   
      }         this.randomSupplier = Math::random;          Runnable flushRunnable = new Runnable() {             @Override             public void run() {                 // write doesn't throw any exceptions                 write();                 int jitterInterval = (int) (randomSupplier.get() * BatchProcessor.this.jitterInterval);                 BatchProcessor.this.scheduler.schedule(this,                         BatchProcessor.this.flushInterval + jitterInterval, BatchProcessor.this.flushIntervalUnit);             }         };         // Flush at specified Rate         this.scheduler.schedule(flushRunnable,                 this.flushInterval + (int) (randomSupplier.get() * BatchProcessor.this.jitterInterval),                 this.flushIntervalUnit);     }      private static Long logBatchWriteTimeConsumingInterval = 5*60*1000*1000L;//单位:ns     private AtomicLong latestLogBatchWriteTimeConsumingTimestamp = new AtomicLong(logBatchWriteTimeConsumingInterval);//增加1个记录上一次打印日志的时间戳变量      void write() {         Long startTime = System.nanoTime();//1纳秒=0.00000 0001秒          List<Point> currentBatch = null;         try {             if (this.queue.isEmpty()) {                 BatchProcessor.this.batchWriter.write(Collections.emptyList());                 return;             }             //for batch on HTTP.             Map<String, BatchPoints> batchKeyToBatchPoints = new HashMap<>();             //for batch on UDP.             
Map<Integer, List<String>> udpPortToBatchPoints = new HashMap<>();             List<AbstractBatchEntry> batchEntries = new ArrayList<>(this.queue.size());             this.queue.drainTo(batchEntries);             currentBatch = new ArrayList<>(batchEntries.size());              for (AbstractBatchEntry batchEntry : batchEntries) {                 Point point = batchEntry.getPoint();                 currentBatch.add(point);                 if (batchEntry instanceof HttpBatchEntry) {                     HttpBatchEntry httpBatchEntry = HttpBatchEntry.class.cast(batchEntry);                     String dbName = httpBatchEntry.getDb();                     String rp = httpBatchEntry.getRp();                     String batchKey = dbName + "_" + rp;                     if (!batchKeyToBatchPoints.containsKey(batchKey)) {                         BatchPoints batchPoints = BatchPoints.database(dbName)                                 .retentionPolicy(rp).consistency(getConsistencyLevel())                                 .precision(getPrecision()).build();                         batchKeyToBatchPoints.put(batchKey, batchPoints);                     }                     batchKeyToBatchPoints.get(batchKey).point(point);                 } else if (batchEntry instanceof UdpBatchEntry) {                     UdpBatchEntry udpBatchEntry = UdpBatchEntry.class.cast(batchEntry);                     int udpPort = udpBatchEntry.getUdpPort();                     if (!udpPortToBatchPoints.containsKey(udpPort)) {                         List<String> batchPoints = new ArrayList<String>();                         udpPortToBatchPoints.put(udpPort, batchPoints);                     }                     udpPortToBatchPoints.get(udpPort).add(point.lineProtocol());                 }             }              BatchProcessor.this.batchWriter.write(batchKeyToBatchPoints.values());              for (Entry<Integer, List<String>> entry : udpPortToBatchPoints.entrySet()) {                 for (String 
lineprotocolStr : entry.getValue()) {                     BatchProcessor.this.influxDB.write(entry.getKey(), lineprotocolStr);                 }             }         } catch (Throwable t) {             // any exception wouldn't stop the scheduler             exceptionHandler.accept(currentBatch, t);             LOG.log(Level.SEVERE, "Batch could not be sent. Data will be lost", t);         }          Long endTime = System.nanoTime();         if( (endTime - latestLogBatchWriteTimeConsumingTimestamp.get() ) >= logBatchWriteTimeConsumingInterval ){             Long timDiffNs = (endTime-startTime);             log.info("Success to batch write points to tsdb by `BatchProcessor#write`!time-consuming:{}ns={}ms, actions:{}, queue.size:{}", timDiffNs, timDiffNs/1000000L, actions, queue.size());             latestLogBatchWriteTimeConsumingTimestamp.set( endTime );         }     }      /**      * Put a single BatchEntry to the cache for later processing.      *      * @param batchEntry      *            the batchEntry to write to the cache.      */     void put(final AbstractBatchEntry batchEntry) {         try {             if (this.dropActionsOnQueueExhaustion) {                 if (!this.queue.offer(batchEntry)) {                     this.droppedActionHandler.accept(batchEntry.getPoint());                     return;                 }             } else {                 this.queue.put(batchEntry);             }         } catch (InterruptedException e) {             throw new RuntimeException(e);         }         if (this.queue.size() >= this.actions) {             this.scheduler.submit(new Runnable() {                 @Override                 public void run() {                     write();                 }             });         }     }      /**      * Flush the current open writes to influxdb and end stop the reaper thread. This should only be      * called if no batch processing is needed anymore.      
*      */     void flushAndShutdown() {         this.write();         this.scheduler.shutdown();         this.batchWriter.close();     }      /**      * Flush the current open writes to InfluxDB. This will block until all pending points are written.      */     void flush() {         this.write();     }      public ConsistencyLevel getConsistencyLevel() {         return consistencyLevel;     }      public TimeUnit getPrecision() {         return precision;     }      BatchWriter getBatchWriter() {         return batchWriter;     }      public boolean isDropActionsOnQueueExhaustion() {         return dropActionsOnQueueExhaustion;     }      public Consumer<Point> getDroppedActionHandler() {         return droppedActionHandler;     } } 
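The elapsed-time logging added to `write()` above is throttled: a log line is emitted only when the configured minimum interval (intended as 5 minutes, measured with `System.nanoTime()`) has passed since the previous line, tracked in an `AtomicLong`. A minimal, self-contained sketch of just that throttling pattern (the class name `LogThrottle` is hypothetical, not part of influxdb-java):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the interval-based log throttle used in BatchProcessor#write():
// emit a log line at most once per `intervalNanos`.
final class LogThrottle {
    private final long intervalNanos;
    // Initialized to the interval itself, mirroring the field's initial value in BatchProcessor.
    private final AtomicLong lastLoggedNanos;

    LogThrottle(final long intervalNanos) {
        this.intervalNanos = intervalNanos;
        this.lastLoggedNanos = new AtomicLong(intervalNanos);
    }

    // Returns true if a log line should be emitted at time nowNanos, and records the emission.
    boolean shouldLog(final long nowNanos) {
        if (nowNanos - lastLoggedNanos.get() >= intervalNanos) {
            lastLoggedNanos.set(nowNanos);
            return true;
        }
        return false;
    }
}
```

Note that, like the original, this is a plain get-then-set rather than a `compareAndSet` loop, so two threads calling `write()` concurrently (e.g. the scheduler thread and a caller invoking `flush()`) could both log once in a race; for a diagnostics log this is usually acceptable.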

Appendix: influxdb-java | InfluxDBImpl#ping()

  • Rewrite the logic of influxdb-java 2.22's InfluxDBImpl#ping() method for compatibility with OpenGemini (v1.2.0)
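This compatibility tweak boils down to accepting either vendor's version header when parsing the `/ping` response: InfluxDB reports `X-Influxdb-Version`, OpenGemini reports `X-Geminidb-Version`. A minimal sketch of just that header-resolution logic, decoupled from OkHttp (the class and method names here are hypothetical, for illustration only):

```java
import java.util.List;
import java.util.Map;

// Sketch: resolve the server version from /ping response headers, accepting
// both InfluxDB's X-Influxdb-Version and OpenGemini's X-Geminidb-Version.
final class PingVersionResolver {

    // Mirrors the rewritten ping(): returns "headerName:value" for a recognized
    // header, or "unknown" when neither vendor header is present (so that a
    // health check based on the version string can then report failure).
    static String resolveVersion(final Map<String, List<String>> headers) {
        for (Map.Entry<String, List<String>> e : headers.entrySet()) {
            String name = e.getKey();
            if (name != null && ("X-Influxdb-Version".equalsIgnoreCase(name)
                    || "X-Geminidb-Version".equalsIgnoreCase(name))) {
                return name + ":" + e.getValue().get(0);
            }
        }
        return "unknown";
    }
}
```

Against an OpenGemini v1.2.0 server, this yields `"X-Geminidb-Version:v1.2.0"` instead of `"unknown"`, so version-based health checks keep working.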
package org.influxdb.impl;

import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import okhttp3.logging.HttpLoggingInterceptor;
import okhttp3.logging.HttpLoggingInterceptor.Level;
import okio.BufferedSource;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBException;
import org.influxdb.InfluxDBIOException;
import org.influxdb.dto.*;
import org.influxdb.impl.BatchProcessor.HttpBatchEntry;
import org.influxdb.impl.BatchProcessor.UdpBatchEntry;
import org.influxdb.msgpack.MessagePackConverterFactory;
import org.influxdb.msgpack.MessagePackTraverser;
import retrofit2.Call;
import retrofit2.Callback;
import retrofit2.Converter.Factory;
import retrofit2.Response;
import retrofit2.Retrofit;
import retrofit2.converter.moshi.MoshiConverterFactory;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Implementation of the InfluxDB API.
 * @note
 *  Rewrote the logic of {InfluxDBImpl#ping() } to stay compatible with OpenGemini
 * @author
 *  stefan.majer [at] gmail.com
 *  johnny zen
 * @reference-doc
 *  [1] https://github.com/influxdata/influxdb-java/tree/influxdb-java-2.22/src/main/java/org/influxdb
 *  [2] https://github.com/influxdata/influxdb-java/blob/influxdb-java-2.22/src/main/java/org/influxdb/impl/InfluxDBImpl.java
 */
@Slf4j
public class InfluxDBImpl implements InfluxDB {

    private static final String APPLICATION_MSGPACK = "application/x-msgpack";

    static final MediaType MEDIA_TYPE_STRING = MediaType.parse("text/plain");

    private static final String SHOW_DATABASE_COMMAND_ENCODED = Query.encode("SHOW DATABASES");

    /**
     * This static constant holds the http logging log level expected in DEBUG mode.
     * It is set by System property {@code org.influxdb.InfluxDB.logLevel}.
     *
     * @see InfluxDB#LOG_LEVEL_PROPERTY
     */
    private static final LogLevel LOG_LEVEL = LogLevel.parseLogLevel(System.getProperty(LOG_LEVEL_PROPERTY));

    private final String hostName;
    private String version;
    private final Retrofit retrofit;
    private final OkHttpClient client;
    private final InfluxDBService influxDBService;
    private BatchProcessor batchProcessor;
    private final AtomicBoolean batchEnabled = new AtomicBoolean(false);
    private final LongAdder writeCount = new LongAdder();
    private final LongAdder unBatchedCount = new LongAdder();
    private final LongAdder batchedCount = new LongAdder();
    private volatile DatagramSocket datagramSocket;
    private final HttpLoggingInterceptor loggingInterceptor;
    private final GzipRequestInterceptor gzipRequestInterceptor;
    private LogLevel logLevel = LogLevel.NONE;
    private String database;
    private String retentionPolicy = "autogen";
    private ConsistencyLevel consistency = ConsistencyLevel.ONE;
    private final boolean messagePack;
    private Boolean messagePackSupport;
    private final ChunkProccesor chunkProccesor;

    /**
     * Constructs a new {@code InfluxDBImpl}.
     *
     * @param url
     *          The InfluxDB server API URL
     * @param username
     *          The InfluxDB user name
     * @param password
     *          The InfluxDB user password
     * @param okHttpBuilder
     *          The OkHttp Client Builder
     * @param responseFormat
     *          The {@code ResponseFormat} to use for response from InfluxDB
     *          server
     */
    public InfluxDBImpl(final String url, final String username, final String password,
                        final OkHttpClient.Builder okHttpBuilder, final ResponseFormat responseFormat) {
        this(url, username, password, okHttpBuilder, new Retrofit.Builder(), responseFormat);
    }

    /**
     * Constructs a new {@code InfluxDBImpl}.
     *
     * @param url
     *          The InfluxDB server API URL
     * @param username
     *          The InfluxDB user name
     * @param password
     *          The InfluxDB user password
     * @param okHttpBuilder
     *          The OkHttp Client Builder
     * @param retrofitBuilder
     *          The Retrofit Builder
     * @param responseFormat
     *          The {@code ResponseFormat} to use for response from InfluxDB
     *          server
     */
    public InfluxDBImpl(final String url, final String username, final String password,
                        final OkHttpClient.Builder okHttpBuilder, final Retrofit.Builder retrofitBuilder,
                        final ResponseFormat responseFormat) {
        this.messagePack = ResponseFormat.MSGPACK.equals(responseFormat);
        this.hostName = parseHost(url);

        this.loggingInterceptor = new HttpLoggingInterceptor();
        setLogLevel(LOG_LEVEL);

        this.gzipRequestInterceptor = new GzipRequestInterceptor();
        OkHttpClient.Builder clonedOkHttpBuilder = okHttpBuilder.build().newBuilder()
                .addInterceptor(loggingInterceptor)
                .addInterceptor(gzipRequestInterceptor);
        if (username != null && password != null) {
            clonedOkHttpBuilder.addInterceptor(new BasicAuthInterceptor(username, password));
        }
        Factory converterFactory = null;
        switch (responseFormat) {
            case MSGPACK:
                clonedOkHttpBuilder.addInterceptor(chain -> {
                    Request request = chain.request().newBuilder().addHeader("Accept", APPLICATION_MSGPACK).build();
                    return chain.proceed(request);
                });

                converterFactory = MessagePackConverterFactory.create();
                chunkProccesor = new MessagePackChunkProccesor();
                break;
            case JSON:
            default:
                converterFactory = MoshiConverterFactory.create();

                Moshi moshi = new Moshi.Builder().build();
                JsonAdapter<QueryResult> adapter = moshi.adapter(QueryResult.class);
                chunkProccesor = new JSONChunkProccesor(adapter);
                break;
        }

        this.client = clonedOkHttpBuilder.build();
        Retrofit.Builder clonedRetrofitBuilder = retrofitBuilder.baseUrl(url).build().newBuilder();
        this.retrofit = clonedRetrofitBuilder.client(this.client)
                .addConverterFactory(converterFactory).build();
        this.influxDBService = this.retrofit.create(InfluxDBService.class);
    }

    public InfluxDBImpl(final String url, final String username, final String password,
                        final OkHttpClient.Builder client) {
        this(url, username, password, client, ResponseFormat.JSON);
    }

    InfluxDBImpl(final String url, final String username, final String password, final OkHttpClient.Builder client,
                 final InfluxDBService influxDBService, final JsonAdapter<QueryResult> adapter) {
        super();
        this.messagePack = false;
        this.hostName = parseHost(url);

        this.loggingInterceptor = new HttpLoggingInterceptor();
        setLogLevel(LOG_LEVEL);

        this.gzipRequestInterceptor = new GzipRequestInterceptor();
        OkHttpClient.Builder clonedBuilder = client.build().newBuilder()
                .addInterceptor(loggingInterceptor)
                .addInterceptor(gzipRequestInterceptor)
                .addInterceptor(new BasicAuthInterceptor(username, password));
        this.client = clonedBuilder.build();
        this.retrofit = new Retrofit.Builder().baseUrl(url)
                .client(this.client)
                .addConverterFactory(MoshiConverterFactory.create()).build();
        this.influxDBService = influxDBService;

        chunkProccesor = new JSONChunkProccesor(adapter);
    }

    public InfluxDBImpl(final String url, final String username, final String password,
                        final OkHttpClient.Builder client, final String database,
                        final String retentionPolicy, final ConsistencyLevel consistency) {
        this(url, username, password, client);

        setConsistency(consistency);
        setDatabase(database);
        setRetentionPolicy(retentionPolicy);
    }

    private String parseHost(final String url) {
        String hostName;
        try {
            URI uri = new URI(url);
            hostName = uri.getHost();
        } catch (URISyntaxException e1) {
            throw new IllegalArgumentException("Unable to parse url: " + url, e1);
        }

        if (hostName == null) {
            throw new IllegalArgumentException("Unable to parse url: " + url);
        }

        try {
            InetAddress.getByName(hostName);
        } catch (UnknownHostException e) {
            throw new InfluxDBIOException(e);
        }
        return hostName;
    }

    @Override
    public InfluxDB setLogLevel(final LogLevel logLevel) {
        switch (logLevel) {
            case NONE:
                this.loggingInterceptor.setLevel(Level.NONE);
                break;
            case BASIC:
                this.loggingInterceptor.setLevel(Level.BASIC);
                break;
            case HEADERS:
                this.loggingInterceptor.setLevel(Level.HEADERS);
                break;
            case FULL:
                this.loggingInterceptor.setLevel(Level.BODY);
                break;
            default:
                break;
        }
        this.logLevel = logLevel;
        return this;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public InfluxDB enableGzip() {
        this.gzipRequestInterceptor.enable();
        return this;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public InfluxDB disableGzip() {
        this.gzipRequestInterceptor.disable();
        return this;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isGzipEnabled() {
        return this.gzipRequestInterceptor.isEnabled();
    }

    @Override
    public InfluxDB enableBatch() {
        enableBatch(BatchOptions.DEFAULTS);
        return this;
    }

    @Override
    public InfluxDB enableBatch(final BatchOptions batchOptions) {
        if (this.batchEnabled.get()) {
            throw new IllegalStateException("BatchProcessing is already enabled.");
        }
        this.batchProcessor = BatchProcessor
                .builder(this)
                .actions(batchOptions.getActions())
                .exceptionHandler(batchOptions.getExceptionHandler())
                .interval(batchOptions.getFlushDuration(), batchOptions.getJitterDuration(), TimeUnit.MILLISECONDS)
                .threadFactory(batchOptions.getThreadFactory())
                .bufferLimit(batchOptions.getBufferLimit())
                .consistencyLevel(batchOptions.getConsistency())
                .precision(batchOptions.getPrecision())
                .dropActionsOnQueueExhaustion(batchOptions.isDropActionsOnQueueExhaustion())
                .droppedActionHandler(batchOptions.getDroppedActionHandler())
                .build();
        this.batchEnabled.set(true);
        return this;
    }

    @Override
    public InfluxDB enableBatch(final int actions, final int flushDuration,
                                final TimeUnit flushDurationTimeUnit) {
        enableBatch(actions, flushDuration, flushDurationTimeUnit, Executors.defaultThreadFactory());
        return this;
    }

    @Override
    public InfluxDB enableBatch(final int actions, final int flushDuration,
                                final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory) {
        enableBatch(actions, flushDuration, flushDurationTimeUnit, threadFactory, (points, throwable) -> { });
        return this;
    }

    @Override
    public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
                                final ThreadFactory threadFactory,
                                final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
                                final ConsistencyLevel consistency) {
        enableBatch(actions, flushDuration, flushDurationTimeUnit, threadFactory, exceptionHandler)
                .setConsistency(consistency);
        return this;
    }

    @Override
    public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
                                final ThreadFactory threadFactory,
                                final BiConsumer<Iterable<Point>, Throwable> exceptionHandler) {
        enableBatch(actions, flushDuration, 0, flushDurationTimeUnit, threadFactory, exceptionHandler, false, null);
        return this;
    }

    private InfluxDB enableBatch(final int actions, final int flushDuration, final int jitterDuration,
                                 final TimeUnit durationTimeUnit, final ThreadFactory threadFactory,
                                 final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
                                 final boolean dropActionsOnQueueExhaustion,
                                 final Consumer<Point> droppedActionHandler) {
        if (this.batchEnabled.get()) {
            throw new IllegalStateException("BatchProcessing is already enabled.");
        }
        this.batchProcessor = BatchProcessor
                .builder(this)
                .actions(actions)
                .exceptionHandler(exceptionHandler)
                .interval(flushDuration, jitterDuration, durationTimeUnit)
                .threadFactory(threadFactory)
                .consistencyLevel(consistency)
                .dropActionsOnQueueExhaustion(dropActionsOnQueueExhaustion)
                .droppedActionHandler(droppedActionHandler)
                .build();
        this.batchEnabled.set(true);
        return this;
    }

    @Override
    public void disableBatch() {
        this.batchEnabled.set(false);
        if (this.batchProcessor != null) {
            this.batchProcessor.flushAndShutdown();
        }
    }

    @Override
    public boolean isBatchEnabled() {
        return this.batchEnabled.get();
    }

// @demo OpenGemini
//    curl -v -XGET http://xx.yy.zz.kk:8086/ping
//    Note: Unnecessary use of -X or --request, GET is already inferred.
//    *   Trying xx.yy.zz.kk...
//    * TCP_NODELAY set
//    * Connected to xx.yy.zz.kk (xx.yy.zz.kk) port 8086 (#0)
//    > GET /ping HTTP/1.1
//    > Host: xx.yy.zz.kk:8086
//    > User-Agent: curl/7.55.1
//    > Accept: */*
//    >
//    < HTTP/1.1 204 No Content
//    < Content-Type: application/json
//    < Request-Id: 50754f89-56f8-11ef-818b-fa163e301ea5
//    < X-Geminidb-Build: OSS
//    < X-Geminidb-Version: v1.2.0
//    < X-Request-Id: 50754f89-56f8-11ef-818b-fa163e301ea5
//    < Date: Sat, 10 Aug 2024 09:09:57 GMT
//    <
//    * Connection #0 to host xx.yy.zz.kk left intact

// @demo InfluxDB
//    curl -v -XGET http://xx.yy.zz.mm:8635/ping
//    Note: Unnecessary use of -X or --request, GET is already inferred.
//    *   Trying xx.yy.zz.mm...
//    * TCP_NODELAY set
//    * Connected to xx.yy.zz.mm (xx.yy.zz.mm) port 8635 (#0)
//    > GET /ping HTTP/1.1
//    > Host: xx.yy.zz.mm:8635
//    > User-Agent: curl/7.55.1
//    > Accept: */*
//    >
//    < HTTP/1.1 204 No Content
//    < Content-Type: application/json
//    < Request-Id: 736a0d01-56f8-11ef-a859-fa163e42c8ae
//    < X-Influxdb-Build: OSS
//    < X-Influxdb-Version: 1.7.4
//    < X-Request-Id: 736a0d01-56f8-11ef-a859-fa163e42c8ae
//    < Date: Sat, 10 Aug 2024 09:10:55 GMT
//    <
//    * Connection #0 to host xx.yy.zz.mm left intact

    /**
     * ping
     * @note OpenGemini & InfluxDB
     * Core use: health checking.
     *
     * OpenGemini reports its version via the X-Geminidb-Version response header, which differs entirely from
     * InfluxDB's X-Influxdb-Version, so the stock client cannot detect the version field. For compatibility:
     * 1. Rewrote {@link InfluxDBImpl#ping() } to accept either header.
     * 2. If interested, note that ping.isGood() (which essentially checks version != "unknown") can then be
     *    used as the health-check predicate.
     * @reference-doc
     *  [1] {@link InfluxDBImpl#ping() }
     *  [2] {@link InfluxDBImpl#InfluxDBImpl(String, String, String, OkHttpClient.Builder, Retrofit.Builder, ResponseFormat) }
     * @return
     */
    @Override
    public Pong ping() {
        final long started = System.currentTimeMillis();
        Call<ResponseBody> call = this.influxDBService.ping();
        try {
            Response<ResponseBody> response = call.execute();
            Headers headers = response.headers();
            String version = "unknown";
            for (String name : headers.toMultimap().keySet()) {
                //if (null != name && "X-Influxdb-Version".equalsIgnoreCase(name)) {
                if (null != name && ("X-Influxdb-Version".equalsIgnoreCase(name)
                        || "X-Geminidb-Version".equalsIgnoreCase(name))) {
                    //version = headers.get(name);
                    version = name + ":" + headers.get(name);
                    break;
                }
            }
            Pong pong = new Pong();
            pong.setVersion(version);
            pong.setResponseTime(System.currentTimeMillis() - started);
            return pong;
        } catch (IOException e) {
            throw new InfluxDBIOException(e);
        }
    }

    @Override
    public String version() {
        if (version == null) {
            this.version = ping().getVersion();
        }
        return this.version;
    }

    @Override
    public void write(final Point point) {
        write(database, retentionPolicy, point);
    }

    @Override
    public void write(final String records) {
        write(database, retentionPolicy, consistency, records);
    }

    @Override
    public void write(final List<String> records) {
        write(database, retentionPolicy, consistency, records);
    }

    @Override
    public void write(final String database, final String retentionPolicy, final Point point) {
        if (this.batchEnabled.get()) {
            HttpBatchEntry batchEntry = new HttpBatchEntry(point, database, retentionPolicy);
            this.batchProcessor.put(batchEntry);
        } else {
            BatchPoints batchPoints = BatchPoints.database(database)
                    .retentionPolicy(retentionPolicy).build();
            batchPoints.point(point);
            this.write(batchPoints);
            this.unBatchedCount.increment();
        }
        this.writeCount.increment();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void write(final int udpPort, final Point point) {
        if (this.batchEnabled.get()) {
            UdpBatchEntry batchEntry = new UdpBatchEntry(point, udpPort);
            this.batchProcessor.put(batchEntry);
        } else {
            this.write(udpPort, point.lineProtocol());
            this.unBatchedCount.increment();
        }
        this.writeCount.increment();
    }

    @Override
    public void write(final BatchPoints batchPoints) {
        this.batchedCount.add(batchPoints.getPoints().size());
        //Long currentTime = System.currentTimeMillis();
        String lineProtocolStr = batchPoints.lineProtocol();
        RequestBody lineProtocol = RequestBody.create(MEDIA_TYPE_STRING, lineProtocolStr);
        //Long timeConsuming = System.currentTimeMillis() - currentTime;
        //log.info("batchPoints to lineProtocol | time-consuming: {}ms, lineProtocol.length:{}", timeConsuming, lineProtocolStr.length());
        String db = batchPoints.getDatabase();
        if (db == null) {
            db = this.database;
        }
        execute(this.influxDBService.writePoints(
                db,
                batchPoints.getRetentionPolicy(),
                TimeUtil.toTimePrecision(batchPoints.getPrecision()),
                batchPoints.getConsistency().value(),
                lineProtocol
            )
        );
    }

    @Override
    public void writeWithRetry(final BatchPoints batchPoints) {
        if (isBatchEnabled()) {
            batchProcessor.getBatchWriter().write(Collections.singleton(batchPoints));
        } else {
            write(batchPoints);
        }
    }

    @Override
    public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
                      final TimeUnit precision, final String records) {
        RequestBody requestBody = RequestBody.create(MEDIA_TYPE_STRING, records);

        execute(
            this.influxDBService.writePoints(
                database,
                retentionPolicy,
                TimeUtil.toTimePrecision(precision),
                consistency.value(),
                requestBody
            )
        );
    }

    @Override
    public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
                      final String records) {
        write(database, retentionPolicy, consistency, TimeUnit.NANOSECONDS, records);
    }

    @Override
    public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
                      final List<String> records) {
        write(database, retentionPolicy, consistency, TimeUnit.NANOSECONDS, records);
    }

    @Override
    public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
                      final TimeUnit precision, final List<String> records) {
        write(database, retentionPolicy, consistency, precision, String.join("\n", records));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void write(final int udpPort, final String records) {
        initialDatagramSocket();
        byte[] bytes = records.getBytes(StandardCharsets.UTF_8);
        try {
            datagramSocket.send(new DatagramPacket(bytes, bytes.length, new InetSocketAddress(hostName, udpPort)));
        } catch (IOException e) {
            throw new InfluxDBIOException(e);
        }
    }

    private void initialDatagramSocket() {
        if (datagramSocket == null) {
            synchronized (InfluxDBImpl.class) {
                if (datagramSocket == null) {
                    try {
                        datagramSocket = new DatagramSocket();
                    } catch (SocketException e) {
                        throw new InfluxDBIOException(e);
                    }
                }
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void write(final int udpPort, final List<String> records) {
        write(udpPort, String.join("\n", records));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public QueryResult query(final Query query) {
        return executeQuery(callQuery(query));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void query(final Query query, final Consumer<QueryResult> onSuccess, final Consumer<Throwable> onFailure) {
        final Call<QueryResult> call = callQuery(query);
        call.enqueue(new Callback<QueryResult>() {
            @Override
            public void onResponse(final
Call<QueryResult> call, final Response<QueryResult> response) {                 if (response.isSuccessful()) {                     onSuccess.accept(response.body());                 } else {                     Throwable t = null;                     String errorBody = null;                      try {                         if (response.errorBody() != null) {                             errorBody = response.errorBody().string();                         }                     } catch (IOException e) {                         t = e;                     }                      if (t != null) {                         onFailure.accept(new InfluxDBException(response.message(), t));                     } else if (errorBody != null) {                         onFailure.accept(new InfluxDBException(response.message() + " - " + errorBody));                     } else {                         onFailure.accept(new InfluxDBException(response.message()));                     }                 }             }              @Override             public void onFailure(final Call<QueryResult> call, final Throwable throwable) {                 onFailure.accept(throwable);             }         });     }      /**      * {@inheritDoc}      */     @Override     public void query(final Query query, final int chunkSize, final Consumer<QueryResult> onNext) {         query(query, chunkSize, onNext, () -> { });     }      /**      * {@inheritDoc}      */     @Override     public void query(final Query query, final int chunkSize, final BiConsumer<Cancellable, QueryResult> onNext) {         query(query, chunkSize, onNext, () -> { });     }      /**      * {@inheritDoc}      */     @Override     public void query(final Query query, final int chunkSize, final Consumer<QueryResult> onNext,                       final Runnable onComplete) {         query(query, chunkSize, (cancellable, queryResult) -> onNext.accept(queryResult), onComplete);     }      @Override     public void query(final Query 
query, final int chunkSize, final BiConsumer<Cancellable, QueryResult> onNext,                       final Runnable onComplete) {         query(query, chunkSize, onNext, onComplete, null);     }      /**      * {@inheritDoc}      */     @Override     public void query(final Query query, final int chunkSize, final BiConsumer<Cancellable, QueryResult> onNext,                       final Runnable onComplete, final Consumer<Throwable> onFailure) {         Call<ResponseBody> call;         if (query instanceof BoundParameterQuery) {             BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;             call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), chunkSize,                     boundParameterQuery.getParameterJsonWithUrlEncoded());         } else {             if (query.requiresPost()) {                 call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), chunkSize, null);             } else {                 call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), chunkSize);             }         }          call.enqueue(new Callback<ResponseBody>() {             @Override             public void onResponse(final Call<ResponseBody> call, final Response<ResponseBody> response) {                  Cancellable cancellable = new Cancellable() {                     @Override                     public void cancel() {                         call.cancel();                     }                      @Override                     public boolean isCanceled() {                         return call.isCanceled();                     }                 };                  try {                     if (response.isSuccessful()) {                         ResponseBody chunkedBody = response.body();                         chunkProccesor.process(chunkedBody, cancellable, onNext, onComplete);                     } else {                         // REVIEW: must 
be handled consistently with IOException.                         ResponseBody errorBody = response.errorBody();                         if (errorBody != null) {                             InfluxDBException influxDBException = new InfluxDBException(errorBody.string());                             if (onFailure == null) {                                 throw influxDBException;                             } else {                                 onFailure.accept(influxDBException);                             }                         }                     }                 } catch (IOException e) {                     QueryResult queryResult = new QueryResult();                     queryResult.setError(e.toString());                     onNext.accept(cancellable, queryResult);                     //passing null onFailure consumer is here for backward compatibility                     //where the empty queryResult containing error is propagating into onNext consumer                     if (onFailure != null) {                         onFailure.accept(e);                     }                 } catch (Exception e) {                     call.cancel();                     if (onFailure != null) {                         onFailure.accept(e);                     }                 }              }              @Override             public void onFailure(final Call<ResponseBody> call, final Throwable t) {                 if (onFailure == null) {                     throw new InfluxDBException(t);                 } else {                     onFailure.accept(t);                 }             }         });     }      /**      * {@inheritDoc}      */     @Override     public QueryResult query(final Query query, final TimeUnit timeUnit) {         Call<QueryResult> call;         if (query instanceof BoundParameterQuery) {             BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;             call = this.influxDBService.query(getDatabase(query),          
           TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded(),                     boundParameterQuery.getParameterJsonWithUrlEncoded());         } else {             if (query.requiresPost()) {                 call = this.influxDBService.query(getDatabase(query),                         TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded(), null);             } else {                 call = this.influxDBService.query(getDatabase(query),                         TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded());             }         }         return executeQuery(call);     }      /**      * {@inheritDoc}      */     @Override     public void createDatabase(final String name) {         Preconditions.checkNonEmptyString(name, "name");         String createDatabaseQueryString = String.format("CREATE DATABASE "%s"", name);         executeQuery(this.influxDBService.postQuery(Query.encode(createDatabaseQueryString)));     }      /**      * {@inheritDoc}      */     @Override     public void deleteDatabase(final String name) {         executeQuery(this.influxDBService.postQuery(Query.encode("DROP DATABASE "" + name + """)));     }      /**      * {@inheritDoc}      */     @Override     public List<String> describeDatabases() {         QueryResult result = executeQuery(this.influxDBService.postQuery(SHOW_DATABASE_COMMAND_ENCODED));         // {"results":[{"series":[{"name":"databases","columns":["name"],"values":[["mydb"]]}]}]}         // Series [name=databases, columns=[name], values=[[mydb], [unittest_1433605300968]]]         List<List<Object>> databaseNames = result.getResults().get(0).getSeries().get(0).getValues();         List<String> databases = new ArrayList<>();         if (databaseNames != null) {             for (List<Object> database : databaseNames) {                 databases.add(database.get(0).toString());             }         }         return databases;     }      /**      * {@inheritDoc}      */     
@Override     public boolean databaseExists(final String name) {         List<String> databases = this.describeDatabases();         for (String databaseName : databases) {             if (databaseName.trim().equals(name)) {                 return true;             }         }         return false;     }      /**      * Calls the influxDBService for the query.      */     private Call<QueryResult> callQuery(final Query query) {         Call<QueryResult> call;         if (query instanceof BoundParameterQuery) {             BoundParameterQuery boundParameterQuery = (BoundParameterQuery) query;             call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(),                     boundParameterQuery.getParameterJsonWithUrlEncoded());         } else {             if (query.requiresPost()) {                 call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded());             } else {                 call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded());             }         }         return call;     }      static class ErrorMessage {         public String error;     }      private boolean checkMessagePackSupport() {         Matcher matcher = Pattern.compile("(\d+\.*)+").matcher(version());         if (!matcher.find()) {             return false;         }         String s = matcher.group();         String[] versionNumbers = s.split("\.");         final int major = Integer.parseInt(versionNumbers[0]);         final int minor = Integer.parseInt(versionNumbers[1]);         final int fromMinor = 4;         return (major >= 2) || ((major == 1) && (minor >= fromMinor));     }      private QueryResult executeQuery(final Call<QueryResult> call) {         if (messagePack) {             if (messagePackSupport == null) {                 messagePackSupport = checkMessagePackSupport();             }              if (!messagePackSupport) {                 throw new 
UnsupportedOperationException(                         "MessagePack format is only supported from InfluxDB version 1.4 and later");             }         }         return execute(call);     }      private <T> T execute(final Call<T> call) {         try {             Response<T> response = call.execute();             if (response.isSuccessful()) {                 return response.body();             }             try (ResponseBody errorBody = response.errorBody()) {                 if (messagePack) {                     throw InfluxDBException.buildExceptionForErrorState(errorBody.byteStream());                 } else {                     throw InfluxDBException.buildExceptionForErrorState(errorBody.string());                 }             }         } catch (IOException e) {             throw new InfluxDBIOException(e);         }     }      /**      * {@inheritDoc}      */     @Override     public void flush() {         if (!batchEnabled.get()) {             throw new IllegalStateException("BatchProcessing is not enabled.");         }         batchProcessor.flush();     }      /**      * {@inheritDoc}      */     @Override     public void close() {         try {             this.disableBatch();         } finally {             if (datagramSocket != null && !datagramSocket.isClosed()) {                 datagramSocket.close();             }         }         this.client.dispatcher().executorService().shutdown();         this.client.connectionPool().evictAll();     }      @Override     public InfluxDB setConsistency(final ConsistencyLevel consistency) {         this.consistency = consistency;         return this;     }      @Override     public InfluxDB setDatabase(final String database) {         this.database = database;         return this;     }      @Override     public InfluxDB setRetentionPolicy(final String retentionPolicy) {         this.retentionPolicy = retentionPolicy;         return this;     }      /**      * {@inheritDoc}      */     @Override     public 
void createRetentionPolicy(final String rpName, final String database, final String duration,                                       final String shardDuration, final int replicationFactor, final boolean isDefault) {         Preconditions.checkNonEmptyString(rpName, "retentionPolicyName");         Preconditions.checkNonEmptyString(database, "database");         Preconditions.checkNonEmptyString(duration, "retentionDuration");         Preconditions.checkDuration(duration, "retentionDuration");         if (shardDuration != null && !shardDuration.isEmpty()) {             Preconditions.checkDuration(shardDuration, "shardDuration");         }         Preconditions.checkPositiveNumber(replicationFactor, "replicationFactor");          StringBuilder queryBuilder = new StringBuilder("CREATE RETENTION POLICY "");         queryBuilder.append(rpName)                 .append("" ON "")                 .append(database)                 .append("" DURATION ")                 .append(duration)                 .append(" REPLICATION ")                 .append(replicationFactor);         if (shardDuration != null && !shardDuration.isEmpty()) {             queryBuilder.append(" SHARD DURATION ");             queryBuilder.append(shardDuration);         }         if (isDefault) {             queryBuilder.append(" DEFAULT");         }         executeQuery(this.influxDBService.postQuery(Query.encode(queryBuilder.toString())));     }      /**      * {@inheritDoc}      */     @Override     public void createRetentionPolicy(final String rpName, final String database, final String duration,                                       final int replicationFactor, final boolean isDefault) {         createRetentionPolicy(rpName, database, duration, null, replicationFactor, isDefault);     }      /**      * {@inheritDoc}      */     @Override     public void createRetentionPolicy(final String rpName, final String database, final String duration,                                       final String 
shardDuration, final int replicationFactor) {         createRetentionPolicy(rpName, database, duration, null, replicationFactor, false);     }      /**      * {@inheritDoc}      * @param rpName the name of the retentionPolicy      * @param database the name of the database      */     @Override     public void dropRetentionPolicy(final String rpName, final String database) {         Preconditions.checkNonEmptyString(rpName, "retentionPolicyName");         Preconditions.checkNonEmptyString(database, "database");         StringBuilder queryBuilder = new StringBuilder("DROP RETENTION POLICY "");         queryBuilder.append(rpName)                 .append("" ON "")                 .append(database)                 .append(""");         executeQuery(this.influxDBService.postQuery(Query.encode(queryBuilder.toString())));     }      private String getDatabase(final Query query) {         String db = query.getDatabase();         if (db == null) {             return this.database;         }         return db;     }      private interface ChunkProccesor {         void process(ResponseBody chunkedBody, Cancellable cancellable,                      BiConsumer<Cancellable, QueryResult> consumer, Runnable onComplete) throws IOException;     }      private class MessagePackChunkProccesor implements ChunkProccesor {         @Override         public void process(final ResponseBody chunkedBody, final Cancellable cancellable,                             final BiConsumer<Cancellable, QueryResult> consumer, final Runnable onComplete)                 throws IOException {             MessagePackTraverser traverser = new MessagePackTraverser();             try (InputStream is = chunkedBody.byteStream()) {                 for (Iterator<QueryResult> it = traverser.traverse(is).iterator(); it.hasNext() && !cancellable.isCanceled();) {                     QueryResult result = it.next();                     consumer.accept(cancellable, result);                 }             }             if 
(!cancellable.isCanceled()) {                 onComplete.run();             }         }     }      private class JSONChunkProccesor implements ChunkProccesor {         private JsonAdapter<QueryResult> adapter;          public JSONChunkProccesor(final JsonAdapter<QueryResult> adapter) {             this.adapter = adapter;         }          @Override         public void process(final ResponseBody chunkedBody, final Cancellable cancellable,                             final BiConsumer<Cancellable, QueryResult> consumer, final Runnable onComplete)                 throws IOException {             try {                 BufferedSource source = chunkedBody.source();                 while (!cancellable.isCanceled()) {                     QueryResult result = adapter.fromJson(source);                     if (result != null) {                         consumer.accept(cancellable, result);                     }                 }             } catch (EOFException e) {                 QueryResult queryResult = new QueryResult();                 queryResult.setError("DONE");                 consumer.accept(cancellable, queryResult);                 if (!cancellable.isCanceled()) {                     onComplete.run();                 }             } finally {                 chunkedBody.close();             }         }     } }  
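The key compatibility trick in the overridden `ping()` above is small: scan the response headers for either `X-Influxdb-Version` (InfluxDB) or `X-Geminidb-Version` (OpenGemini), and treat an unrecognized response (`"unknown"`) as unhealthy. The following standalone sketch isolates just that decision logic so it can be reasoned about and tested without a live server; the class and method names (`PingVersionResolver`, `resolveVersion`, `isGood`) are illustrative and not part of influxdb-java:

```java
import java.util.List;
import java.util.Map;

public class PingVersionResolver {

    // Mirrors the header loop in the overridden ping():
    // accept either vendor's version header, case-insensitively.
    static String resolveVersion(Map<String, List<String>> headers) {
        for (Map.Entry<String, List<String>> e : headers.entrySet()) {
            String name = e.getKey();
            if (name != null && ("X-Influxdb-Version".equalsIgnoreCase(name)
                    || "X-Geminidb-Version".equalsIgnoreCase(name))) {
                // Same "header:value" format the overridden ping() stores in Pong.
                return name + ":" + e.getValue().get(0);
            }
        }
        return "unknown"; // no recognized header -> treated as unhealthy
    }

    // The health check discussed in the javadoc:
    // healthy if and only if a version header was recognized.
    static boolean isGood(String version) {
        return !"unknown".equalsIgnoreCase(version);
    }

    public static void main(String[] args) {
        Map<String, List<String>> influx = Map.of("X-Influxdb-Version", List.of("1.7.4"));
        Map<String, List<String>> gemini = Map.of("X-Geminidb-Version", List.of("1.2.0"));
        Map<String, List<String>> other = Map.of("Content-Type", List.of("application/json"));

        System.out.println(resolveVersion(influx)); // X-Influxdb-Version:1.7.4
        System.out.println(isGood(resolveVersion(gemini))); // true
        System.out.println(isGood(resolveVersion(other)));  // false
    }
}
```

Keeping the check header-based (rather than parsing the body) matches the `/ping` endpoint's behavior, which returns `204 No Content` with no body, as the curl trace at the top of the listing shows.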

Y Recommended Reading

  • InfluxDB
