RocketMQ实战—9.营销系统代码初版
技术分享
1个月前 (02-11)
0
999+
大纲
1.基于条件和画像筛选用户的业务分析和实现
2.全量用户促销活动数据模型分析以及创建操作
3.Producer和Consumer的工程代码实现
4.基于抽象工厂模式的消息推送实现
5.全量用户促销活动消息推送的流程和缺陷分析
6.全量用户促销活动推送引入MQ进行削峰
7.全量用户发优惠券业务流程实现
8.全量用户发优惠券引入MQ削峰
9.激活不活跃用户发券流程分析
10.推送系统对营销系统发起的推送任务的处理
11.立即推送模式的流程、缺陷和削峰
12.XXLJob驱动定时推送模式的运行原理
13.不活跃用户领取优惠券流程
14.热门商品根据用户画像定时推送以及MQ削峰
15.营销的四大业务场景MQ削峰方案经验总结
接下来实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送
1.基于条件和画像筛选用户的业务分析和实现
(1)获取全部⽤户信息接⼝
(2)根据条件获取⽤户信息接⼝
(3)查询画像匹配的用户接口
一般进行Push时,都是对那些活跃度比较低的用户进行推送活动消息,以激发他们登录APP来查看商品进行购物。会员系统需要提供给推送系统、运营系统如下需要支持分页的RPC接口。
@DubboService(version = "1.0.0", interfaceClass = AccountApi.class, retries = 0) public class AccountApiImpl implements AccountApi{ @Autowired private MembershipAccountService membershipAccountService; @Override public JsonResult<List<MembershipAccountDTO>> listAccount() { try { // RPC接口返回的数据就是DTO List<MembershipAccountDTO> accountDTOS = membershipAccountService.listAll(); return JsonResult.buildSuccess(accountDTOS); } catch (BaseBizException e) { return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { return JsonResult.buildError(e.getMessage()); } } @Override public JsonResult<List<MembershipAccountDTO>> listAccountByConditions(MembershipFilterDTO membershipFilterDTO) { try { List<MembershipAccountDTO> dtos = membershipAccountService.listAccountByConditions(membershipFilterDTO); return JsonResult.buildSuccess(dtos); } catch (BaseBizException e) { return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { return JsonResult.buildError(e.getMessage()); } } } @DubboService(version = "1.0.0", interfaceClass = MembershipPointApi.class, retries = 0) public class MembershipPointApiImpl implements MembershipPointApi { @Autowired private MembershipPointService membershipPointService; @Override public JsonResult<List<MembershipPointDTO>> listMembershipPointByConditions(MembershipFilterConditionDTO conditionDTO) { try { List<MembershipPointDTO> dtos = membershipPointService.listMembershipPointByConditions(conditionDTO); return JsonResult.buildSuccess(dtos); } catch (BaseBizException e) { return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { return JsonResult.buildError(e.getMessage()); } } }
(1)获取全部⽤户信息接⼝
com.demo.eshop.membership.api.impl.AccountApiImpl#listAccount()
用于获取所有⽤户数据信息,如果⽤户量超⼤,⼀般采取分⻚ + 多次调⽤的⽅式,以便当推送系统、运营系统发起全员活动和推送任务时可以查询全部账号。如果活动推送选择全部用户,此时这个接口就会被调用,需要提供分页支持实现一批一批的查询来推送。
从数据库查出来的是DO数据对象,DO对象和数据库表是一一对应的,在DAO层使用的。在Service层,会将DO对象转换为DTO对象,也就是数据传输对象,比如RPC调用返回的数据就是DTO。
@Service public class MembershipAccountServiceImpl implements MembershipAccountService { @Autowired private MembershipAccountDAO membershipAccountDAO; @Resource private MembershipAccountConverter membershipAccountConverter; @Autowired private MembershipFilterDAO membershipFilterDAO; //查询所有用户信息 @Override public List<MembershipAccountDTO> listAll() { //通过mybatis-plus框架自动生成的sql语句,select * from xx //从数据库查出来的是DO数据对象,DO对象和数据库表是一一对应的,在DAO层使用的 //在Service层,会将DO对象转换为DTO对象,也就是数据传输对象,比如RPC调用返回的数据就是DTO return membershipAccountConverter.listEntityToDTO(membershipAccountDAO.list()); } ... }
(2)根据条件获取⽤户信息接⼝
com.demo.eshop.membership.api.impl.AccountApiImpl#listAccountByConditions()
用于按照查询条件查询出用户数据信息,如果⽤户量超⼤⼀般采取的是分⻚⽅式、多次调⽤获取的⽅式。主要参数如下:
有些活动是针对不活跃的用户来推送的,以激发他们的活跃度。有些活动是针对活跃忠实用户来推送的,保持住他们对APP的忠诚度和粘度。
@Service public class MembershipAccountServiceImpl implements MembershipAccountService { @Autowired private MembershipAccountDAO membershipAccountDAO; @Resource private MembershipAccountConverter membershipAccountConverter; @Autowired private MembershipFilterDAO membershipFilterDAO; ... //根据条件查询用户 @Override public List<MembershipAccountDTO> listAccountByConditions(MembershipFilterDTO membershipFilterDTO) { //构造出查询条件 LambdaQueryWrapper<MembershipFilterDO> queryWrapper = buildQueryWrapper(membershipFilterDTO); //查询符合条件的用户,自动生成sql,select xx from xx where xx=xx and xx=xx List<MembershipFilterDO> membershipFilterDOs = membershipFilterDAO.list(queryWrapper); //从筛选出的记录中依次拿到accountId,去查询用户数据 return membershipFilterDOs.stream().map(membershipFilterDO -> { Long accountId = membershipFilterDO.getAccountId(); return membershipAccountConverter.entityToDTO(membershipAccountDAO.getById(accountId)); }).collect(Collectors.toList()); } //构造查询条件 private LambdaQueryWrapper<MembershipFilterDO> buildQueryWrapper(MembershipFilterDTO membershipFilterDTO) { //mybatis-plus这种框架做的一些封装 LambdaQueryWrapper<MembershipFilterDO> queryWrapper = Wrappers.lambdaQuery(); return queryWrapper .and(Objects.nonNull(membershipFilterDTO.getAccountType()), wrapper -> wrapper.eq(MembershipFilterDO::getAccountType, membershipFilterDTO.getAccountType())) .and(Objects.nonNull(membershipFilterDTO.getActiveCount()), wrapper -> wrapper.ge(MembershipFilterDO::getActiveCount, membershipFilterDTO.getActiveCount())) .and(Objects.nonNull(membershipFilterDTO.getMembershipLevel()), wrapper -> wrapper.ge(MembershipFilterDO::getMembershipLevel, membershipFilterDTO.getMembershipLevel())) .and(Objects.nonNull(membershipFilterDTO.getTotalActiveCount()), wrapper -> wrapper.ge(MembershipFilterDO::getTotalActiveCount, membershipFilterDTO.getTotalActiveCount())) .and(Objects.nonNull(membershipFilterDTO.getTotalActiveCount()), wrapper -> wrapper.ge(MembershipFilterDO::getTotalAmount, membershipFilterDTO.getTotalAmount())); } }
(3)查询画像匹配的用户接口
用户画像,就是每个用户都可以给他一些标签,比如王牌会员、吃货、男性、收入中等、喜欢看历史故事。根据用户在这个APP里的各种行为,浏览、购物、评论、活跃、登录、金额,通过一定的算法给用户去打上一些标签。这样该用户就有了一个在APP里的用户画像,user profile就是对一个用户的描述。当每个用户都有一个自己的用户画像后,在进行活动推送时,可以直接根据用户画像的标签来筛选用户。
针对某一类用户来进行活动推送,比如全场运动服饰要搞一个5折的清凉夏日活动。此时需要查找在用户画像标签里,对运动服饰浏览过/购买过/感兴趣的用户,进行专门推送专项活动。或者推测什么样的用户可能会喜欢运动类服饰,对用户画像里的多个标签进行组合,比如18~35之间、男性/女性这种群体。
@Service public class MembershipPointServiceImpl implements MembershipPointService { @Autowired private MembershipPointDAO membershipPointDAO; @Resource private MembershipPointConverter membershipPointConverter; @Override public List<MembershipPointDTO> listMembershipPointByConditions(MembershipFilterConditionDTO conditionDTO) { MembershipPointDO membershipPointDO = membershipPointConverter.dtoToEntity(conditionDTO); QueryWrapper<MembershipPointDO> wrapper = new QueryWrapper<>(); wrapper.setEntity(membershipPointDO); return membershipPointConverter.listEntityToDTO(membershipPointDAO.list(wrapper)); } } @Data @Builder public class MembershipFilterConditionDTO implements Serializable { //这里的用户画像标签就两个维度:会员等级和会员积分 private Integer memberLevel;//会员等级 -> 用户画像标签 private Long memberPoint;//会员积分 -> 用户画像标签 }
2.全量用户促销活动数据模型分析以及创建操作
(1)首先需要提供一个创建促销活动的HTTP接口
(2)然后提供一个创建促销活动的服务接口
假设运营人员需要创建一个促销活动:针对所有用户全场打8折,那么该促销活动会针对所有用户进行Push推送。
(1)首先需要提供一个创建促销活动的HTTP接口
运营人员对促销活动的创建通常是在Web界面里进行的,通过发送HTTP请求到Controller来处理,这个请求不是服务之间调用的RPC请求。
运营⼈员在维护⼀个促销活动时,需要通知所有⽤户这个活动的活动时间和活动规则,所以该HTTP接口的接收参数如下:
该HTTP接口的返回值如下:
用于处理HTTP请求的Controller接口如下:
com.demo.eshop.promotion.controlller.PromotionController#saveOrUpdatePromotion
@RestController @RequestMapping("/demo/promotion") public class PromotionController { @Autowired private PromotionService promotionService; //新增一个促销活动 @PostMapping public JsonResult<SaveOrUpdatePromotionDTO> saveOrUpdatePromotion(@RequestBody SaveOrUpdatePromotionRequest request){ try { SaveOrUpdatePromotionDTO saveOrUpdatePromotionDTO = promotionService.saveOrUpdatePromotion(request); return JsonResult.buildSuccess(saveOrUpdatePromotionDTO); } catch (BaseBizException e) { return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { return JsonResult.buildError(e.getMessage()); } } }
通过HTTP请求发送过来的请求对象如下:
//创建或更新促销活动 //这个对象里的数据,就是运营人员使用营销系统(运营管理后台)在Web界面创建促销活动时,录入的信息 @Data @Builder public class SaveOrUpdatePromotionRequest implements Serializable { //促销活动名称 private String name; //活动开始时间 private Date startTime; //活动结束时间 private Date endTime; //促销活动说明备注 private String remark; //活动状态:1启用,2停用 private Integer status; //活动类型:1满减,2折扣,3优惠券,4会员积分 private Integer type; //活动规则 private PromotionRulesValue rule; //活动规则 @Data @Builder public static class PromotionRulesValue implements Serializable { //规则名,1:满减,2:折扣,3:优惠券,4:会员积分 private String key; //规则值,其中key为条件,value为活动规则值 //例如: //满减规则,key=200,value=30,则代表满200减30 //折扣规则,key=200,value=0.95,则代表满200打0.95折 //优惠券规则,key=200,value=10,则代表满200送一张10元优惠券 //会员积分规则,key=200,value=100,则代表满200,额外送100积分 private Map<String,String> value; } //活动创建/修改人 private Integer createUser; }
(2)然后提供一个创建促销活动的服务接口
该接口不仅需要将促销活动实体对象入库,还需要发送MQ信息为所有用户推送促销信息。当营销系统有了一个促销活动后,而且该促销活动的状态还是启用的,那么就要立即触发对用户的推送。也就是说,创建一个促销活动就推送给所有的用户。
@Service public class PromotionServiceImpl implements PromotionService { //开启促销活动DAO @Autowired private SalesPromotionDAO salesPromotionDAO; //RocketMQ生产者 @Resource private DefaultProducer defaultProducer; @Resource private PromotionConverter promotionConverter; //会员服务 @DubboReference(version = "1.0.0") private AccountApi accountApi; //新增或修改一个运营活动 @Transactional(rollbackFor = Exception.class) @Override public SaveOrUpdatePromotionDTO saveOrUpdatePromotion(SaveOrUpdatePromotionRequest request) { //活动规则 String rule = JsonUtil.object2Json(request.getRule()); //构造促销活动实体 SalesPromotionDO salesPromotionDO = promotionConverter.convertPromotionDO(request); salesPromotionDO.setRule(rule); //促销活动实体落库 salesPromotionDAO.saveOrUpdatePromotion(salesPromotionDO); //当营销系统有了一个促销活动后,而且该促销活动的状态还是启用的,那么就要立即触发对用户的Push推送 //也就是创建一个促销活动就Push推送给所有的用户 //为所有用户推送促销活动,发MQ sendPlatformPromotionMessage(salesPromotionDO); //构造响应数据 SaveOrUpdatePromotionDTO dto = new SaveOrUpdatePromotionDTO(); dto.setName(request.getName()); dto.setType(request.getType()); dto.setRule(rule); dto.setCreateUser(request.getCreateUser()); dto.setSuccess(true); return dto; } //为所有用户发推送促销活动 private void sendPlatformPromotionMessage(SalesPromotionDO promotionDO) { //PlatformPromotionMessage表示的是发到MQ的电商平台促销活动消息 PlatformPromotionMessage message = PlatformPromotionMessage.builder() .promotionId(promotionDO.getId()) .promotionType(promotionDO.getType()) .mainMessage(promotionDO.getName()) .message("您已获得活动资格,打开APP进入活动页面") .informType(promotionDO.getType()) .build(); //获取所有账户信息,默认就是针对平台所有的用户进行促销活动的推送push JsonResult<List<MembershipAccountDTO>> jsonResult = accountApi.listAccount(); if (!jsonResult.getSuccess()) { throw new BaseBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage()); } // 循环发送消息,这个地方后面可以优化成线程池,因为消息并不需要顺序性,全部发出去即可 List<MembershipAccountDTO> accounts = jsonResult.getData(); for (MembershipAccountDTO membershipAccountDTO : accounts) { //修改同一个message数据对象的userId,对同一个数据对象设置不同的userId,避免每个用户都有一个自己的message数据对象 message.setUserAccountId(membershipAccountDTO.getId()); String msgJson = JsonUtil.object2Json(message); //消息推送时,使用的就是RocketMQ的Producer defaultProducer.sendMessage(RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, msgJson, "平台发放促销活动消息"); } } }
写入到数据库的促销活动实体对象如下:
//优惠活动实体类DO @Data @TableName("sales_promotion") public class SalesPromotionDO { //主键ID @TableId(value = "id", type = IdType.AUTO) private Integer id; //促销活动名称 private String name; //促销活动开始时间 private Date startTime; //促销活动结束时间 private Date endTime; //促销活动说明备注 private String remark; //促销活动状态,1:启用;2:停用 private Integer status; //促销活动类型,1满减,2折扣,3优惠券,4会员积分 private Integer type; //促销活动规则,已经由Map转换为JSON字符串 private String rule; private Integer createUser; private Date createTime; private Integer updateUser; private Date updateTime; }
发送到MQ的促销活动消息如下:
//平台促销活动消息,一个用户一条消息 @Data @Builder public class PlatformPromotionMessage implements Serializable{ //活动id private Integer promotionId; //主题,消息标题 private String mainMessage; //消息内容 private String message; //活动类型 private Integer promotionType; //消息类型,通知类型:立即通知,定时通知 private Integer informType; //用户id,每条促销活动消息,都是针对一个用户去进行推送的 private Long userAccountId; @Tolerate public PlatformPromotionMessage() { } }
3.Producer和Consumer的工程代码实现
(1)Producer工程代码实现
(2)Consumer工程代码实现
(1)Producer工程代码实现
一.RocketMQ配置信息处理
RocketMQProperties使用了Spring的注解@ConfigurationProperties,该注解会把标记的这个类实例化成一个Bean。RocketMQProperties这个Bean主要会设置在application.yml自定义的一些配置数据。
那么会放哪些配置数据呢?其实会根据@ConfigurationProperties注解里指定的前缀prefix,把前缀对应的配置数据从配置文件里加载出来,注入到RocketMQProperties这个Bean里。也就是会把application.yml里前缀为rocketmq下的name-server值,设置到RocketMQProperties的nameServer属性里。
@ConfigurationProperties(prefix = "rocketmq") public class RocketMQProperties { private String nameServer; public String getNameServer() { return nameServer; } public void setNameServer(String nameServer) { this.nameServer = nameServer; } }
二.自定义MQ的Producer
DefaultProducer是自定义的MQ Producer,它是Spring的一个Bean。对Spring Bean组件的一个方法添加@Autowired进行注解后,在Spring容器进行初始化时,Spring容器会根据方法入参类型,把需要的Bean给注入进来。所以这里的DefaultProducer在初始化时需要的入参RocketMQProperties,会在Spring容器初始化时给注入进来,而且RocketMQProperties也要是一个Spring Bean才能被注入到DefaultProducer中。
RocketMQ配置数据Bean——RocketMQProperties,会经历:加载application.yml里的配置 -> 配置值注入到RocketMQProperties这个Bean实例 -> RocketMQProperties这个Bean实例纳入Spring容器管理。
而Spring容器在初始化DefaultProducer这个Bean实例时,看到需要注入RocketMQ配置数据Bean—RocketMQProperties,便会进行注入。
@Component public class DefaultProducer { private final TransactionMQProducer producer; @Autowired public DefaultProducer(RocketMQProperties rocketMQProperties) { //通过RocketMQ API构建一个producer对象实例,设置配置的nameServer地址 producer = new TransactionMQProducer(RocketMqConstant.PUSH_DEFAULT_PRODUCER_GROUP); producer.setNamesrvAddr(rocketMQProperties.getNameServer()); start(); } ... //发送消息 public void sendMessage(String topic, String message, Integer delayTimeLevel, String type) { //在进行RocketMQ消息发送时,会把topic、message内容(bytes数组)封装成Message对象 Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8)); try { //是否进行延迟消息的发送 if (delayTimeLevel > 0) { msg.setDelayTimeLevel(delayTimeLevel); } //基于RocketMQ Producer API发送消息 SendResult send = producer.send(msg); if (SendStatus.SEND_OK == send.getSendStatus()) { log.info("发送MQ消息成功, type:{}, message:{}", type, message); } else { throw new BaseBizException(send.getSendStatus().toString()); } } catch (Exception e) { log.error("发送MQ消息失败:", e); throw new BaseBizException("消息发送失败"); } } ... }
三.使用自定义的MQ Producer进行消息发送
@Service public class PromotionServiceImpl implements PromotionService { ... //RocketMQ生产者 @Resource private DefaultProducer defaultProducer; ... //为所有用户发推送促销活动 private void sendPlatformPromotionMessage(SalesPromotionDO promotionDO) { ... //消息推送时,使用的就是RocketMQ的Producer defaultProducer.sendMessage(RocketMqConstant.PLATFORM_PROMOTION_SEND_TOPIC, msgJson, "平台发放促销活动消息"); ... } ... }
(2)Consumer工程代码实现
当前的版本中,当营销系统创建一个促销活动后,会查询和遍历所有用户,给每个用户创建创建一条推送消息发送到MQ中。然后推送系统需要监听和消费这些推送到MQ的消息,发起真正的推送。
RocketMQ消费者的工程代码实现步骤如下:
步骤一:首先准备一个RocketMQ配置数据的Bean,即RocketMQProperties
步骤二:然后准备一个@Configuration标记的Bean,这个Bean被Spring接管时会将RocketMQ配置数据的Bean注入进去
步骤三:在@Configuration标记的Bean里,通过不同的被@Bean注解标记方法定义一个RocketMQ消费者Bean
RocketMQ的消费者工程实现,通常是在工程代码里加入一个@Configuration注解。ConsumerBeanConfig类一旦加了这个Spring提供的注解,那么系统启动时,这个类就会被Spring容器接管(实例化)。ConsumerBeanConfig类在被Spring容器接管的过程中,Spring也会对被@Autowired标记的属性进行注入。比如RocketMQProperties作为RocketMQ配置数据的Bean,加了@Autowired标记,就会先被Spring容器接管(实例化)。
ConsumerBeanConfig里面有很多方法,每个方法都加了一个@Bean注解。这表示着Spring容器在执行这些方法时能拿到自定义的Bean实例,然后把实例纳入到Spring容器管理中去。所以,ConsumerBeanConfig类里的每个方法和一个@Bean注解就定义了一个系统里的RocketMQ消费者Bean。
Spring在调用被@Bean标记的platformPromotionSendConsumer()方法实例化一个Bean时,会把监听的Bean注入进去。
@Configuration public class ConsumerBeanConfig { //配置内容对象 @Autowired private RocketMQProperties rocketMQProperties; //平台活动推送消息消费者,Spring在调用这个方法实例化一个Bean时,会把监听的Bean注入进去 @Bean("platformPromotionSendTopic") public DefaultMQPushConsumer platformPromotionSendConsumer(PlatFormPromotionListener platFormPromotionListener) throws MQClientException { //基于RocketMQ API创建Consumer消费者实例对象,设置nameServer地址,定义要监听的Topic,设置对消息进行消费监听的Listener DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_PROMOTION_SEND_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(PLATFORM_PROMOTION_SEND_TOPIC, "*"); consumer.registerMessageListener(platFormPromotionListener); return consumer; } }
其中PlatFormPromotionListener是自定义的Spring Bean,它实现了RocketMQ消费监听接口。在ConsumerBeanConfig里自定义的一个名为platformPromotionSendTopic的Bean,会针对指定的Topic消息通过PlatFormPromotionListener Bean实例来进行监听和处理,PlatFormPromotionListener和名为platformPromotionSendTopic的Bean都会纳入到Spring容器管理中。消费者platformPromotionSendTopic获取到每一条消息都会交给platFormPromotionListener来处理。
4.基于抽象工厂模式的消息推送实现
消费者platformPromotionSendTopic获取到每一条消息都会交给platFormPromotionListener处理。
根据不同通知类型的消息执行不同的推送,这里运用了抽象工厂设计模式,对于每一种渠道的消息推送,都设计了特定渠道消息推送的组件,并由特定渠道消息推送组件的工厂来进行创建。FactoryProducer作为工厂生产者,会根据传入的一些参数来选择返回对应的工厂。获取到具体的工厂后,使用具体的工厂创建出工厂对应的消息发送组件,然后再使用这个组件去进行特定渠道的消息推送。
其中在处理消息时,首先会把MQ的促销推送消息模型转换为通用平台推送消息模型,然后把通用平台推送消息模型,通过具体消息推送工厂转换为具体的消息推送模型(短信、邮件、APP)。
@Component public class PlatFormPromotionListener implements MessageListenerConcurrently { //消息推送工厂提供者,负责对接第三方Push平台,把消息真正地推送到短信、邮箱、APP里去 @Autowired private FactoryProducer factoryProducer; //并发消费消息 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) { try { List<PlatformPromotionMessage> list = new ArrayList<>(); for (MessageExt messageExt : msgList) { log.debug("执行平台发送通知消息逻辑,消息内容:{}", messageExt.getBody()); String msg = new String(messageExt.getBody()); PlatformPromotionMessage platformPromotionMessage = JSON.parseObject(msg , PlatformPromotionMessage.class); //TODO 增加幂等逻辑防止重复消费 list.add(platformPromotionMessage); } //推送通知 inform(list); } catch (Exception e){ log.error("consume error,平台优惠券消费失败", e); //本次消费失败,下次重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //根据不同通知类型的消息执行不同的推送 private void inform(List<PlatformPromotionMessage> list) { for (PlatformPromotionMessage message : list) { //获取消息服务工厂,根据消息推送渠道,抽象工厂选择了具体的工厂 MessageSendServiceFactory messageSendServiceFactory = factoryProducer.getMessageSendServiceFactory(message.getInformType()); //消息发送服务组件 MessageSendService messageSendService = messageSendServiceFactory.createMessageSendService(); //构造消息:把具体的促销推送消息模型 转换为 通用平台推送消息模型 PlatformMessagePushMessage messagePushMessage = PlatformMessagePushMessage.builder() .informType(message.getInformType()) .mainMessage(message.getMainMessage()) .userAccountId(message.getUserAccountId()) .message(message.getMessage()) .build(); //把通用平台推送消息模型,通过具体消息推送工厂 转换为 具体的消息推送模型(短信、邮件、APP) MessageSendDTO messageSendDTO = messageSendServiceFactory.createMessageSendDTO(messagePushMessage); //具体的消息推送组件,推送你的具体消息模型 messageSendService.send(messageSendDTO); log.info("消息推送完成,messageSendDTO:{}", messageSendDTO); } } ... } @Component public class FactoryProducer { //短信发送服务工厂 @Autowired private SmsSendServiceFactory smsSendServiceFactory; //app消息通知发送服务工厂 @Autowired private AppSendServiceFactory appSendServiceFactory; //email消息通知发送服务工厂 @Autowired private EmailSendServiceFactory emailSendServiceFactory; //根据消息类型,获取消息发送服务工厂 public MessageSendServiceFactory getMessageSendServiceFactory(Integer informType) { if (Objects.isNull(informType)) { throw new BaseBizException("参数异常"); } if (InformTypeEnum.SMS.getCode().equals(informType)) { return smsSendServiceFactory; } else if (InformTypeEnum.APP.getCode().equals(informType)) { return appSendServiceFactory; } else if (InformTypeEnum.EMAIL.getCode().equals(informType)) { return emailSendServiceFactory; } throw new BaseBizException("参数异常"); } }
抽象工厂方法相关类如下:
//消息推送服务工厂 public interface MessageSendServiceFactory { //创建消息推送服务组件 MessageSendService createMessageSendService(); //创建消息推送DTO,不同消息类型,可以构建不同的消息推送DTO MessageSendDTO createMessageSendDTO(PlatformMessagePushMessage platformMessagePushMessage); } @Component public class SmsSendServiceFactory implements MessageSendServiceFactory { @Autowired private SmsSendServiceImpl smsSendServiceImpl; @Override public MessageSendService createMessageSendService() { return smsSendServiceImpl; } @Override public SmsMessageSendDTO createMessageSendDTO(PlatformMessagePushMessage platformMessagePushMessage) { SmsMessageSendDTO messageSendDTO = new SmsMessageSendDTO(); messageSendDTO.setMainMessage(platformMessagePushMessage.getMainMessage()); messageSendDTO.setMessage(platformMessagePushMessage.getMessage()); messageSendDTO.setInformType(platformMessagePushMessage.getInformType()); messageSendDTO.setUserAccountId(platformMessagePushMessage.getUserAccountId()); return messageSendDTO; } } @Component public class AppSendServiceFactory implements MessageSendServiceFactory { @Autowired private AppSendServiceImpl appSendServiceImpl; @Override public MessageSendService createMessageSendService() { return appSendServiceImpl; } @Override public MessageSendDTO createMessageSendDTO(PlatformMessagePushMessage platformMessagePushMessage) { AppMessageSendDTO messageSendDTO = new AppMessageSendDTO(); messageSendDTO.setMainMessage(platformMessagePushMessage.getMainMessage()); messageSendDTO.setMessage(platformMessagePushMessage.getMessage()); messageSendDTO.setInformType(platformMessagePushMessage.getInformType()); messageSendDTO.setUserAccountId(platformMessagePushMessage.getUserAccountId()); return messageSendDTO; } } @Component public class EmailSendServiceFactory implements MessageSendServiceFactory { @Autowired private EmailSendServiceImpl emailSendServiceImpl; @Override public MessageSendService createMessageSendService() { return emailSendServiceImpl; } @Override public EmailMessageSendDTO createMessageSendDTO(PlatformMessagePushMessage platformMessagePushMessage) { EmailMessageSendDTO messageSendDTO = new EmailMessageSendDTO(); messageSendDTO.setMainMessage(platformMessagePushMessage.getMainMessage()); messageSendDTO.setMessage(platformMessagePushMessage.getMessage()); messageSendDTO.setInformType(platformMessagePushMessage.getInformType()); messageSendDTO.setUserAccountId(platformMessagePushMessage.getUserAccountId()); return messageSendDTO; } } //平台消息发送消息,PlatformMessagePushMessage是泛化、通用的消息模型,把促销活动推送消息转换为通用消息模型,保留通用的字段 @Data @Builder public class PlatformMessagePushMessage implements Serializable { //主题 private String mainMessage; //消息内容 private String message; //消息类型 private Integer informType; //用户id private Long userAccountId; @Tolerate public PlatformMessagePushMessage() { } } @Data public class MessageSendDTO { //主题 private String mainMessage; //消息内容 private String message; //消息类型 private Integer informType; //用户id private Long userAccountId; }
5.全量用户促销活动消息推送的流程和缺陷分析
(1)完整流程分析
(2)当前版本的缺陷分析
(1)完整流程分析
(2)当前版本的缺陷分析
缺陷一:运营人员在后台界面创建促销活动时,点击提交按钮后,就直接生成所有用户推送消息发到MQ。如果从数据库查出来的用户数量很大,不仅获取大量的用户数据耗时,而且这些数据也会大量消耗JVM内存。
缺陷二:由于对每个用户都创建出一个JSON字符串消息发送到MQ,用户量一大也会对内存消耗巨大。
缺陷三:由于生成的每条消息都需要发送到MQ,每条消息都要进行一次网络传输,又会导致大量的网络通信开销。这种写法,只能用于前期项目开发时,进行快速测试来跑通流程。
6.全量用户促销活动推送引入MQ进行削峰
引入MQ进行削峰的情形:
一.消息生产者和消息消费者的并发处理量不一样
营销系统可能可以做到每秒生产1万条消息并调用推送系统的接口。但推送系统拿到消息后,要通过SDK交给第三方推送平台处理,此时就不一定能每秒推送1万条消息给用户了。
二.存在明显的高峰和低谷
当运营人员在不创建促销活动时,营销系统根本不会推送消息给推送系统。当运营人员突然创建促销活动时,短时间内就要推送大量消息给推送系统。
7.全量用户发优惠券业务流程实现
(1)创建发放优惠券活动的HTTP接口
(2)创建发放优惠券活动的服务接口
如果运营⼈员维护⼀个发放优惠券活动,需要为全部⽤户发放优惠券。那么这种对全量用户发放优惠券,一般采用全量用户静默的方式进行发放。也就是发放优惠券过程是后台操作,不需要发送通知类消息去通知⽤户,而是直接发送优惠券到MQ。然后消费者消费MQ后,⾃动保存到数据库的⽤户优惠券表中。
(1)创建发放优惠券活动的HTTP接口
该接口接收的参数如下:
该接口返回值如下:
该HTTP接口代码如下:
com.demo.eshop.promotion.controlller.PromotionCouponController#saveOrUpdateCoupon
//发起活动controller @RestController @RequestMapping("/demo/promotion/coupon") public class PromotionCouponController { //优惠活动service @Autowired private CouponService couponService; //新增一个优惠券活动 @PostMapping public JsonResult<SaveOrUpdateCouponDTO> saveOrUpdateCoupon(@RequestBody SaveOrUpdateCouponRequest request){ try { log.info("新增一条优惠券:{}", JSON.toJSONString(request)); SaveOrUpdateCouponDTO dto = couponService.saveOrUpdateCoupon(request); return JsonResult.buildSuccess(dto); } catch (BaseBizException e) { log.error("biz error: request={}", JSON.toJSONString(request), e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error: request={}", JSON.toJSONString(request), e); return JsonResult.buildError(e.getMessage()); } } ... } //创建/更新优惠券活动时提交的请求对象 @Data @Builder public class SaveOrUpdateCouponRequest implements Serializable { //优惠券名称 private String couponName; //优惠规则 private String couponRule; //活动开始时间 private Date activityStartTime; //活动结束时间 private Date activityEndTime; //优惠券发放数量,要发放多少张,最多只能有这么多用户领取到这个券 private Integer couponCount; //优惠券领取数量,已经有多少人领取了这个券 private Integer couponReceivedCount; //优惠券发放方式,1:仅自领取,2:仅系统发放 private Integer couponReceiveType; //优惠券类型:满减、折扣、立减 private Integer couponType; //优惠券状态:1:发放中,2:已发完,3:已过期 private Integer couponStatus; //活动创建/修改人 private Integer createUser; @Tolerate public SaveOrUpdateCouponRequest() { } }
(2)创建发放优惠券活动的服务接口
com.demo.eshop.promotion.service.impl.CouponServiceImpl#saveOrUpdateCoupon
//优惠券接口实现 @Service public class CouponServiceImpl implements CouponService { //开启优惠券活动DAO @Autowired private SalesPromotionCouponDAO salesPromotionCouponDAO; ... //保存/修改优惠券活动方法 @Transactional(rollbackFor = Exception.class) @Override public SaveOrUpdateCouponDTO saveOrUpdateCoupon(SaveOrUpdateCouponRequest request) { SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(request); couponDO.setCouponReceivedCount(0); salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO); //判断优惠券类型,系统发放类型,则针对所有用户,发送优惠券到MQ if (CouponSendTypeEnum.PLATFORM_SEND.getCode().equals(request.getCouponReceiveType())) { sendPlatformCouponMessage(couponDO); } SaveOrUpdateCouponDTO dto = new SaveOrUpdateCouponDTO(); dto.setCouponName(request.getCouponName()); dto.setRule(request.getCouponRule()); dto.setSuccess(true); return dto; } //为所有用户发放优惠券 private void sendPlatformCouponMessage(SalesPromotionCouponDO promotionCouponDO) { PlatformCouponMessage message = PlatformCouponMessage.builder() .couponId(promotionCouponDO.getId()) .activityStartTime(promotionCouponDO.getActivityStartTime()) .activityEndTime(promotionCouponDO.getActivityEndTime()) .couponType(promotionCouponDO.getCouponType()) .build(); //获取所有账号信息 //TODO 优化获取用户数据逻辑 JsonResult<List<MembershipAccountDTO>> jsonResult = accountApi.listAccount(); if (!jsonResult.getSuccess()) { throw new BaseBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage()); } //循环发送消息 //可以优化成线程池,因为消息并不需要顺序性,全部发出去即可 List<MembershipAccountDTO> accounts = jsonResult.getData(); for (MembershipAccountDTO membershipAccountDTO : accounts) { message.setUserAccountId(membershipAccountDTO.getId()); String msgJson = JsonUtil.object2Json(message); defaultProducer.sendMessage(RocketMqConstant.PLATFORM_COUPON_SEND_TOPIC, msgJson, "平台发放优惠券消息"); } } ... }
8.全量用户发优惠券引入MQ削峰
(1)营销系统配置的RocketMQ消费者
(2)全量用户发券满足引入MQ进行削峰的情形
(3)目前这种方案实现的缺陷
营销系统向RocketMQ发送全量用户发优惠券的消息后,又会自己消费这些消息。
(1)营销系统配置的RocketMQ消费者
@Configuration public class ConsumerBeanConfig { //配置内容对象 @Autowired private RocketMQProperties rocketMQProperties; //平台发放优惠券领取消费者 @Bean("platformCouponReceiveTopic") public DefaultMQPushConsumer receiveCouponConsumer(PlatFormCouponListener platFormCouponListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_COUPON_SEND_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(PLATFORM_COUPON_SEND_TOPIC, "*"); consumer.registerMessageListener(platFormCouponListener); return consumer; } } @Component public class PlatFormCouponListener implements MessageListenerConcurrently { //优惠券服务service @Autowired private CouponItemService couponItemService; //并发消费消息 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) { try { List<SalesPromotionCouponItemDTO> list = new ArrayList<>(); for (MessageExt messageExt : msgList) { log.debug("执行平台发放优惠券消费消息逻辑,消息内容:{}", messageExt.getBody()); String msg = new String(messageExt.getBody()); PlatformCouponMessage platformCouponMessage = JSON.parseObject(msg , PlatformCouponMessage.class); log.info("开始发放平台优惠券,couponId:{}", platformCouponMessage.getCouponId()); //幂等逻辑防止重复消费 JsonResult<Long> result = couponItemService.selectByAccountIdAndCouponId(platformCouponMessage.getUserAccountId(), platformCouponMessage.getCouponId()); //如果已经存在,直接跳过循环,不再执行优惠券保存操作 if (result.getSuccess()) { continue; } SalesPromotionCouponItemDTO itemDTO = new SalesPromotionCouponItemDTO(); itemDTO.setCouponId(platformCouponMessage.getCouponId()); itemDTO.setCouponType(platformCouponMessage.getCouponType()); itemDTO.setUserAccountId(platformCouponMessage.getUserAccountId()); itemDTO.setIsUsed(0); itemDTO.setActivityStartTime(platformCouponMessage.getActivityStartTime()); itemDTO.setActivityEndTime(platformCouponMessage.getActivityEndTime()); list.add(itemDTO); } //优惠券保存到数据库,把一批用户对这个券的持有记录,批量插入到数据库里去 couponItemService.saveCouponBatch(list); } catch (Exception e){ log.error("consume error,平台优惠券消费失败", e); //本次消费失败,下次重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } //用户和优惠券领取关系的记录,这里的一条数据记录就代表了一个用户对一个优惠券的持有记录 @Data @Builder public class SalesPromotionCouponItemDTO { //主键ID private Long id; //优惠券ID:一个优惠券自己就一条记录,多个用户对同一个优惠券可以有自己的券持有记录 private Long couponId; //优惠券类型:1:现金券,2:满减券 private Integer couponType; //用户ID private Long userAccountId; //是否已经使用 private Integer isUsed; //使用时间 private Date usedTime; //有效期开始时间 private Date activityStartTime; //有效期开始时间 private Date activityEndTime; //创建人 private Long createUser; //创建时间 private Date createTime; //更新人 private Long updateUser; //更新时间 private Date updateTime; @Tolerate public SalesPromotionCouponItemDTO() { } }
(2)全量用户发券满足引入MQ进行削峰的情形
情形一:生产者和消费者的并发处理量不一样
营销系统可能可以做到每秒生产1万条消息,但将每个用户的优惠券信息写入数据库时,能支持每秒两千就很高了。一般8核16G的标配数据库机器,每秒TPS可以支持写入两三千。
情形二:存在明显的高峰和低谷
当运营人员在不创建全量用户发券活动时,营销系统不会写全量用户的优惠券信息到数据库。当运营人员突然创建全量用户发券活动时,短时间内就要写全量用户的优惠券信息到数据库。
(3)目前这种方案实现的缺陷
缺陷一:查询全量用户比较耗时,而且这些数据也会大量消耗JVM内存。
缺陷二:对每个用户都创建一个JSON字符串消息发送到MQ,用户量很大时会很消耗内存。
缺陷三:生成的每条消息都需要发送到MQ,每条消息都要进行一次网络传输,这会导致大量的网络通信开销。
9.激活不活跃用户发券流程分析
(1)给特定用户发送领取优惠券推送的HTTP接口
(2)给特定用户发送领取优惠券推送的服务接口
如果运营⼈员需要维护这么⼀个发放优惠券的活动:首先根据条件筛选出⽤户数据,然后给筛选出来的⽤户创建推送通知消息。接着调⽤pushService的接⼝,把推送通知消息推送给用户,通知⽤户去领取优惠券。最后用户领取优惠券后,将该用户领取到的优惠券保存到数据库的⽤户优惠券表中。
这其实就是一个既要进行领取优惠券的推送,也要发放优惠券的场景,对指定用户群体进行领取优惠券推送并发放优惠券。
(1)给特定用户发送领取优惠券推送的HTTP接口
该HTTP接口接收的参数如下:
该HTTP接口返回的响应如下:
具体实现如下:
com.demo.eshop.promotion.controlller.PromotionCouponController#saveOrUpdateCoupon
//发起活动controller @RestController @RequestMapping("/demo/promotion/coupon") public class PromotionCouponController { //优惠活动service @Autowired private CouponService couponService; ... @RequestMapping("/send") public JsonResult<SendCouponDTO> sendCouponByConditions(@RequestBody SendCouponRequest request){ try { log.info("发送优惠券给指定用户群体:{}", JSON.toJSONString(request)); SendCouponDTO dto = couponService.sendCouponByConditions(request); return JsonResult.buildSuccess(dto); } catch (BaseBizException e) { log.error("biz error: request={}", JSON.toJSONString(request), e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error: request={}", JSON.toJSONString(request), e); return JsonResult.buildError(e.getMessage()); } } ... } //指定用户群体发送领取优惠券推送的请求 @Data @Builder public class SendCouponRequest implements Serializable { //优惠券名称 private String couponName; //优惠券类型 private Integer couponType; //优惠规则 private String couponRule; //活动开始时间 private Date activityStartTime; //活动结束时间 private Date activityEndTime; //优惠券发放数量 private Integer couponCount; //优惠券领取数量 private Integer couponReceivedCount; //优惠券领取地址链接,领券url地址,领券是通过访问哪个url地址来领券的 private String activeUrl; //推送类型 1-定时发送,2-实时发送 private Integer pushType; //1:短信,2:app消息,3:邮箱 private Integer informType; //选人条件 private MembershipFilterDTO membershipFilterDTO; //定时发送消息任务开始时间 private Date pushStartTime; //定时发送任务结束时间 private Date pushEndTime; //每个发送周期内的发送次数,以此为依据发送消息 private Integer sendPeriodCount; //活动创建/修改人 private Integer createUser; @Tolerate public SendCouponRequest() { } } @Data @Builder @NoArgsConstructor @AllArgsConstructor public class MembershipFilterDTO implements Serializable { //账号类型 private Integer accountType; //会员等级 private Integer membershipLevel; //连续活跃天数 private Integer activeCount; //一个月内活跃天数 private Integer totalActiveCount; //订单总金额,单位:分 private Integer totalAmount; }
(2)给特定用户发送领取优惠券推送的服务接口
//优惠券接口实现 @Service public class CouponServiceImpl implements CouponService { //开启优惠券活动DAO @Autowired private SalesPromotionCouponDAO salesPromotionCouponDAO; //推送服务 @DubboReference(version = "1.0.0") private MessagePushApi messagePushApi; ... @Transactional(rollbackFor = Exception.class) @Override public SendCouponDTO sendCouponByConditions(SendCouponRequest sendCouponRequest) { //保存优惠券信息 SalesPromotionCouponDO couponDO = couponConverter.convertCouponDO(sendCouponRequest); couponDO.setCouponReceivedCount(0); couponDO.setCouponStatus(CouponStatusEnum.NORMAL.getCode()); couponDO.setCouponReceiveType(CouponSendTypeEnum.SELF_RECEIVE.getCode()); salesPromotionCouponDAO.saveOrUpdateCoupon(couponDO); //构建messageRequest SaveOrUpdateMessageRequest messageRequest = buildSaveOrUpdateMessageRequest(sendCouponRequest); //创建消息推送 JsonResult<SaveOrUpdateMessageDTO> messageResult = messagePushApi.saveOrUpdateMessage(messageRequest); SendCouponDTO sendCouponDTO = new SendCouponDTO(); sendCouponDTO.setSuccess(messageResult.getData().getSuccess()); sendCouponDTO.setCouponName(sendCouponRequest.getCouponName()); sendCouponDTO.setRule(sendCouponRequest.getCouponRule()); //TODO 发放数量 sendCouponDTO.setSendCount(0); return sendCouponDTO; } ... } //消息推送请求 @Data @Builder public class SaveOrUpdateMessageRequest { //推送类型:1定时推送,2直接推送 private Integer pushType; //消息主题 private String mainMessage; //1:短信,2:app消息,3:邮箱 private Integer informType; //推送消息内容 private String message; //选人条件 private MembershipFilterDTO membershipFilterDTO; //定时发送消息任务开始时间 private Date pushStartTime; //定时发送任务结束时间 private Date pushEndTime; //每个发送周期内的发送次数,以此为依据发送消息 private Integer sendPeriodCount; //推送消息创建/修改人 private Integer createUser; @Tolerate public SaveOrUpdateMessageRequest() { } }
10.推送系统对营销系统发起的推送任务的处理
营销系统会通过messagePushApi的saveOrUpdateMessage()方法调用推送系统对特定用户发送领取优惠券推送。
推送系统的saveOrUpdateMessage()方法收到请求后,首先会将这次推送任务持久化到数据库中。如果发现这次推送任务属于定时调度类型,则先根据开始时间结束时间和推送次数创建定时任务推送记录,然后返回。如果发现这次推送任务属于即时调度类型,则直接发起推送。
对于定时调度类型的推送任务,只要将定时任务推送记录保存到数据库中,后续XXLJob便会进行专门的推送处理。
@DubboService(version = "1.0.0", interfaceClass = MessagePushApi.class, retries = 0) public class MessagePushApiImpl implements MessagePushApi { @Autowired private MessagePushService messagePushService; @Override public JsonResult<SaveOrUpdateMessageDTO> saveOrUpdateMessage(SaveOrUpdateMessageRequest saveOrUpdateMessageRequest) { try { SaveOrUpdateMessageDTO dto = messagePushService.saveOrUpdateMessage(saveOrUpdateMessageRequest); return JsonResult.buildSuccess(dto); } catch (BaseBizException e) { log.error("biz error: request={}", JSON.toJSONString(saveOrUpdateMessageRequest), e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error: request={}", JSON.toJSONString(saveOrUpdateMessageRequest), e); return JsonResult.buildError(e.getMessage()); } } ... } @Service public class MessagePushServiceImpl implements MessagePushService { @Autowired private MessagePushDAO messagePushDAO; @Autowired private MessagePushCrontabDAO messagePushCrontabDAO; @Transactional(rollbackFor = Exception.class) @Override public SaveOrUpdateMessageDTO saveOrUpdateMessage(SaveOrUpdateMessageRequest saveOrUpdateMessageRequest) { SaveOrUpdateMessageDTO saveOrUpdateMessageDTO = new SaveOrUpdateMessageDTO(); //构建消息实体 MessagePushDO messagePushDO = buildMessagePushDO(saveOrUpdateMessageRequest); //无论是定时推送还是立即推送,都会把一次推送任务,持久化到数据库表里去 boolean saveFlag = messagePushDAO.save(messagePushDO); //这次推送任务属于定时调度类型,定时发送推送消息 if (Objects.equals(saveOrUpdateMessageRequest.getPushType(), PushTypeEnum.DELAY.getCode())) { //如果是定时调度的任务,那么需要根据开始时间和结束时间得出在这段时间里,定时调度推送多少次,每次推送都要有一个定时调度任务记录 //比如对不活跃用户推送优惠券,在7天内一共要推送7次,那么就要这7天内每天推送一次领取优惠券消息给这些不活跃用户 List<MessagePushCrontabDO> messagePushCrontabDOS = generateMessagePushCrontab(saveOrUpdateMessageRequest); if (!Collections.isEmpty(messagePushCrontabDOS)) { messagePushCrontabDAO.saveBatch(messagePushCrontabDOS); } saveOrUpdateMessageDTO.setSuccess(saveFlag); return saveOrUpdateMessageDTO; } //这次推送任务属于即时调度类型,马上发送推送消息 //首先构建推送消息DTO PushMessageDTO pushMessageDTO = PushMessageDTO.builder() .mainMessage(saveOrUpdateMessageRequest.getMainMessage()) .message(saveOrUpdateMessageRequest.getMessage()) .informType(saveOrUpdateMessageRequest.getInformType()) .build(); //然后立即推送消息 pushMessages(pushMessageDTO, saveOrUpdateMessageRequest.getMembershipFilterDTO()); saveOrUpdateMessageDTO.setSuccess(true); return saveOrUpdateMessageDTO; } //生成消息发送任务实体数据 private List<MessagePushCrontabDO> generateMessagePushCrontab(SaveOrUpdateMessageRequest request) { Integer sendPeriodCount = request.getSendPeriodCount(); List<MessagePushCrontabDO> messagePushCrontabDOS = new ArrayList<>(); if (sendPeriodCount == 1) { //周期内只发送一次,直接使用startTime作为定时任务的执行时间 MessagePushCrontabDO messagePushCrontab = buildMessagePushCrontabDO(request, 1, request.getPushStartTime()); messagePushCrontabDOS.add(messagePushCrontab); } else { LocalDateTime startTime = DateUtil.convertLocalDateTime(request.getPushStartTime()); LocalDateTime endTime = DateUtil.convertLocalDateTime(request.getPushEndTime()); //开始时间和结束时间相隔分钟数 Long minutes = DateUtil.betweenMinutes(startTime, endTime); //相邻定时任务执行周期间隔分钟数 long periodMinutes = minutes / (sendPeriodCount - 1); for (int i = 1; i <= sendPeriodCount; i++) { //任务执行时间计算逻辑:起始时间开始作为第一次,后面每次间隔periodMinutes执行一次定时任务,最后一次使用结束时间 MessagePushCrontabDO messagePushCrontab; if (i == sendPeriodCount) { messagePushCrontab = buildMessagePushCrontabDO(request, i, request.getPushEndTime()); } else { //第一次调度,startTime;第二次调度,startTime + 每次调度耗费的分钟数;第三次调度,startTime + 2 * 每次调度耗费的分钟数;最后一次,endTime LocalDateTime crontabTime = startTime.plusMinutes(periodMinutes * (i - 1)); messagePushCrontab = buildMessagePushCrontabDO(request, i, DateUtil.convertDate(crontabTime)); } messagePushCrontabDOS.add(messagePushCrontab); } } return messagePushCrontabDOS; } ... }
11.立即推送模式的流程、缺陷和削峰
对特定用户推送领取优惠券,会分为定时推送和即时推送,前面的实现中,定时推送其实已经没有什么问题了。但即时推送还是会存在一定问题,如果特定用户人群数量较大,如下的实现还是会有性能问题。
问题一:查询用户耗费时间,大量用户数据占内存
问题二:发送大量消息到MQ耗费网络资源和时间
使用下面的立即推送有一个前置条件,就是特定用户数量比较少,千级别。
@Service public class MessagePushServiceImpl implements MessagePushService { @DubboReference(version = "1.0.0") private AccountApi accountApi; @Autowired private DefaultProducer defaultProducer; ... @Override public void pushMessages(PushMessageDTO pushMessageDTO, MembershipFilterDTO membershipFilterDTO) { JsonResult<List<MembershipAccountDTO>> accountResult = accountApi.listAccountByConditions(membershipFilterDTO); for (MembershipAccountDTO accountDTO : accountResult.getData()) { PlatformMessagePushMessage platformMessagePushMessage = buildPlatformMessagePushMessage(pushMessageDTO, accountDTO); String msgJson = JsonUtil.object2Json(platformMessagePushMessage); defaultProducer.sendMessage(RocketMqConstant.PLATFORM_MESSAGE_SEND_TOPIC, msgJson, "平台消息推送消息"); } } ... }
12.XXLJob驱动定时推送模式的运行原理
(1)XXLJob运行原理
(2)推送系统首先进行XXLJob的配置
(3)在推送系统编写执行任务的Spring Bean
(1)XXLJob运行原理
说明一:XXLJob首先要配置一组Excutors,该组Excutors会有名字。推送系统在启动时就需要启动一个Excutor,并且会注册到XXLJob里指定名字的一个Excutors中。XXLJob收到推送系统Excutor的注册请求后,会根据注册的名字把它们划分到对应的一组Excutors里面。
说明二:然后开发人员便可以在XXLJob配置定时调度任务,绑定某组Excutors以及指定执行任务的SpringBean。当配置好的定时任务的执行时间到达时,就会找到绑定的Excutors,发送执行任务请求给推送系统的Excutor;
说明三:推送系统的Excutor收到XXLJob发送的执行任务请求后,便会找到指定的SpringBean去执行任务,每个推送系统的SpringBean接着会从MySQL数据库里查询出相关的推送任务。
说明四:为了决定每个推送系统的SpringBean该执行从MySQL数据库查询出来的哪些推送任务。XXLJob在发送执行任务请求给推送系统的Excutor时,会带上shardIndex和shardNums。其中shardNums指的是当前执行定时任务的推送系统Excutor一共有多少个节点,每一个节点可以认为是任务执行的分片。shardIndex就是对各个定时任务节点进行标号,比如发给推送系统1的shardIndex=1,发送给推送系统2的shardIndex=2。
说明五:这时推送系统的SpringBean从数据库查出来一批推送任务时:就会根据任务ID的Hash值对shardNums进行取模。通过取模结果和推送系统所属的shardIndex是否一样,来决定这个任务是属于哪个分片,从而实现多个节点对同一批任务的分布式调度。
(2)推送系统首先进行XXLJob的配置
application.yml的XXLJob配置如下:
xxl: job: admin: addresses: http://127.0.0.1:8080/xxl-job-admin executor: appname: message-push port: 9999
读取配置的Bean如下:
@Configuration public class XxlJobConfig { @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.port}") private int port; @Bean public XxlJobSpringExecutor xxlJobExecutor() { log.info(">>>>>>>>>>> xxl-job config init."); //在推送系统上创建一个Executor XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setPort(port); return xxlJobSpringExecutor; } }
(3)在推送系统编写执行任务的Spring Bean
@Component public class ScheduleSendMessageJobHandler { ... @Resource private MessagePushCrontabDAO messagePushCrontabDAO; @Autowired private MessagePushService messagePushService; //执行定时任务,筛选并执行"需要定时发送领取优惠券推送消息"的任务 @XxlJob("messagePushHandler") public void messagePushHandler() { log.info("messagePushHandler 开始执行"); int shardIndex = Optional.ofNullable(XxlJobHelper.getShardIndex()).orElse(0); int totalShardNum = Optional.ofNullable(XxlJobHelper.getShardTotal()).orElse(0); //查询出当前时间需要执行的任务,小于等于当前时间且没有执行过的任务 LambdaQueryWrapper<MessagePushCrontabDO> queryWrapper = Wrappers.lambdaQuery(); queryWrapper.le(MessagePushCrontabDO::getCrontabTime, new Date()).eq(MessagePushCrontabDO::getExecuteFlag, YesOrNoEnum.NO.getCode()); List<MessagePushCrontabDO> messagePushCrontabDOS = messagePushCrontabDAO.list(queryWrapper); log.info("获取需要定时发送消息的优惠券等活动消息任务, messagePushCrontabDOS:{}", JsonUtil.object2Json(messagePushCrontabDOS)); for (MessagePushCrontabDO messagePushCrontabDO : messagePushCrontabDOS) { int shardNo = messagePushCrontabDO.getId().hashCode() % totalShardNum; if (shardNo == shardIndex) { sendMessage(messagePushCrontabDO); //消息发送成功,将数据变更为已执行 messagePushCrontabDO.setExecuteFlag(YesOrNoEnum.YES.getCode()); } } //已经执行过的任务,修改数据库 messagePushCrontabDAO.updateBatchById(messagePushCrontabDOS); log.info("hotGoodsPushHandler 执行结束"); } private void sendMessage(MessagePushCrontabDO messagePushCrontabDO) { MembershipFilterDTO membershipFilterDTO = JsonUtil.json2Object(messagePushCrontabDO.getFilterCondition(), MembershipFilterDTO.class); PushMessageDTO pushMessageDTO = PushMessageDTO.builder() .mainMessage(messagePushCrontabDO.getMainMessage()) .message(messagePushCrontabDO.getMessageInfo()) .informType(messagePushCrontabDO.getInformType()) .build(); messagePushService.pushMessages(pushMessageDTO, membershipFilterDTO); } ... }
13.不活跃用户领取优惠券流程
(1)领取优惠券接口介绍
(2)领取优惠券HTTP接口
(3)领取优惠券服务接口
(1)领取优惠券接口介绍
提供给⽤户的优惠券领取链接,在⽤户点击之后,会调⽤此接⼝领取⼀张优惠券,并扣减发放的优惠券总数。在领取前会先判断优惠券的状态,避免错领、重复领取、或者领取到活动已经结束的优惠券。
接口参数如下:
接口响应如下:
(2)领取优惠券HTTP接口
@RestController @RequestMapping("/demo/promotion/coupon") public class PromotionCouponController { //优惠活动service @Autowired private CouponService couponService; ... @RequestMapping("/receive") public JsonResult<ReceiveCouponDTO> receiveCoupon(@RequestBody ReceiveCouponRequest request){ try { log.info("领取优惠券:{}", JSON.toJSONString(request)); ReceiveCouponDTO dto = couponService.receiveCoupon(request); return JsonResult.buildSuccess(dto); } catch (BaseBizException e) { log.error("biz error: request={}", JSON.toJSONString(request), e); return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg()); } catch (Exception e) { log.error("system error: request={}", JSON.toJSONString(request), e); return JsonResult.buildError(e.getMessage()); } } } //领取优惠券请求 @Data @Builder public class ReceiveCouponRequest implements Serializable { //优惠券ID private Long couponId; //优惠券类型:1:现金券,2:满减券 private Integer couponType; //用户账号 private Long userAccountId; //优惠规则 private String couponRule; //优惠券生效开始时间 private Date activityStartTime; //优惠券生效结束时间 private Date activityEndTime; @Tolerate public ReceiveCouponRequest() { } }
(3)领取优惠券服务接口
@Service public class CouponServiceImpl implements CouponService { //开启优惠券活动DAO @Autowired private SalesPromotionCouponDAO salesPromotionCouponDAO; //开启优惠券活动DAO @Autowired private SalesPromotionCouponItemDAO salesPromotionCouponItemDAO; ... @Transactional(rollbackFor = Exception.class) @Override public ReceiveCouponDTO receiveCoupon(ReceiveCouponRequest receiveCouponRequest) { //获取优惠券信息 SalesPromotionCouponDO couponDO = salesPromotionCouponDAO.getById(receiveCouponRequest.getCouponId()); //检查优惠券状态 ReceiveCouponDTO dto = checkCouponStatus(couponDO); if (!dto.getSuccess()) { return dto; } //查询用户是否已经领取过该优惠券了,如果领取过,直接返回 LambdaQueryWrapper<SalesPromotionCouponItemDO> queryWrapper = Wrappers.lambdaQuery(); queryWrapper.eq(SalesPromotionCouponItemDO::getCouponId, receiveCouponRequest.getCouponId()); queryWrapper.eq(SalesPromotionCouponItemDO::getUserAccountId, receiveCouponRequest.getUserAccountId()); int count = salesPromotionCouponItemDAO.count(queryWrapper); //用户已经领取过该优惠券 if (count > 0) { dto.setSuccess(false); dto.setMessage("已经领取过该优惠券,不要重复领取哦"); return dto; } //修改优惠券领取数量 couponDO.setCouponReceivedCount(couponDO.getCouponReceivedCount() + 1); //如果领取数量与发放数量相同,将优惠券状态设置为发放完 if (Objects.equals(couponDO.getCouponCount(), couponDO.getCouponReceivedCount())) { couponDO.setCouponStatus(CouponStatusEnum.USED.getCode()); } salesPromotionCouponDAO.updateById(couponDO); //领取一张优惠券 SalesPromotionCouponItemDO couponItemDO = buildSalesPromotionCouponItemDO(couponDO, receiveCouponRequest); //添加一条领取记录 salesPromotionCouponItemDAO.save(couponItemDO); return dto; } //检查优惠券的状态,并返回 领取优惠券结果 对象 private ReceiveCouponDTO checkCouponStatus(SalesPromotionCouponDO couponDO) { if (Objects.isNull(couponDO)) { throw new BaseBizException("优惠券不存在"); } ReceiveCouponDTO dto = new ReceiveCouponDTO(); Integer couponStatus = couponDO.getCouponStatus(); //领取完或已过期 if (!Objects.equals(couponStatus, CouponStatusEnum.NORMAL.getCode())) { dto.setSuccess(false); CouponStatusEnum statusEnum = CouponStatusEnum.getByCode(couponStatus); if (Objects.isNull(statusEnum)) { throw new BaseBizException("优惠券领取失败"); } dto.setMessage("优惠券"+ statusEnum.getMsg() + ",下次早点来哦"); return dto; } //发行数量小于或者等于领取数量,优惠券已经领取完 if (couponDO.getCouponCount() <= couponDO.getCouponReceivedCount()) { //修改coupon couponDO.setCouponStatus(CouponStatusEnum.USED.getCode()); salesPromotionCouponDAO.updateById(couponDO); dto.setSuccess(false); dto.setMessage("优惠券已发放完,下次早点来哦"); return dto; } //优惠券过期 if (couponDO.getActivityEndTime().before(new Date())) { //修改coupon couponDO.setCouponStatus(CouponStatusEnum.EXPIRED.getCode()); salesPromotionCouponDAO.updateById(couponDO); dto.setSuccess(false); dto.setMessage("优惠券已过期,下次早点来哦"); return dto; } dto.setSuccess(true); return dto; } }
14.热门商品根据用户画像定时推送以及MQ削峰
(1)推送系统的XXLJob实现
(2)推送系统的消费者实现
首先推送系统通过XXLJob获取到热门商品后会遍历每一个热门商品,然后根据热门商品对应的用户画像标签去获取对应的用户数据,接着遍历这些用户将具体推送消息发送到MQ进行削峰填谷,最后推送系统从MQ中消费这些消息调用第三方Push平台的SDK进行推送。
(1)推送系统的XXLJob实现
@Component public class ScheduleSendMessageJobHandler { @Autowired private ScheduleSendMessageJob scheduleSendMessageJob; @Resource private HotGoodsCrontabDAO hotGoodsCrontabDAO; ... //执行定时任务,筛选热门商品和用户发送给MQ @XxlJob("hotGoodsPushHandler") public void hotGoodsPushHandler() { log.info("hotGoodsPushHandler 开始执行"); //获取shard分片数据进行分布式调度 int shardIndex = Optional.ofNullable(XxlJobHelper.getShardIndex()).orElse(0); int totalShardNum = Optional.ofNullable(XxlJobHelper.getShardTotal()).orElse(0); //获取热门商品和用户画像,业务先简化为一对一关系 List<HotGoodsCrontabDO> crontabDOs = hotGoodsCrontabDAO.queryHotGoodsCrontabByCrontabDate(new Date()); log.info("获取热门商品和用户画像数据, crontabDOs:{}", JsonUtil.object2Json(crontabDOs)); //找出每个热门商品对应画像所匹配的用户 for (HotGoodsCrontabDO crontabDO : crontabDOs) { if (StringUtils.isEmpty(crontabDO.getPortrayal())) { continue; } //通过判断shard分片数据来实现分布式调度 int shardNo = crontabDO.getId().hashCode() % totalShardNum; if (shardNo != shardIndex) { continue; } //热门商品对应的画像实体 MembershipPointDO membershipPointDO = JsonUtil.json2Object(crontabDO.getPortrayal(), MembershipPointDO.class); if (Objects.isNull(membershipPointDO)) { continue; } //获取匹配画像的用户积分实体 //从会员系统中,根据用户画像标签获取对应的用户数据 MembershipFilterConditionDTO conditionDTO = buildCondition(membershipPointDO); JsonResult<List<MembershipPointDTO>> pointDetailResult = membershipPointApi.listMembershipPointByConditions(conditionDTO); log.info("根据用户画像匹配用户实体, pointDetailResult:{}", JsonUtil.object2Json(pointDetailResult)); if (!pointDetailResult.getSuccess()) { throw new BaseBizException(pointDetailResult.getErrorCode(), pointDetailResult.getErrorMessage()); } List<MembershipPointDTO> membershipPointDOs = pointDetailResult.getData(); //发送消息到MQ,进行削峰填谷 sendMessage(crontabDO, membershipPointDOs); } } //将热门商品和对应的用户消息发送给MQ private void sendMessage(HotGoodsCrontabDO crontabDO, List<MembershipPointDTO> membershipPointDTOs) { Set<Long> accountIds = new HashSet<>(); for (MembershipPointDTO membershipPointDTO : membershipPointDTOs) { accountIds.add(membershipPointDTO.getAccountId()); } HotGoodsVO hotGoodsVO = buildHotGoodsVO(crontabDO); scheduleSendMessageJob.matchingAndSendMessage(hotGoodsVO ,accountIds); log.info("传入参数,准备发送消息到MQ, hotGoodsVO: {}, accountIds: {}", JsonUtil.object2Json(hotGoodsVO), JsonUtil.object2Json(accountIds)); } ... } //推荐系统/大数据系统,会去计算出不同人群的爆款商品 //如果某个用户是一个运动达人,那么其用户画像标签就是"运动",然后根据购买量最高/浏览量最高/好评率最高,来计算出这个用户画像群体的爆品 @Data @TableName("hot_goods_crontab") public class HotGoodsCrontabDO { //主键 private Long id; //热门商品id private Long goodsId; //热门商品名 private String goodsName; //热门商品描述 private String goodsDesc; //关键字 private String keywords; //定时任务日期 private Date crontabDate; //用户画像,json格式,简化业务处理,和热门商品处于一对一关系 private String portrayal; private Integer createUser; private Date createTime; private Integer updateUser; private Date updateTime; } @Service public class ScheduleSendMessageJobImpl implements ScheduleSendMessageJob { @Resource private DefaultProducer defaultProducer; @Override public void matchingAndSendMessage(HotGoodsVO hotGood, Set<Long> accountIds) { for (Long accountId : accountIds) { //热门商品消息 String hotMessage = buildMessage(hotGood, accountId); log.info("构造热门商品MQ消息, hotMessage: {}", hotMessage); //发送给mq defaultProducer.sendMessage(RocketMqConstant.PLATFORM_HOT_PRODUCT_SEND_TOPIC, hotMessage, "平台发送热门商品消息"); log.info("发送热门商品消息到MQ, topic: {}, hotMessage: {}", RocketMqConstant.PLATFORM_HOT_PRODUCT_SEND_TOPIC, hotMessage); } } ... }
(2)推送系统的消费者实现
//RocketMQ的消费者工程实现,通常是在工程代码里加入一个@Configuration注解 //ConsumerBeanConfig类一旦加了这个Spring提供的注解,那么系统启动时,这个类就会被Spring容器接管(实例化), //ConsumerBeanConfig类在被Spring容器接管的过程中,Spring也会对被@Autowired标记的属性进行注入 //比如RocketMQProperties作为RocketMQ配置数据的Bean,加了@Autowired标记,就会先被Spring容器接管(实例化) //ConsumerBeanConfig里面有很多方法,每个方法都加了一个@Bean注解 //这表示着Spring容器在执行这些方法时能拿到自定义的Bean实例,然后把实例纳入到Spring容器管理中去 //所以,ConsumerBeanConfig类里的每个方法和一个@Bean注解就定义了一个系统里的RocketMQ消费者Bean @Configuration public class ConsumerBeanConfig { //配置内容对象 @Autowired private RocketMQProperties rocketMQProperties; ... //热门商品推送 @Bean("platformHotProductSendTopic") public DefaultMQPushConsumer platformHotProductConsumer(PlatFormHotProductListener platFormHotProductListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PLATFORM_HOT_PRODUCT_SEND_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(PLATFORM_HOT_PRODUCT_SEND_TOPIC, "*"); consumer.registerMessageListener(platFormHotProductListener); return consumer; } ... } @Component public class PlatFormHotProductListener implements MessageListenerConcurrently { //并发消费消息 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) { try { for (MessageExt messageExt : msgList) { log.debug("执行平台发送通知消息逻辑,消息内容:{}", messageExt.getBody()); String msg = new String(messageExt.getBody()); HashMap hotProductMessage = JSON.parseObject(msg , HashMap.class); //调用第三方平台SDK发送推送通知 informByPush(hotProductMessage); } } catch (Exception e) { log.error("consume error,平台优惠券消费失败", e); //本次消费失败,下次重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
15.营销的四大业务场景MQ削峰方案总结
(1)简单总结
(2)详细总结
(1)简单总结
一.大用户量推送和大用户量发券的业务挑战
营销系统的两个业务场景"大用户量推送和大用户量发券"中,都会面临瞬时大量推送和发券的处理高峰。例如大用户量推送场景需要瞬时高并发调用第三方平台的SDK,大用户量发券需要瞬时高并发写入数据库。这时就会引入RocketMQ进行削峰填谷,这就是"大用户量推送和大用户量发券"的业务挑战。
二.多种推送方式引入抽象工厂设计模式处理
通过抽象工厂获取组件工厂,然后再由组件工厂获取具体组件。
三.热门商品定时推送的业务挑战
XXLJob实现的分布式定时调度。
(2)详细总结
一.优惠活动场景(全量用户推送促销活动)
运营开启的优惠活动,可能包括满减活动、积分活动、双⼗⼀活动、会员⽇活动等。运营活动开启时,会针对⽤户群体发送⼤量的通知消息。运营开启⼀次运营活动后,运营活动的通知会通过多种⽅式推送到⽤户端,包括APP通知、短信、Email。
解决方案:
利用MQ对瞬时高并发调用通知接口进行削峰填谷 + 抽象工厂设计模式完成多种类型消息的发送。
二.优惠券场景(全量用户发放静默优惠券)
优惠券场景,主要有两种。
第⼀种:系统需要给⽤户发放优惠券,其中平台发放类型的优惠券,正常都是后台保存进数据库中。当⽤户打开系统,或者打开APP后,就会显示这张优惠券,并且在下单购物时可以选择优惠券抵扣。
第⼆种:系统发放优惠券后,设置领取数量,只有点击了领取链接的⽤户才能领取到这个优惠券。
针对第⼀种情况,在发放优惠券的活动开始后,系统需要⼤量的后台数据库操作,保存优惠券到数据库。这个过程如果活动次数较多,会对数据库产⽣巨⼤的压⼒。为了让这个功能快速完成,所以需要借助MQ来削峰,即把每个⽤户的优惠券都发到MQ。由消费者慢慢消费后,再保存到数据库,以避免⼤量活动开启时,对数据库造成过⼤的压⼒。
针对第⼆种情况,由于⽤户需要点击链接才能完成优惠券的领取,所以对数据库的压⼒不⼤。但是活动的内容需要让⽤户知道,因此需要发送⼤量的通知消息通知⽤户。此时也有⼤量的发通知操作需要调⽤通知服务,或者调⽤第三⽅平台推送这个通知消息。优惠券通过系统后台的⽅式,及⽤户点击活动⻚⾯的⽅式保存到数据库中。
解决方案:
利用MQ对瞬时高并发写库进行削峰填谷。
三.消息通知场景及其解决方案:(特定用户发送领取优惠券推送)
单独的通知场景也是公司种⽐较常⻅的场景,⽐如会员⽣⽇、⼴告推送、热⻔商品推送,以及借助⼤数据系统计算出⼀些结果发送个性化通知和推荐消息。此时也会⼤量调⽤通知服务或者第三⽅通知平台来推送相应的通知消息给⽤户。
四.定时任务自动推送场景:(热门商品定时推送)
在通知场景中,还有⼀种周期性质的消息通知。⽐如每⽇的热⻔商品推送、双⼗⼀活动通知、在双⼗⼀开始前每隔3⽇推送⼀份⼉双⼗⼀活动通知等。
解决方案:即时推送使用MQ对瞬时高并发调用第三方接口进行削峰填谷,定时推送使用XXLJob实现分布式调度。