[ThingsBoard] 3. 源码解读Actor

一、前言

本文基于 ThingsBoard 4.0.2 编写,对应提交Version set to 4.0.2(01c5ba7d37006e1f8a3492afbb3c67d017ca8dd3)
由于个人技术能力和写作经验有限,欢迎读者指出文中的错误与不足。

二、Actor模型

参考 Actor模型是解决高并发的终极解决方案 - 知乎
本人写的一般,可以看参考文章

1. Actor

Actor 模型则将一切视为 Actor。Actor 是并发执行的基本单元,与其他 Actor 之间通过消息传递进行通信,当接收到消息时,一个 Actor 可以并行地执行三件事:向其他 Actor 发送消息、创建新的 Actor,以及决定如何处理下一条消息。

2. 消息传递

Actor之间互相并行,因此消息传递都是异步的,Mailbox负责给Actor进行消息传递。
两个Actor之间消息传递时,Actor A发送到Actor B的Mailbox。等到Actor B处理消息时。
它就会从自己的Mailbox中获取消息,进行处理。

三、ThingsBoard中的Actor

ThingsBoard以下简称TB,代码进行一些精简,方法使用lambda方法引用表示。

1. TbActorMailBox

在TB的Actor模型中,Mailbox对应的实现类是TbActorMailbox,其继承关系为TbActorMailboxTbActorCtxTbActorRef。值得注意的是,TbActorCtxTbActorRef在整个系统中都只有TbActorMailbox这一个实现类,因此在之后的代码阅读中可以将TbActorRefTbActorCtx都认为是TbActorMailbox这点在之后的代码分析很有用

Actor通过其::tell::tellWithHighPriority传递消息其对应的Actor。每个TbActor都只对应一个TbActorMailbox

private final TbActor actor;  public void tell(TbActorMsg actorMsg) {     enqueue(actorMsg, NORMAL_PRIORITY); }  public void tellWithHighPriority(TbActorMsg actorMsg) {     enqueue(actorMsg, HIGH_PRIORITY); }  private void tryProcessQueue(boolean newMsg) {     dispatcher.getExecutor().execute(this::processMailbox); }   private void enqueue(TbActorMsg msg, boolean highPriority) {     if (highPriority) {         highPriorityMsgs.add(msg);     } else {         normalPriorityMsgs.add(msg);     }     tryProcessQueue(); } 

其中::tryProcessQueue会异步处理消息队列,实际的处理逻辑在::processMailbox中。通过actor::process委托给实际的Actor执行。

private void processMailbox() {     for (int i = 0; i < settings.getActorThroughput(); i++) {         TbActorMsg msg = highPriorityMsgs.poll();         if (msg == null) {             msg = normalPriorityMsgs.poll();         }         if (msg != null) {             actor.process(msg);         }     } } 

2.Actor的初始化:

初始化的逻辑只选择部分重点分析,其他部分可以让ai能进行逐行分析。

Actor可以分为两大类

  • 管理其他Actor,负责创建其管理的Actor,传递消息。
  • 逻辑处理,接收消息,将消息委托给processor字段类处理。

2.1 管理创建Actor的Actor

AppActor

首先TB也是一个Spring应用,因此Actor的创建也是从被Spring管理的Bean开始第一个的创建。其受到Spring管理的就是DefaultActorService类在::initActorSystem中创建了AppActor
看向TbActorSystem::createRootActor方法,它需要一个TbActorCreator的实例,而TbActorCreator则是负责创建每个TbActor的对象。会调用::createActor

@PostConstruct public void initActorSystem() {     appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext)); } 

::createActor中创建了一个TbActorMailbox
TbActorSystem内部会逐步调用到::createActor(String, TbActorCreator, TbActorId),它加锁进行创建。

 public TbActorRef createRootActor(String dispatcherId, TbActorCreator creator) {     return createActor(dispatcherId, creator, null); }  public TbActorRef createChildActor(String dispatcherId, TbActorCreator creator, TbActorId parent) {     return createActor(dispatcherId, creator, parent); }  private TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) {     Dispatcher dispatcher = dispatchers.get(dispatcherId)     TbActorId actorId = creator.createActorId();     TbActorMailbox actorMailbox = actors.get(actorId);     Lock actorCreationLock = actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock());     actorCreationLock.lock();     try {     if (actorMailbox == null) {         log.debug("Creating actor with id [{}]!", actorId);         TbActor actor = creator.createActor();         TbActorRef parentRef = null;         if (parent != null) {             parentRef = getActor(parent);         }         TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parentRef, actor, dispatcher);         actors.put(actorId, mailbox);         mailbox.initActor(); // 注意此处         actorMailbox = mailbox;     } else {         log.debug("Actor with id [{}] is already registered!", actorId);     }      } finally {         actorCreationLock.unlock();         actorCreationLocks.remove(actorId);     }     return actorMailbox; }  private TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) {     TbActor actor = creator.createActor();     TbActorRef parentRef = null;     if (parent != null) {         parentRef = getActor(parent);     }     TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parentRef, actor, dispatcher);     actors.put(actorId, mailbox);     mailbox.initActor(); } 

而在TbActorMailbox::initActor方法中,会提交::tryInit的异步初始化,其中actor.init(this)。就进行了Actor的初始化。

public void initActor() {     dispatcher.getExecutor().execute(() -> tryInit(1)); }  private void tryInit(int attempt) {     actor.init(this); } 

TenantActor

AppActor::doProcess的第一个if,调用::initTenantActors,它初始化所有TenantActor
AppActor自己管理着所有的TenantActors。其::getOrCreateTenantActor则完成了获取和创建的工作。
实现了一个懒加载的功能。

protected boolean doProcess(TbActorMsg msg) {     if (!ruleChainsInitialized) {         if (MsgType.APP_INIT_MSG.equals(msg.getMsgType())) {             initTenantActors();             ruleChainsInitialized = true;         }     } }  private void initTenantActors() {     PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);     for (Tenant tenant : tenantIterator) {         log.debug("[{}] Creating tenant actor", tenant.getId());         getOrCreateTenantActor(tenant.getId())     } } 

