事件驱动模式
举个例子🌰
大部分软件或者APP都有会有会员系统,当我们注册为会员时,商家一般会把我们拉入会员群、给我们发优惠券、推送欢迎语什么的。

值得注意的是:
- 注册成功后才会产生后面的这些动作;
- 注册成功后的这些动作没有先后执行顺序之分;
- 注册成功后的这些动作的执行结果不能互相影响;
传统写法
public Boolean doRegisterVip(){ //1、注册会员 registerVip(); //2、入会员群 joinMembershipGroup(); //3、发优惠券 issueCoupons(); //4、推送消息 sendWelcomeMsg(); }
这样的写法将所有的动作都耦合在doRegisterVip方法中,首先执行效率低下,其次耦合度太高,最后不好扩展。那么如何优化这种逻辑呢?
事件驱动模式原理介绍🍓
Spring的事件驱动模型由三部分组成:
事件:用户可自定义事件类和相关属性及行为来表述事件特征,Spring4.2之后定义事件不需要再显式继承ApplicationEvent类,直接定义一个bean即可,Spring会自动通过PayloadApplicationEvent来包装事件。
事件发布者:在Spring中可通过ApplicationEventPublisher把事件发布出去,这样事件内容就可以被监听者消费处理。
事件监听者:ApplicationListener,监听发布事件,处理事件发生之后的后续操作。
原理图如下:

代码实现
1. 定义基本元素
事件发布者:EventEngine.java、EventEngineImpl.java
package com.example.event.config; /** * 事件引擎 */ public interface EventEngine { /** * 发送事件 * * @param event 事件 */ void publishEvent(BizEvent event); }
package com.example.event.config; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import org.springframework.util.CollectionUtils; /** * 事件引擎实现类 */ public class EventEngineImpl implements EventEngine { /** * 异步执行器。也系统需要自行定义线程池 */ private Executor bizListenerExecutor; /** * 是否异步,默认为false */ private boolean async; /** * 订阅端 KEY是TOPIC,VALUES是监听器集合 */ private Map<String, List<BizEventListener>> bizSubscribers = new HashMap<>(16); @Override public void publishEvent(BizEvent event) { List<BizEventListener> listeners = bizSubscribers.get(event.getTopic()); if (CollectionUtils.isEmpty(listeners)) { return; } for (BizEventListener bizEventListener : listeners) { if (bizEventListener.decide(event)) { //异步执行的话,放入线程池 if (async) { bizListenerExecutor.execute(new EventSubscriber(bizEventListener, event)); } else { bizEventListener.onEvent(event); } } } } /** * Setter method for property <tt>bizListenerExecutor</tt>. * * @param bizListenerExecutor value to be assigned to property bizListenerExecutor */ public void setBizListenerExecutor(Executor bizListenerExecutor) { this.bizListenerExecutor = bizListenerExecutor; } /** * Setter method for property <tt>bizSubscribers</tt>. * * @param bizSubscribers value to be assigned to property bizSubscribers */ public void setBizSubscribers(Map<String, List<BizEventListener>> bizSubscribers) { this.bizSubscribers = bizSubscribers; } /** * Setter method for property <tt>async</tt>. * * @param async value to be assigned to property async */ public void setAsync(boolean async) { this.async = async; } }
事件:BizEvent.java
package com.example.event.config; import java.util.EventObject; /** * 业务事件 */ public class BizEvent extends EventObject { /** * Topic */ private final String topic; /** * 业务id */ private final String bizId; /** * 数据 */ private final Object data; /** * @param topic 事件topic,用于区分事件类型 * @param bizId 业务ID,标识这一次的调用 * @param data 事件传输对象 */ public BizEvent(String topic, String bizId, Object data) { super(data); this.topic = topic; this.bizId = bizId; this.data = data; } /** * Getter method for property <tt>topic</tt>. * * @return property value of topic */ public String getTopic() { return topic; } /** * Getter method for property <tt>id</tt>. * * @return property value of id */ public String getBizId() { return bizId; } /** * Getter method for property <tt>data</tt>. * * @return property value of data */ public Object getData() { return data; } }
事件监听者:EventSubscriber.java
package com.example.event.config; /** * 事件监听者。注意:此时已经没有线程上下文,如果需要请修改构造函数,显示复制上下文信息 */ public class EventSubscriber implements Runnable { /** * 业务监听器 **/ private BizEventListener bizEventListener; /** * 业务事件 */ private BizEvent bizEvent; /** * @param bizEventListener 事件监听者 * @param bizEvent 事件 */ public EventSubscriber(BizEventListener bizEventListener, BizEvent bizEvent) { super(); this.bizEventListener = bizEventListener; this.bizEvent = bizEvent; } @Override public void run() { bizEventListener.onEvent(bizEvent); } }
2. 其他组件
业务事件监听器:BizEventListener.java
package com.example.event.config; import java.util.EventListener; /** * 业务事件监听器 * */ public interface BizEventListener extends EventListener { /** * 是否执行事件 * * @param event 事件 * @return */ public boolean decide(BizEvent event); /** * 执行事件 * * @param event 事件 */ public void onEvent(BizEvent event); }
事件引擎topic:EventEngineTopic.java
package com.example.event.config; /** * 事件引擎topic,用于区分事件类型 */ public class EventEngineTopic { /** * 入会员群 */ public static final String JOIN_MEMBERSHIP_GROUP = "joinMembershipGroup"; /** * 发优惠券 */ public static final String ISSUE_COUPONS = "issueCoupons"; /** * 推送消息 */ public static final String SEND_WELCOME_MSG = "sendWelcomeMsg"; }
3. 监听器实现
优惠券处理器:CouponsHandlerListener.java
package com.example.event.listener; import com.example.event.config.BizEvent; import com.example.event.config.BizEventListener; import org.springframework.stereotype.Component; /** * 优惠券处理器 */ @Component public class CouponsHandlerListener implements BizEventListener { @Override public boolean decide(BizEvent event) { return true; } @Override public void onEvent(BizEvent event) { System.out.println("优惠券处理器:十折优惠券已发放"); } }
会员群处理器:MembershipHandlerListener.java
package com.example.event.listener; import com.example.event.config.BizEvent; import com.example.event.config.BizEventListener; import org.springframework.stereotype.Component; /** * 会员群处理器 */ @Component public class MembershipHandlerListener implements BizEventListener { @Override public boolean decide(BizEvent event) { return true; } @Override public void onEvent(BizEvent event) { System.out.println("会员群处理器:您已成功加入会员群"); } }
消息推送处理器:MsgHandlerListener.java
package com.example.event.listener; import com.example.event.config.BizEvent; import com.example.event.config.BizEventListener; import org.springframework.stereotype.Component; /** * 消息推送处理器 */ @Component public class MsgHandlerListener implements BizEventListener { @Override public boolean decide(BizEvent event) { return true; } @Override public void onEvent(BizEvent event) { System.out.println("消息推送处理器:欢迎成为会员!!!"); } }
事件驱动引擎配置:EventEngineConfig.java
package com.example.event.listener; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import com.example.event.config.BizEventListener; import com.example.event.config.EventEngine; import com.example.event.config.EventEngineImpl; import com.example.event.config.EventEngineTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; /** * 事件驱动引擎配置 */ @Configuration public class EventEngineConfig { /** * 线程池异步处理事件 */ private static final Executor EXECUTOR = new ThreadPoolExecutor(20, 50, 10, TimeUnit.MINUTES, new LinkedBlockingQueue(500), new CustomizableThreadFactory("EVENT_ENGINE_POOL")); @Bean("eventEngineJob") public EventEngine initJobEngine(CouponsHandlerListener couponsHandlerListener, MembershipHandlerListener membershipHandlerListener, MsgHandlerListener msgHandlerListener) { Map<String, List<BizEventListener>> bizEvenListenerMap = new HashMap<>(); //注册优惠券事件 bizEvenListenerMap.put(EventEngineTopic.ISSUE_COUPONS, Arrays.asList(couponsHandlerListener)); //注册会员群事件 bizEvenListenerMap.put(EventEngineTopic.JOIN_MEMBERSHIP_GROUP, Arrays.asList(membershipHandlerListener)); //注册消息推送事件 bizEvenListenerMap.put(EventEngineTopic.SEND_WELCOME_MSG, Arrays.asList(msgHandlerListener)); EventEngineImpl eventEngine = new EventEngineImpl(); eventEngine.setBizSubscribers(bizEvenListenerMap); eventEngine.setAsync(true); eventEngine.setBizListenerExecutor(EXECUTOR); return eventEngine; } }
4. 测试类
TestController.java
package com.example.event.controller; import java.util.HashMap; import java.util.Map; import java.util.UUID; import javax.annotation.Resource; import com.example.event.config.BizEvent; import com.example.event.config.EventEngine; import com.example.event.config.EventEngineTopic; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * 测试 */ @RestController @RequestMapping("/test") public class TestController { @Resource(name = "eventEngineJob") private EventEngine eventEngine; @GetMapping("/doRegisterVip") public String doRegisterVip(@RequestParam(required = true) String userName, @RequestParam(required = true) Integer age) { Map<String, Object> paramMap = new HashMap<>(16); paramMap.put("userName", userName); paramMap.put("age", age); //1、注册会员,这里不实现了 System.out.println("注册会员成功"); //2、入会员群 eventEngine.publishEvent( new BizEvent(EventEngineTopic.JOIN_MEMBERSHIP_GROUP, UUID.randomUUID().toString(), paramMap)); //3、发优惠券 eventEngine.publishEvent( new BizEvent(EventEngineTopic.ISSUE_COUPONS, UUID.randomUUID().toString(), paramMap)); //4、推送消息 eventEngine.publishEvent( new BizEvent(EventEngineTopic.SEND_WELCOME_MSG, UUID.randomUUID().toString(), paramMap)); return "注册会员成功"; } }
项目代码结构

调用接口
http://localhost:8080/test/doRegisterVip?userName=zhangsan&age=28
