Micrometer监控指标上报Starrocks

一、 背景与目标

监控最主要还是上报到Prometheus,可惜成本实在是高昂,特别是存储时间要求得越久,我们这里探索了下micrometer转换成json,然后存储到starrocks这类mpp olap引擎,通过starrocks的存算分离架构,降低成本。


二、 总体架构

Spring Boot + Micrometer --指标采集--> Kafka --消息流--> Flink --清洗/聚合/分流--> StarRocks (metric_data / metric_series) 

三、Spring Boot MeterRegister 发送至kafka

在此之前,我们上报Prometheus的时候,一般都是再Spring Boot中配置micrometer-registry-prometheus即可,这种方式主要是Pull模式,暴露/actuator/prometheus的指标,然后由Prometheus抓取。同时还有一种Step模式,即:定义一个周期T,周期结束时相关指标清零。

特性 Step(Delta)模式 Cumulative(累计)模式
值含义 当前时间窗的增量 从启动至今的总和
是否清零 是(周期切换后清零)
场景 Push 型采集系统 Pull 型采集系统
数据延迟影响 延迟会导致丢失当前周期数据 延迟只影响分辨率,不丢数据
计算速率方式 注册表已按步长计算 采集端函数计算
如果采用累计模式,存储到数据库里则是不断增加的值,如果要计算某一段分钟或者小时的值,则需要使用窗口函数,对数据库压力巨大。相比之下,直接在采集的时候就做好差分,对数据库压力很小,故Step步长模式更适合我们。

3.1 代码样例

先是定义export的配置,也可以做成通用的spring-boot-starter给别的服务用。

@Data   @Component   @ConfigurationProperties(prefix = "management.metrics.export.kafka")   public class KafkaMeterRegistryConfig implements StepRegistryConfig {       private String topic = "default_topic";       private Duration step = Duration.ofSeconds(30);          @Override       @NonNull    public String prefix() {           return "management.metrics.export.kafka";       }          @Override       public String get(@NotNull String key) {           return null;       }          @Override       @NonNull        public Duration step() {           return step;       }   } 

其次,通过扩展定义您自己的计量表注册表StepMeterRegistry,如下所示:

public class KafkaMeterRegistry extends StepMeterRegistry {       private final KafkaTemplate<String, String> kafkaTemplate;       private final String topic;          public KafkaMeterRegistry(KafkaMeterRegistryConfig config,                                 Clock clock,                                 KafkaTemplate<String, String> kafkaTemplate) {           super(config, clock);           this.kafkaTemplate = kafkaTemplate;           this.topic = config.getTopic();           start(new NamedThreadFactory("kafka-metrics-publisher"));              log.info("KafkaMeterRegistry created");          }          @Override       protected void publish() {           // 每个step结束后该方法被调用           long ts = System.currentTimeMillis();           for (Meter meter : this.getMeters()) {               //关键字:窗口切换               //不要在 publish() 里重复调用同一个 meter.measure() 多次,否则第一次调用就清零了,后面会拿到零值。               //不要依赖 publish() 之后还去别处用同一个 StepMeterRegistry 的 Meter 做跨周期累计,因为 step 运行就是为时间窗统计设计的,跨周期值会被清零。               for (Measurement ms : meter.measure()) {   	            ...准备数据                 kafkaTemplate.send(topic, jsonString);               }           }       }          @NotNull       @Override         protected TimeUnit getBaseTimeUnit() {           return TimeUnit.MILLISECONDS; // 发送数据时间单位       }   } 

最后,创建注册表配置和计量表注册表相关的BEAN。如果您使用 Spring Boot,可以按如下方式操作:

@Configuration   public class MetricsConfig {          @Bean       public KafkaMeterRegistry customMeterRegistry(KafkaMeterRegistryConfig kafkaMeterRegistryConfig,                                                     Clock clock,                                                     KafkaTemplate<String, String> kafkaTemplate) {           return new KafkaMeterRegistry(kafkaMeterRegistryConfig, clock, kafkaTemplate);       }      } 

3.2 底层如何实现Step模式

目前,核心疑问点在于是如何清零的?是否有丢数据的可能?
在 Micrometer 中,步长周期(Step Interval)的处理逻辑主要由 StepMeterRegistry 完成,这类 Registry(例如 DatadogMeterRegistryAtlasMeterRegistry)会确保每个时间窗口的指标值是该窗口的增量数据,而不是自应用启动以来的累计值。

Micrometer监控指标上报Starrocks

┌─ StepMeterRegistry.publish()  ← 定时任务   │   ├─ meter.measure()  ← 对 StepCounter 调用 measure   │   ├─ StepCounter.poll()   │    ├─ double v = count.get()   │    ├─ count.set(0.0)           ← 清零   │    ├─ lastStepTime.set(now)    ← 标记窗口开始时间   │    └─ 返回当前窗口的值给 publish   

简单的说,就是:

  1. StepMeterRegistry 定时(按 step 配置)调用 publish()
  2. publish() 会遍历所有的 meter,包括 Counter。
  3. 对 StepCounter 而言,poll() 会返回当前窗口增量并清零。
  4. 下一个步骤周期继续从 0 开始累加。

因此,清零是在采集周期触发,其条件是:

  • now - lastStepTime >= stepMillis

处理完后发送到kafka就是类似:

{   "ts": "2024-06-01 12:00:00.000",   "name": "http_server_requests_total_time",   "env": "prod",   "labels": { "uri": "/api/orders", "method": "POST" },   "value": 3.14159 }  

四、 存储方案对比

我们以一串json为例

{     "metricName": "cpu_usage",     "labels": {       "host": "server-01",       "region": "us-east"     },     "timestamp": "2025-10-15T08:01:12.123Z",     "value": 0.73   }      

直接可以对第一层key建表,metricName、timestamp、value都可以直接约定为列,目前问题的核心主要有两个:
(1)labels如何存储
(2)针对Histogram、Summary、Gauge的上报如何处理。
我们先来解决第一个问题,labels如何存储。

4.1 方案一:字符串拼接

如果维度比较少的情况下,确实可以考虑通过字符串截取来实现,比如上面的存储为server-01|us-east,通过split竖线来获取响应的字段,例如:

SPLIT_PART('server-01|us-east', '|', 1) AS host_value, SPLIT_PART('server-01|us-east', '|', 2) AS region_value 

这种如果labels多的情况下,需要自行约定字符串的顺序,而且数据量的情况下,SPLIT_PART性能消耗也比较大。

4.2 方案二:指标表 + 标签维表 + 倒排索引

flowchart LR A[Micrometer采集] --> B[Kafka队列] B --> C[Flink ETL] C --> D[StarRocks metric_data] C --> E[StarRocks metric_series] E -->|倒排索引| F[标签快速检索] D --> F

这种主要是想把指标和labels拆出来放到不同的两个表里,同时使用一个唯一id关联为一批数据,一开始有点豁然开朗,后来一想,(1)维度如果很多,但是只查一个维度的数据,这种关联不准确;(2)如果要关联查询,特别是维度多的情况下,sql复杂度翻好几倍,维护性大大降低。

4.3 方案三:Flat JSON 存储

考虑了一下简单粗暴的方法,labels直接存储为JSON列,刚好看到starrocks的新特性Flat JSON,Flat JSON的核心原理是在导入时检测JSON数据,并从JSON数据中提取常用字段,作为标准类型数据存储。在查询JSON时,这些常用字段优化了JSON的查询速度,刚好符合我们的条件。

CREATE TABLE monitor_data   (       ts      DATETIME COMMENT '指标时间',       name STRING COMMENT '指标名称',       env STRING COMMENT '环境',       dc STRING COMMENT '数据中心',       biz_id BIGINT COMMENT '业务 ID',       labels  JSON COMMENT '指标标签(JSON 格式)',       value   DOUBLE COMMENT '指标值'   )       ENGINE = OLAP DUPLICATE KEY(`ts`,`name`,`env`,`dc`,`biz_id`)   COMMENT "监控"   PARTITION BY date_trunc('day', ts)   DISTRIBUTED BY HASH(`name`,`env`,`dc`,`biz_id`)   ORDER BY (`name`,`ts`)   PROPERTIES (   "compression" = "LZ4",   "flat_json.enable" = "true", "fast_schema_evolution" = "false",   "partition_live_number" = "366",   "replicated_storage" = "true",   "replication_num" = "3"   ); 

五、Flink入库

starrocks是可以直接通过jdbc方式入库的,但是JDBC 适合低并发、小批量,容易出现性能问题,对于复杂多表、多源、字段更新场景,Kafka + Flink + StarRocks 是最安全高效的方案。具体也不展开讲解,这个配合下公司基建来即可。

六、入库后样例

  • 写入数据后展示:
ts name env dc biz_id labels value
2024-06-01 12:00:00 http_requests_count prod eu 0 3.14159
  • 最近 5 分钟各接口请求数(以 http_requests_count 为例,按 method/status 维度聚合):
SELECT   JSON_VALUE(labels, '$.method') AS method,   JSON_VALUE(labels, '$.status') AS status,   SUM(value) AS total_requests FROM monitor_data WHERE name = 'http_requests_count'   AND ts >= NOW() - INTERVAL 5 MINUTE   AND env = 'prod' GROUP BY method, status ORDER BY total_requests DESC; 

参考:

Custom Meter Registry

发表评论

您必须 [ 登录 ] 才能发表留言!

相关文章