追溯::getOrCreateTenantActor,他会使用类型为TbActorCtx的ctx字段的getOrCreateChildActor,根据之前的分析我们可以直接到TbActorMailbox进行分析。

private Optional<TbActorRef> getOrCreateTenantActor(TenantId tenantId) {     return Optional.ofNullable(ctx.getOrCreateChildActor(new TbEntityActorId(tenantId),             () -> DefaultActorService.TENANT_DISPATCHER_NAME,             () -> new TenantActor.ActorCreator(systemContext, tenantId),             () -> true)); } 

显示从类型为TbActorSystemsystem字段获取actor。实现上DefaultTbActorSystem内有一个actorsConcurrentMap的字段用于存储系统中所有的Actor。之后就和AppActor创建处一致的执行流程。

public TbActorRef getOrCreateChildActor(TbActorId actorId, Supplier<String> dispatcher, Supplier<TbActorCreator> creator, Supplier<Boolean> createCondition) {     TbActorRef actorRef = system.getActor(actorId);     if (actorRef == null && createCondition.get()) {         return system.createChildActor(dispatcher.get(), creator.get(), selfId);     } else {         return actorRef;     } } 

RuleChainActor

同理RuleChainActor也是由TenantActor进行创建的。不过实际的逻辑在TenantActor的父类RuleChainManagerActor中。
一般都是在::init方法中调用::initRuleChains方法.initRuleChains方法就会从数据库中获取所有RuleChain并初始化。

2.2 负责逻辑处理的Actor

RuleChainActor

到了它,明显有了不一样,一下子类一下子变得简洁了。类本身只有两个方法
分析一下它的父类。RuleChainActorRuleEngineComponentActorComponentActor
其中的ComponentActor::init方法就调用了抽象方法ComponentActor::createProcessor

@Override public void init(TbActorCtx ctx) throws TbActorException {     this.processor = createProcessor(ctx);     initProcessor(ctx); } 

再向上看ComponentActor的父类ContextAwareActor就发现,::process变成对::process做了个封装。只是添加了打印日志的功能。::doProcess其中,只是将对应的消息委托给了对应的processor

public boolean process(TbActorMsg msg) {     if (log.isDebugEnabled()) {         log.debug("Processing msg: {}", msg);     }     if (!doProcess(msg)) {         log.warn("Unprocessed message: {}!", msg);     }     return false; } 

RuleNodeActor

这就是规则引擎最核心的部分,可以明显的推测到每个TbNode都对应着一个RuleNodeActor
不过同理,它的实际逻辑也委托给了processor。但是明显它的创建更加复杂。
它的创建实际上是由RuleChainActor中的processor负责的,即RuleChainActorMessageProcessor类。

 public void init(TbActorCtx ctx)  {     this.processor = createProcessor(ctx);     initProcessor(ctx); }  protected void initProcessor(TbActorCtx ctx) {     processor.start(ctx); } 

其中的::start方法,而::start的方法则是在前文TbActor的父类ComponentActor::init中再调用::initProcessor中,
调用的RuleChainActorMessageProcessor::start方法。

public void start(TbActorCtx context) {     List<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId);     for (RuleNode ruleNode : ruleNodeList) {         TbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);         nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));     }     initRoutes(ruleChain, ruleNodeList); } 

其中从数据库中读取到所有nodeActors的信息并将其全部创建。
再调用::initRoutes初始化ruleChain中对应的路由。

private void initRoutes(RuleChain ruleChain, List<RuleNode> ruleNodeList) {     for (RuleNode ruleNode : ruleNodeList) {         List<EntityRelation> relations = service.getRuleNodeRelations(TenantId.SYS_TENANT_ID, ruleNode.getId());         log.trace("[{}][{}][{}] Processing rule node relations [{}]", tenantId, entityId, ruleNode.getId(), relations.size());         if (relations.isEmpty()) {             nodeRoutes.put(ruleNode.getId(), Collections.emptyList());         } else {             for (EntityRelation relation : relations) {                 nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())                         .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));             }         }     } } 

能看出nodeRoutes接近一个邻接表的结构,构建了整个路由。至此将Actor创建的三种模式介绍清楚。

  1. 由外部创建,如AppActor
  2. 由其父Actor创建,如TenantActor
  3. 由对应的一个processor字段创建,如RuleChainActor

四、结尾

最开始分析的时候,我还不了解Actor模型,纯粹看着代码进行分析,惊叹于设计的巧妙。但是在QQ水群突然有人提及自己在改为了无锁的Actor模型,我搜索一番,原来这是一个很成熟的通用的设计了啊。自己写小项目还是喜欢ExecutorFuture一股脑的用。不知道下一篇要多久才更新了。

发表评论

评论已关闭。

相关文章