商品中心—5.商品消息处理系统的技术文档

 

大纲

1.商品消息处理系统

(1)商品消息处理系统架构设计

(2)商品中心内部需消费的消息

(3)商品中心外部需消费的消息

(4)消息处理相关表

(5)消息处理流程设计

2.消费和处理binlog消息

(1)消费Canal发送到MQ的binlog消息

(2)解析MQ消息字符串成binlog对象

(3)处理binlog消息

(4)自研缓存组件Redis + DB读写实现

(5)将binlog对象封装为数据变更对象

(6)根据数据变更对象组装成内部消息对象并发送

(7)如果存在外部消息配置则保存数据变更对象详情

(8)消息编号的监听处理 + 发送外部消息

 

1.商品消息处理系统

(1)商品消息处理系统架构设计

(2)商品中心内部需要消费的消息

(3)商品中心外部需要消费的消息

(4)消息处理相关表

(5)消息处理流程设计

 

(1)商品消息处理系统架构设计

商品中心—5.商品消息处理系统的技术文档

(2)商品系统内部需要消费的消息

一.item_info表变更

topic:data_change_topic

MQ消息体:

{     "action": "UPDATE",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE     "table": "item_info",//更新表的表名     "updateColumn": [         "item_status"//item_info表item_status变更     ],     "data": [{         "column": "id",//item_info表主键id         "value": 1     }, {         "column": "itemId",//item_info表item_id         "value": "100000476748"     }] }

二.sku_info表变更

topic:data_change_topic

MQ消息体:

{     "action": "UPDATE",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE     "table": "sku_info",//更新表的表名     "updateColumn": [         "sku_name"//sku_info表sku_name变更     ],     "data": [{         "column": "id",//sku_info表主键id         "value": 1     }, {         "column": "itemId",//sku_info表item_id         "value": "100000476748"     }, {         "column": "skuId",//sku_info表sku_id         "value": "8000476872"     }] }

三.attribute_extend表变更

topic:data_change_topic

MQ消息体:

{     "action": "INSERT",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE     "table": "attribute_extend",//更新表的表名     "data": [{         "column": "id",//attribute_extend表主键id         "value": 1     }] }

四.front_category_relation表变更

topic:data_change_topic

MQ消息体:

{     "action": "INSERT",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE     "table": "front_category_relation",//更新表的表名     "data": [{         "column": "id",//front_category_relation表主键id         "value": 1     }] }

五.sku_seller_relation表变更

topic:data_change_topic

MQ消息体:

{     "action": "INSERT",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE     "table": "sku_seller_relation",//更新表的表名     "data": [{         "column": "id",//sku_seller_relation表主键id         "value": 1     }] }

六.item_period_stage表变更

topic:interior_item_expri_result_topic

MQ消息体:

{     "action": "update",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE     "table": "item_period_stage",//更新表的表名     "data": [{         "column": "period_stage",//sku_seller_relation表主键id         "value": 2     }] }

(3)商品系统外部需要消费的消息

新建或编辑商品发送MQ消息

topic:open_product_topic

MQ消息体:

{     "action": "INSERT", //执⾏发送消息的触发动作:INSERT、UPDATE、DELETE     "data": {         "itemId": "100000476748",         "itemStatus": 3,         "skuId": "8000476872"     } }

(4)消息处理相关表

一.数据变更监听表

数据变更监听表中配置哪些表需要监听。

create table data_change_listen_config (     id int unsigned auto_increment comment '主键' primary key,     table_name varchar(40) null comment '数据表名称',     key_column varchar(40) default 'id' null comment '数据表对应的主键或业务id列名',     filter_flag tinyint(1) null comment '是否过滤',     del_flag tinyint(1) null comment '是否删除',     create_user int null comment '创建⼈',     create_time datetime null comment '创建时间',     update_user int null comment '更新⼈',     update_time datetime null comment '更新时间' ) comment '数据变更监听表';//一个表只有一条记录

二.监听表变化字段配置表

监听表变化字段配置表中配置哪些字段变更需要监听,只要满⾜其中配置的有⼀个字段值变更,就需要发送内部消息通知订阅⽅。

create table data_change_column_config (     id int unsigned auto_increment primary key,     listen_id int null comment '监听表id',     listen_column varchar(40) null comment '监听字段',     del_flag tinyint(1) null comment '删除标记',     create_user int null comment '创建⼈',     create_time datetime null comment '创建时间',     update_user int null comment '更新⼈',     update_time datetime null comment '更新时间' ) comment '监听表变化字段配置表';

三.监听表消息模型表

监听表消息模型表中配置的是数据变更后,消息是内部消息还是外部消息,消息发送的topic,消息延迟等级,消息需要通知的字段。

create table data_change_message_config (     id int unsigned null comment '主键',     listen_id int null comment '监听表id',     notify_column varchar(2000) null comment '变更通知字段,逗号分隔',     message_topic varchar(256) null comment '变更通知消息主题',     delay_level int null comment '延迟等级',     message_type tinyint(3) null comment '消息类型',     del_flag tinyint(1) null comment '删除标记',     create_user int null comment '创建⼈',     create_time datetime null comment '创建时间',     update_user int null comment '更新⼈',     update_time datetime null comment '更新时间' ) comment '监听表消息模型表';

四.外部消息记录表

外部消息记录表中记录的是内部消息发送后⽣成的消息编号,通过回调消息查找外部消息的消息内容。

create table data_message_detail (     id int unsigned auto_increment comment '主键' primary key,     message_no varchar(64) null comment '消息编号',     table_data_json text null comment '变化的表信息内容',     diff_data_arr varchar(2000) null comment '消息变化字段数组,多个,分割',     table_name varchar(64) null comment '更新表的表名 ',     action varchar(64) null comment '执⾏发送消息的触发动作:INSERT、UPDATE、DELETE ',     del_flag tinyint(1) null comment '删除标记',     create_user int null comment '创建⼈',     create_time datetime null comment '创建时间',     update_user int null comment '更新⼈',     update_time datetime null comment '更新时间' ) comment '外部消息记录表';

(5)消息处理流程设计

商品中心—5.商品消息处理系统的技术文档

 

2.消费和处理binlog消息

(1)消费Canal发送到MQ的binlog消息

(2)解析MQ消息字符串成binlog对象

(3)处理binlog消息

(4)自研缓存组件Redis + DB读写实现

(5)将binlog对象封装为数据变更对象

(6)根据数据变更对象组装成内部消息对象并发送

(7)如果存在外部消息配置则保存数据变更对象详情

(8)消息编号的监听处理 + 发送外部消息

 

(1)消费Canal发送到MQ的binlog消息

@Configuration public class ConsumerBeanConfig {     //配置内容对象     @Autowired     private RocketMQProperties rocketMQProperties;          //消费系统内部消息——处理binlog消息     @Bean("dataChangeTopic")     public DefaultMQPushConsumer createItemStageConsumer(DataChangeListener dataChangeListener) throws MQClientException {         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.DATA_CHANGE_CONSUMER_GROUP);         consumer.setNamesrvAddr(rocketMQProperties.getNameServer());         consumer.subscribe(RocketMqConstant.DATA_CHANGE_TOPIC, "*");         consumer.registerMessageListener(dataChangeListener);         consumer.start();         return consumer;     }     ... }  @Component public class DataChangeListener implements MessageListenerConcurrently {     @Autowired     private MessageService messageService;          @Override     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {         try {             for (MessageExt messageExt : list) {                 String msg = new String(messageExt.getBody());                 log.info("数据变更消息通知,消息内容:{}", msg);                    //获取binlog对象                 BinlogData binlogData = BinlogUtils.getBinlogData(msg);                 if (Objects.isNull(binlogData) || Objects.isNull(binlogData.getDataMap())) {                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;                 }                    //操作类型不是insert,delete,update的,不作处理                 String operateType = binlogData.getOperateType();                 if (!BinlogType.INSERT.getValue().equals(operateType)                         && !BinlogType.DELETE.getValue().equals(operateType)                         && !BinlogType.UPDATE.getValue().equals(operateType)) {                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;                 }                 //处理binlog消息                 messageService.processBinlogMessage(binlogData);             }         } catch (Exception e) {             log.error("consume error, 消费数据变更消息失败", e);             //本次消费失败,下次重新消费             return ConsumeConcurrentlyStatus.RECONSUME_LATER;         }         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;     } }

(2)解析MQ消息字符串成binlog对象

//MySQL的binlog对象 @Data public class BinlogData implements Serializable {     //binlog对应的表名     private String tableName;     //操作时间     private Long operateTime;     //操作类型     private String operateType;     //data节点转换成的Map,key对应的是bean里的属性名,value一律为字符串     private List<Map<String, Object>> dataMap;     //data节点转换成的Map,key对应的是bean里的属性名,value一律为字符串     private List<Map<String, Object>> oldMap; }  //MySQL binlog解析工具类 public abstract class BinlogUtils {     //解析binlog json字符串     public static BinlogData getBinlogData(String binlogStr) {         //isJson方法里面会判断字符串是不是为空,所以这里不需要重复判断         if (JSONUtil.isJson(binlogStr)) {             JSONObject binlogJson = JSONUtil.parseObj(binlogStr);             //不处理DDL的binlog,只处理数据变更             if (binlogJson.getBool("isDdl")) {                 return null;             }                          BinlogData binlogData = new BinlogData();             //表名             String tableName = binlogJson.getStr("table");             binlogData.setTableName(tableName);             //操作类型             String operateType = binlogJson.getStr("type");             binlogData.setOperateType(operateType);             //操作时间             Long operateTime = binlogJson.getLong("ts");             binlogData.setOperateTime(operateTime);                //data数据             JSONArray dataArray = binlogJson.getJSONArray("data");             List<Map<String, Object>> dataMap = jsonArrayToMapList(dataArray);             binlogData.setDataMap(dataMap);             if (!binlogJson.isNull("old")) {                 //old数据                 JSONArray oldArray = binlogJson.getJSONArray("old");                 List<Map<String, Object>> oldMap = jsonArrayToMapList(oldArray);                 binlogData.setOldMap(oldMap);             }             return binlogData;         }         return null;     }          private static List<Map<String, Object>> jsonArrayToMapList(JSONArray jsonArray) {         if (null != jsonArray) {             Iterable<JSONObject> arrayIterator = jsonArray.jsonIter();             //遍历data节点或old节点并返回Map             if (null != arrayIterator) {                 //binlog的data数组或old数组里数据的类型为Map                 List<Map<String, Object>> dataMap = new ArrayList<>();                 while (arrayIterator.iterator().hasNext()) {                     JSONObject jsonObject = arrayIterator.iterator().next();                     Map<String, Object> data = new HashMap<>(jsonObject.size());                     jsonObject.keySet().forEach(key -> {                         data.put(key, jsonObject.get(key));                     });                     dataMap.add(data);                 }                 return dataMap;             }         }         return null;     } }

(3)处理binlog消息

步骤一:通过缓存组件获取当前表的配置监听信息

步骤二:将Binlog对象封装为数据变更对象

步骤三:通过缓存组件获取配置的消息模型对象

步骤四:组装需要发送的数据变更消息对象

步骤五:发送内部消息

@Service public class MessageServiceImpl implements MessageService {     ...     //处理binlog消息     @Override     public void processBinlogMessage(BinlogData binlogData) {         //1.通过缓存组件获取当前表的配置监听信息         DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());         //未配置监听信息的表,不作处理         if (Objects.isNull(listenConfigDO)) {             return;         }                  //2.将binlog对象封装为数据变更对象         List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);         //不需要监听,或者要监听的字段值未变动         if (CollectionUtils.isEmpty(dataChangeMessages)) {             return;         }                  //3.通过缓存组件获取配置的消息模型对象         List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());         //不需要发送消息         if (CollectionUtils.isEmpty(messageConfigBOS)) {             return;         }                  //4.组装需要发送的数据变更消息对象         List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);         //待发送的消息为空,无需处理         if (CollectionUtils.isEmpty(sendDataMessageList)) {             return;         }                  //5.发送内部消息         sendDataMessage(sendDataMessageList);                  //6.如果存在外部消息配置则保存数据变更对象详情         if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {             //保存外部消息详细信息             saveDataMessageDetail(dataChangeMessages, binlogData);         }     }     ... }

(4)商品系统自研缓存组件Redis + DB读写实现

一.通过缓存组件获取监听的配置信息

二.缓存组件RedisReadWriteManager的实现

 

一.通过缓存组件获取监听的配置信息

@Repository public class DataChangeRepository {     @Resource     private RedisReadWriteManager redisReadWriteManager;     ...          //根据表名获取监听配置信息     public DataChangeListenConfigDO getListenConfigByTable(String tableName) {         Optional<DataChangeListenConfigDO> optional = redisReadWriteManager.getRedisStringDataByCache(             tableName,             DataChangeListenConfigDO.class,             AbstractRedisKeyConstants::getListenConfigStringKey,             this::getListenConfigByTableFromDB         );         return optional.orElse(null);     }          //获取监听变更字段配置表信息     public List<DataChangeColumnConfigDO> getColumnConfigByListenId(Long id) {         Optional<List<DataChangeColumnConfigDO>> optional = redisReadWriteManager.listRedisStringDataByCache(             id,             DataChangeColumnConfigDO.class,             AbstractRedisKeyConstants::getColumnConfigStringKey,             this::getColumnConfigByListenIdFromDB         );         return optional.orElse(null);     }          //获取监听表消息模型配置     public List<DataChangeMessageConfigDO> getMessageConfigByListenId(Long id, MessageTypeEnum messageTypeEnum) {         //获取监听表消息模型配置         Optional<List<DataChangeMessageConfigDO>> optional = redisReadWriteManager.listRedisStringDataByCache(             id,             DataChangeMessageConfigDO.class,             AbstractRedisKeyConstants::getMessageConfigStringKey,             this::getMessageConfigByListenIdFromDB         );                  //如果未指定是内部消息还是外部消息,则不需要过滤         if (Objects.isNull(messageTypeEnum)) {             return optional.orElse(null);         }         return optional.map(dataChangeMessageConfigDOS -> dataChangeMessageConfigDOS.stream()             .filter(messageConfigBO -> messageTypeEnum.getCode().equals(messageConfigBO.getMessageType()))             .collect(Collectors.toList())).orElse(null);     }          //根据表名获取监听配置信息     public Optional<DataChangeListenConfigDO> getListenConfigByTableFromDB(String tableName) {         LambdaQueryWrapper<DataChangeListenConfigDO> queryWrapper = Wrappers.lambdaQuery();         queryWrapper.eq(DataChangeListenConfigDO::getTableName, tableName).eq(BaseEntity::getDelFlag, DelFlagEnum.EFFECTIVE.getCode());         DataChangeListenConfigDO listenConfigDO = dataChangeListenConfigMapper.selectOne(queryWrapper);         return Objects.isNull(listenConfigDO) ? Optional.empty() : Optional.of(listenConfigDO);     }          //获取监听变更字段配置表信息     public Optional<List<DataChangeColumnConfigDO>> getColumnConfigByListenIdFromDB(Long id) {         LambdaQueryWrapper<DataChangeColumnConfigDO> queryWrapper = Wrappers.lambdaQuery();         queryWrapper.eq(DataChangeColumnConfigDO::getListenId, id).eq(BaseEntity::getDelFlag, DelFlagEnum.EFFECTIVE.getCode());         List<DataChangeColumnConfigDO> columnConfigDOS = dataChangeColumnConfigMapper.selectList(queryWrapper);         return CollectionUtils.isEmpty(columnConfigDOS) ? Optional.empty() : Optional.of(columnConfigDOS);     }          //查询数据变更对象列表     public Optional<List<DataChangeMessageConfigDO>> getMessageConfigByListenIdFromDB(Long id) {         LambdaQueryWrapper<DataChangeMessageConfigDO> queryWrapper = Wrappers.lambdaQuery();         queryWrapper.eq(DataChangeMessageConfigDO::getListenId, id).eq(BaseEntity::getDelFlag, DelFlagEnum.EFFECTIVE.getCode());         List<DataChangeMessageConfigDO> messageConfigDOS = dataChangeMessageConfigMapper.selectList(queryWrapper);         return CollectionUtils.isEmpty(messageConfigDOS) ? Optional.empty() : Optional.of(messageConfigDOS);     }     ... }

二.缓存组件RedisReadWriteManager的实现

//缓存读写管理 @Service public class RedisReadWriteManager {     @Resource     private RedisCache redisCache;          @Resource     private RedisLock redisLock;          //批量获取缓存数据     //@param key                 关键字列表     //@param clazz               需要将缓存JSON转换的对象     //@param getRedisKeyFunction 获取Redis key的方法     //@param getDbFunction       获取数据源对象的方法     //@return java.util.Optional<java.util.List<T>>     public <T> Optional<List<T>> listRedisStringDataByCache(Long key, Class<T> clazz, Function<Long, String> getRedisKeyFunction, Function<Long, Optional<List<T>>> getDbFunction) {         try {             String redisKey = getRedisKeyFunction.apply(key);             //过滤无效缓存             String cache = redisCache.get(redisKey);             if (EMPTY_OBJECT_STRING.equals(cache)) {                 return Optional.empty();             }             if (StringUtils.isNotBlank(cache)) {                 List<T> list = JSON.parseArray(cache, clazz);                 return Optional.of(list);             }             //缓存没有则读库             return listRedisStringDataByDb(key, getRedisKeyFunction, getDbFunction);         } catch (Exception e) {             log.error("获取缓存数据异常 key={},clazz={}", key, clazz, e);             throw e;         }     }          //读取数据库表数据赋值到Redis     public <T> Optional<List<T>> listRedisStringDataByDb(Long key, Function<Long, String> getRedisKeyFunction, Function<Long, Optional<List<T>>> getDbFunction) {         if (Objects.isNull(key) || Objects.isNull(getDbFunction)) {             return Optional.empty();         }         try {             if (!redisLock.lock(String.valueOf(key))) {                 return Optional.empty();             }             String redisKey = getRedisKeyFunction.apply(key);             Optional<List<T>> optional = getDbFunction.apply(key);             putCacheString(redisKey, optional);             return optional;         } finally {             redisLock.unlock(String.valueOf(key));         }     }          private void putCacheString(String redisKey, Optional optional) {         if (!optional.isPresent()) {             //把空对象暂存到Redis             redisCache.setex(redisKey, EMPTY_OBJECT_STRING, RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_ONE_DAY, TimeUnit.HOURS, NUMBER_24));             log.warn("发生缓存穿透 redisKey={}", redisKey);             return;         }         //把表数据对象存到Redis         redisCache.setex(redisKey, JSON.toJSONString(optional.get()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS));         log.info("表数据对象存到Redis redisKey={}, data={}", redisKey, JSON.toJSONString(optional.get()));     }          //批量获取缓存数据     //@param key                 关键字列表     //@param clazz               需要将缓存JSON转换的对象     //@param getRedisKeyFunction 获取redis key的方法     //@param getDbFunction       获取数据源对象的方法     //@return java.util.Optional<java.util.List < T>>     public <T> Optional<T> getRedisStringDataByCache(String key, Class<T> clazz, Function<String, String> getRedisKeyFunction, Function<String, Optional<T>> getDbFunction) {         try {             String redisKey = getRedisKeyFunction.apply(key);             String cache = redisCache.get(redisKey);             //过滤无效缓存             if (EMPTY_OBJECT_STRING.equals(cache)) {                 return Optional.empty();             }             if (StringUtils.isNotBlank(cache)) {                 T t = JSON.parseObject(cache, clazz);                 return Optional.of(t);             }             //缓存没有则读库             return getRedisStringDataByDb(key, getRedisKeyFunction, getDbFunction);         } catch (Exception e) {             log.error("获取缓存数据异常 key={},clazz={}", key, clazz, e);             throw e;         }     }          //读取数据库表数据赋值到Redis     public <T> Optional<T> getRedisStringDataByDb(String key, Function<String, String> getRedisKeyFunction, Function<String, Optional<T>> getDbFunction) {         if (StringUtils.isBlank(key) || Objects.isNull(getDbFunction)) {             return Optional.empty();         }         try {             if (!redisLock.lock(key)) {                 return Optional.empty();             }             String redisKey = getRedisKeyFunction.apply(key);             Optional<T> optional = getDbFunction.apply(key);             putCacheString(redisKey, optional);             return optional;         } finally {             redisLock.unlock(key);         }     } }  @Component public class RedisCache {     private RedisTemplate redisTemplate;          public RedisCache(RedisTemplate redisTemplate) {         this.redisTemplate = redisTemplate;     }     ...          //缓存获取     public String get(String key) {         ValueOperations<String, String> vo = redisTemplate.opsForValue();         return vo.get(key);     }          //缓存存储并设置过期时间     public void setex(String key, String value, long time) {         redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);     }     ... }

(5)将binlog对象封装为数据变更对象

@Service public class MessageServiceImpl implements MessageService {     ...     //处理binlog消息     @Override     public void processBinlogMessage(BinlogData binlogData) {         //1.通过缓存组件获取当前表的配置监听信息         DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());         //未配置监听信息的表,不作处理         if (Objects.isNull(listenConfigDO)) {             return;         }                  //2.将binlog对象封装为数据变更对象         List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);         //不需要监听,或者要监听的字段值未变动         if (CollectionUtils.isEmpty(dataChangeMessages)) {             return;         }                  //3.通过缓存组件获取配置的消息模型对象         List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());         //不需要发送消息         if (CollectionUtils.isEmpty(messageConfigBOS)) {             return;         }                  //4.组装需要发送的数据变更消息对象         List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);         //待发送的消息为空,无需处理         if (CollectionUtils.isEmpty(sendDataMessageList)) {             return;         }                  //5.发送内部消息         sendDataMessage(sendDataMessageList);                  //6.如果存在外部消息配置则保存数据变更消息对象详情         if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {             //保存外部消息详细信息             saveDataMessageDetail(dataChangeMessages, binlogData);         }     }          //将binlog对象封装为数据变更对象     public List<DataChangeMessage> getDataChangeMessage(BinlogData binlogData, DataChangeListenConfigDO listenConfigDO) {         //获取监听变更字段配置表信息         List<DataChangeColumnConfigDO> columnConfigDOS = dataChangeRepository.getColumnConfigByListenId(listenConfigDO.getId());         //要监听的字段为空,不作处理         if (CollectionUtils.isEmpty(columnConfigDOS)) {             return null;         }         //封装数据变更对象         return buildChangeColumn(binlogData, columnConfigDOS, listenConfigDO);     }          //封装数据变更对象     private List<DataChangeMessage> buildChangeColumn(BinlogData binlogData, List<DataChangeColumnConfigDO> columnConfigDOS, DataChangeListenConfigDO listenConfigDO) {         List<DataChangeMessage> dataChangeMessages = new ArrayList<>();         //操作类型         String operateType = binlogData.getOperateType();         for (int i = 0; i < binlogData.getDataMap().size(); i++) {             Map<String, Object> data = binlogData.getDataMap().get(i);//新值             if (BinlogType.INSERT.getValue().equals(operateType) || BinlogType.DELETE.getValue().equals(operateType)) {                 //如果是新增或者删除,则所有监听字段都变更                 List<String> updateColumns = columnConfigDOS.stream().map(DataChangeColumnConfigDO::getListenColumn).collect(Collectors.toList());                 DataChangeMessage dataChangeMessage = buildDataChangeMessage(binlogData, updateColumns, data.get(listenConfigDO.getKeyColumn()));                 dataChangeMessages.add(dataChangeMessage);             } else {                 Map<String, Object> old = binlogData.getOldMap().get(i);//旧值                 List<String> updateColumns = new ArrayList<>();                 for (DataChangeColumnConfigDO columnConfigDO : columnConfigDOS) {                     String column = columnConfigDO.getListenColumn();                     Object columnOldValue = old.get(column);                     //旧的字段值有数据,就表示该字段变更了,添加至修改的字段集合                     if (!Objects.isNull(columnOldValue)) {                         updateColumns.add(column);                     }                 }                 //监听的字段有数据变更                 if (!CollectionUtils.isEmpty(updateColumns)) {                     DataChangeMessage dataChangeMessage = buildDataChangeMessage(binlogData, updateColumns, data.get(listenConfigDO.getKeyColumn()));                     dataChangeMessages.add(dataChangeMessage);                 }             }         }         return dataChangeMessages;     }          //构建数据变更对象     private DataChangeMessage buildDataChangeMessage(BinlogData binlogData, List<String> updateColumns, Object keyId) {         DataChangeMessage dataChangeMessage = new DataChangeMessage(binlogData.getOperateType(), binlogData.getTableName(), updateColumns);         dataChangeMessage.setMessageNo(SnowflakeIdWorker.getCode());//雪花算法设置消息编号         dataChangeMessage.setKeyId(keyId);         return dataChangeMessage;     }     ... }  //数据变更对象 @Data public class DataChangeMessage implements Serializable {     //内部消息编号     private String messageNo;     //操作行为,INSERT、UPDATE、DELETE     private String action;     //表名     private String tableName;     //主键或业务id     private Object keyId;     //变更的列     private List<String> updateColumns;     //唯一确定当前数据的字段以及字段值     private List<ColumnValue> columnValues;     //消息处理成功之后的回调topic     private String callbackTopic = RocketMqConstant.DATA_EXTERNAL_CHANGE_TOPIC;          @Data     @NoArgsConstructor     @AllArgsConstructor     public static class ColumnValue {         private String column;//列         private Object value;//值     }          public DataChangeMessage(String operateType, String tableName, List<String> updateColumns) {         this.action = operateType;         this.tableName = tableName;         this.updateColumns = updateColumns;     } }

(6)根据数据变更对象组装成内部消息对象并发送

@Service public class MessageServiceImpl implements MessageService {     ...     //组装需要发送的数据变更消息对象     public List<DataSendMessageBO> getInternalSendDataMessage(List<DataChangeMessage> dataChangeMessages, List<Map<String, Object>> dataMap, List<DataChangeMessageConfigBO> dataChangeMessageConfigBOS) {         List<DataSendMessageBO> dataSendMessageBOS = new ArrayList<>();         for (DataChangeMessageConfigBO messageConfigBO : dataChangeMessageConfigBOS) {             //不是内部消息的不处理             if (!MessageTypeEnum.INTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType())) {                 continue;             }             String notifyColumn = messageConfigBO.getNotifyColumn();             String[] columns = notifyColumn.split(CoreConstant.COMMA);             for (int i = 0; i < dataChangeMessages.size(); i++) {                 DataChangeMessage dataChangeMessage = dataChangeMessages.get(i);                 List<DataChangeMessage.ColumnValue> columnValues = new ArrayList<>();                 dataChangeMessage.setColumnValues(columnValues);                 Map<String, Object> data = dataMap.get(i);                 for (String column : columns) {                     columnValues.add(new DataChangeMessage.ColumnValue(column, data.get(column)));                 }                 dataSendMessageBOS.add(new DataSendMessageBO(messageConfigBO, dataChangeMessage));             }         }         return dataSendMessageBOS;     }          //发送内部消息     private void sendDataMessage(List<DataSendMessageBO> sendDataMessageList) {         for (DataSendMessageBO dataChangeMessage : sendDataMessageList) {             DataChangeMessage dataMessage = dataChangeMessage.getDataChangeMessage();             DataChangeMessageConfigBO dataChangeMessageConfigBO = dataChangeMessage.getDataChangeMessageConfigBO();             //发送一个延迟队列的消息出去             dataMessageProducer.send(dataMessage, dataChangeMessageConfigBO.getMessageTopic(), dataChangeMessageConfigBO.getDelayLevel());         }     }     ... }

(7)如果存在外部消息配置则保存数据变更对象详情

//消息业务实现类 @Service public class MessageServiceImpl implements MessageService {     @Resource     private DataChangeRepository dataChangeRepository;     ...          //处理binlog消息     @Override     public void processBinlogMessage(BinlogData binlogData) {         //获取当前表的监听信息         DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());         //未配置监听信息的表,不作处理         if (Objects.isNull(listenConfigDO)) {             return;         }            //获取数据变更对象列表,也就是将一条binlog数据转换成可能多个的数据变更对象         List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);         //不需要监听,或者要监听的字段值未变动         if (CollectionUtils.isEmpty(dataChangeMessages)) {             return;         }            //获取配置的消息对象         //对这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里         List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());         //不需要发送消息         if (CollectionUtils.isEmpty(messageConfigBOS)) {             return;         }            //封装成需要发送的消息对象         //对这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里         List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);         //待发送的消息为空,无需处理         if (CollectionUtils.isEmpty(sendDataMessageList)) {             return;         }            //发送消息         sendDataMessage(sendDataMessageList);            //配置的消息对象列表中,如果包含外部消息类型的消息对象,就需要保存         if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {             //保存外部消息详细信息             saveDataMessageDetail(dataChangeMessages, binlogData);         }     }          //保存消息详细信息     public void saveDataMessageDetail(List<DataChangeMessage> dataChangeMessages, BinlogData binlogData) {         List<DataMessageBO> dataMessageBOS = converterDataMessageBOList(dataChangeMessages, binlogData);         dataChangeRepository.saveDataMessageDetail(dataMessageBOS);     }          //转换消息详细信息     private List<DataMessageBO> converterDataMessageBOList(List<DataChangeMessage> dataChangeMessages, BinlogData binlogData) {         List<DataMessageBO> dataMessageBOS = new ArrayList<>(dataChangeMessages.size());         for (int i = 0; i < dataChangeMessages.size(); i++) {             DataChangeMessage dataChangeMessage = dataChangeMessages.get(i);             DataMessageBO dataMessageBO = dataMessageConverter.converterBO(dataChangeMessage);             dataMessageBO.setDiffDataArr(String.join(CoreConstant.COMMA, dataChangeMessage.getUpdateColumns()));             dataMessageBO.setTableDataJson(JSON.toJSONString(binlogData.getDataMap().get(i)));             dataMessageBOS.add(dataMessageBO);         }         return dataMessageBOS;     }     ... }  @Repository public class DataChangeRepository {     ...     //存储外部消息的数据信息     public void saveDataMessageDetail(List<DataMessageBO> dataMessageBOS) {         List<DataMessageDetailDO> dataMessageDetailDOS = dataMessageConverter.converterDOList(dataMessageBOS);         int count = dataMessageDetailMapper.insertBatch(dataMessageDetailDOS);         if (count <= 0) {             throw new BaseBizException(CommonErrorCodeEnum.SQL_ERROR);         }     } }  //外部消息处理对象 @Data public class DataMessageBO implements Serializable {     //内部消息编号     private String messageNo;     //变化的表信息内容     private String tableDataJson;     //消息变化字段数组     private String diffDataArr;     //表名     private String tableName;     //操作类型     private String action; }

(8)消息编号的监听处理 + 发送外部消息

整个内部消息和外部消息的流程如下:

 

说明一:商品中心系统的开发人员会对商品中心的表配置内部消息对象。

 

说明二:非商品中心系统的开发人员会对商品中心的表配置外部消息对象。

 

说明三:商品中心的表发生变更后,binlog消息应该先被商品中心系统自己消费。

 

说明四:binlog消息被商品中心系统自己消费后,再继续被非商品中心系统消费。

 

说明五:当商品消息处理系统消费binlog消息时,会先获取配置的内部消息对象。然后根据配置的信息,将binlog消息对应的变更数据发送到指定topic。接着获取配置的外部消息对象,把这次发送的变更数据保存到数据库中。此过程中,发送到指定topic和保存到DB的变更数据会由消息编号来关联。

 

说明六:商品中心的系统消费完binlog消息对应的变更数据消息后,会将消息编号发送到MQ中以外部消息的形式由商品消息系统去消费处理。

 

说明七:当商品消息系统在消费外部消息时,首先会将消息编号提取出来,然后根据消息编号去数据库查询关联的binlog消息对应的变更数据。由于这些保存的变更数据已经配置好完整的外部消息,包括发到那些topic。所以接着可以把变更数据发送到非商品中心系统的开发人员配置的topic,从而实现非商品中心系统对商品中心的表的binlog变化的监听,而且严格保证了binlog消息的消费顺序:先商品中心->再非商品中心。发送消息后,最后便会把消息编号相关的变更消息数据从数据库中删除。

@Configuration public class ConsumerBeanConfig {     //配置内容对象     @Autowired     private RocketMQProperties rocketMQProperties;          //消费系统外部消息——处理业务消息     @Bean("dataExternalChangeTopic")     public DefaultMQPushConsumer dataExternalChangeTopic(DataExternalChangeListener dataExternalChangeListener) throws MQClientException {         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.DATA_EXTERNAL_CHANGE_CONSUMER_GROUP);         consumer.setNamesrvAddr(rocketMQProperties.getNameServer());         consumer.subscribe(RocketMqConstant.DATA_EXTERNAL_CHANGE_TOPIC, "*");         consumer.registerMessageListener(dataExternalChangeListener);         consumer.start();         return consumer;     } }  @Component public class DataExternalChangeListener implements MessageListenerConcurrently {     @Autowired     private MessageService messageService;          @Autowired     private DataMessageProducer dataMessageProducer;          @Autowired     private RedisLock redisLock;          @Override     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {         for (MessageExt messageExt : list) {             ...             try {                 DataMessageBO dataMessageDetail = messageService.getDataMessageDetail(messageNo);                 //未命中到外部消息的数据,默认不处理                 if (Objects.isNull(dataMessageDetail)) {                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;                 }                 //获取外部消息的报文对象                 List<DataSendMessageBO> sendDataMessageList = messageService.getSendDataMessage(dataMessageDetail);                 if (CollectionUtils.isEmpty(sendDataMessageList)) {                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;                 }                 //发送外部消息                 sendDataMessage(sendDataMessageList);                 //删除这条记录                 messageService.deleteMessage(dataMessageDetail);             } catch (Exception e) {                 log.error("consume error, 消费外部数据变更消息失败", e);                 //本次消费失败,下次重新消费                 return ConsumeConcurrentlyStatus.RECONSUME_LATER;             } finally {                 redisLock.unlock(messageNo);             }         }         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;     }          //发送外部的消息     private void sendDataMessage(List<DataSendMessageBO> sendDataMessageList) {         for (DataSendMessageBO dataChangeMessage : sendDataMessageList) {             DataChangeMessage dataMessage = dataChangeMessage.getDataChangeMessage();             DataChangeMessageConfigBO dataChangeMessageConfigBO = dataChangeMessage.getDataChangeMessageConfigBO();             //发送一个延迟队列的消息出去             dataMessageProducer.send(dataMessage, dataChangeMessageConfigBO.getMessageTopic(), dataChangeMessageConfigBO.getDelayLevel());         }     } }  @Service public class MessageServiceImpl implements MessageService {     ...     //获取对应的外部消息对象信息     @Override     public DataMessageBO getDataMessageDetail(String messageNo) {         DataMessageDetailDO dataMessageDetail = dataChangeRepository.getDataMessageDetail(messageNo);         return dataMessageConverter.converterBO(dataMessageDetail);     }          //获取需要发送的消息对象     @Override     public List<DataSendMessageBO> getSendDataMessage(DataMessageBO messageBO) {         List<DataSendMessageBO> dataChangeMessageConfig = dataChangeRepository.getDataChangeMessageConfig(messageBO);         return dataChangeMessageConfig;     }     ... }  @Repository public class DataChangeRepository {     ...     //获取某个消息对应的外部消息对象     public DataMessageDetailDO getDataMessageDetail(String messageNo) {         LambdaQueryWrapper<DataMessageDetailDO> queryWrapper = Wrappers.lambdaQuery();         queryWrapper.eq(DataMessageDetailDO::getMessageNo, messageNo);         return dataMessageDetailMapper.selectOne(queryWrapper);     }          //获取需要发送的外部消息对象     public List<DataSendMessageBO> getDataChangeMessageConfig(DataMessageBO dataMessageBO) {         List<DataSendMessageBO> dataChangeMessageList = new ArrayList<>();         DataChangeListenConfigDO dataChangeListenConfig = getDataChangeListenConfig(dataMessageBO.getTableName());         if (!Objects.isNull(dataChangeListenConfig)) {             //获取配置的需要发送的外部消息信息             List<DataChangeMessageConfigDO> dataChangeMessageConfigList = getMessageConfigByListenId(dataChangeListenConfig.getId(), MessageTypeEnum.EXTERNAL_MESSAGE);             if (!CollectionUtils.isEmpty(dataChangeMessageConfigList)) {                 DataSendMessageBO dataSendMessageBO = new DataSendMessageBO();                 JSONObject tableDataJson = JSONObject.parseObject(dataMessageBO.getTableDataJson());                 List<String> updateColumns = converterList(dataMessageBO.getDiffDataArr());                 for (DataChangeMessageConfigDO messageConfigDO : dataChangeMessageConfigList) {                     DataChangeMessage dataChangeMessage = dataMessageConverter.converter(dataMessageBO);                     dataChangeMessage.setUpdateColumns(updateColumns);                     //获取得到需要发送的字段信息                     String[] notifyColumnArr = messageConfigDO.getNotifyColumn().split(CoreConstant.COMMA);                     List<DataChangeMessage.ColumnValue> columnValueList = new ArrayList<>();                     for (String notifyColumn : notifyColumnArr) {                         columnValueList.add(new DataChangeMessage.ColumnValue(notifyColumn, tableDataJson.getString(notifyColumn)));                     }                     dataChangeMessage.setColumnValues(columnValueList);                     dataSendMessageBO.setDataChangeMessage(dataChangeMessage);                     dataSendMessageBO.setDataChangeMessageConfigBO(dataMessageConverter.converterBO(messageConfigDO));                     dataChangeMessageList.add(dataSendMessageBO);                 }             }         }         return dataChangeMessageList;     }          //获取对应的表配置信息     public DataChangeListenConfigDO getDataChangeListenConfig(String tableName) {         LambdaQueryWrapper<DataChangeListenConfigDO> queryWrapper = Wrappers.lambdaQuery();         queryWrapper.eq(DataChangeListenConfigDO::getTableName, tableName);         queryWrapper.eq(DataChangeListenConfigDO::getFilterFlag, DelFlagEnum.EFFECTIVE.getCode());         return dataChangeListenConfigMapper.selectOne(queryWrapper);     }          //获取监听表消息模型配置     public List<DataChangeMessageConfigDO> getMessageConfigByListenId(Long id, MessageTypeEnum messageTypeEnum) {         //获取监听表消息模型配置         Optional<List<DataChangeMessageConfigDO>> optional = redisReadWriteManager.listRedisStringDataByCache(             id,             DataChangeMessageConfigDO.class,             AbstractRedisKeyConstants::getMessageConfigStringKey,             this::getMessageConfigByListenIdFromDB         );         //如果未指定是内部消息还是外部消息,则不需要过滤         if (Objects.isNull(messageTypeEnum)) {             return optional.orElse(null);         }         return optional.map(dataChangeMessageConfigDOS -> dataChangeMessageConfigDOS.stream()             .filter(messageConfigBO -> messageTypeEnum.getCode().equals(messageConfigBO.getMessageType()))             .collect(Collectors.toList())).orElse(null);     }     ... }

 

发表评论

评论已关闭。

相关文章

当前内容话题
  • 0