大纲
1.NodeSelectorSlot构建资源调用树
2.LogSlot和StatisticSlot采集资源的数据
3.Sentinel监听器模式的规则对象与规则管理
4.AuthoritySlot控制黑白名单权限
5.SystemSlot根据系统保护规则进行流控
1.NodeSelectorSlot构建资源调用树
(1)Entry的处理链的执行入口
(2)NodeSelectorSlot的源码
(3)Context对象中存储的资源调用树总结
(1)Entry的处理链的执行入口
每当一个线程处理包含某些资源的接口请求时,会调用SphU的entry()方法去创建并管控该接口中涉及的Entry资源访问对象。
在创建Entry资源访问对象的期间,会创建一个ResourceWrapper对象、一个Context对象、以及根据ResourceWrapper对象创建或获取一个ProcessorSlotChain对象,也就是把ProcessorSlotChain对象、Context对象与ResourceWrapper对象绑定到Entry对象中。
public class SphU { private static final Object[] OBJECTS0 = new Object[0]; ... public static Entry entry(String name) throws BlockException { //调用CtSph.entry()方法创建一个Entry资源访问对象,默认的请求类型为OUT return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0); } } public class Env { //创建一个CtSph对象 public static final Sph sph = new CtSph(); static { InitExecutor.doInit(); } } public class CtSph implements Sph { //Same resource will share the same ProcessorSlotChain}, no matter in which Context. //Same resource is that ResourceWrapper#equals(Object). private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(); ... @Override public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException { //StringResourceWrapper是ResourceWrapper的子类,且StringResourceWrapper的构造方法默认了资源类型为COMMON StringResourceWrapper resource = new StringResourceWrapper(name, type); return entry(resource, count, args); } //Do all {@link Rule}s checking about the resource. public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { //调用CtSph.entryWithPriority()方法,执行如下处理: //初始化Context -> 将Context与线程绑定 -> 初始化Entry -> 将Context和ResourceWrapper放入Entry中 return entryWithPriority(resourceWrapper, count, false, args); } private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { //从当前线程中获取Context Context context = ContextUtil.getContext(); if (context instanceof NullContext) { return new CtEntry(resourceWrapper, null, context); } //如果没获取到Context if (context == null) { //Using default context. //创建一个名为sentinel_default_context的Context,并且与当前线程绑定 context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); } //Global switch is close, no rule checking will do. if (!Constants.ON) { return new CtEntry(resourceWrapper, null, context); } //调用CtSph.lookProcessChain()方法初始化处理链(处理器插槽链条) ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); if (chain == null) { return new CtEntry(resourceWrapper, null, context); } //创建出一个Entry对象,将处理链(处理器插槽链条)、Context与Entry绑定 //其中会将Entry的三个基础属性(封装在resourceWrapper里)以及当前Entry所属的Context作为参数传入CtEntry的构造方法 Entry e = new CtEntry(resourceWrapper, chain, context); try { //处理链(处理器插槽链条)入口,负责采集数据,规则验证 //调用DefaultProcessorSlotChain.entry()方法执行处理链每个节点的逻辑(数据采集+规则验证) chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { //规则验证失败,比如:被流控、被熔断降级、触发黑白名单等 e.exit(count, args); throw e1; } catch (Throwable e1) { RecordLog.info("Sentinel unexpected exception", e1); } return e; } ... private final static class InternalContextUtil extends ContextUtil { static Context internalEnter(String name) { //调用ContextUtil.trueEnter()方法创建一个Context对象 return trueEnter(name, ""); } static Context internalEnter(String name, String origin) { return trueEnter(name, origin); } } //Get ProcessorSlotChain of the resource. //new ProcessorSlotChain will be created if the resource doesn't relate one. //Same resource will share the same ProcessorSlotChain globally, no matter in which Context. //Same resource is that ResourceWrapper#equals(Object). ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { ProcessorSlotChain chain = chainMap.get(resourceWrapper); if (chain == null) { synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) { //Entry size limit. if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } //调用SlotChainProvider.newSlotChain()方法初始化处理链(处理器插槽链条) chain = SlotChainProvider.newSlotChain(); //写时复制 Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; } } public class StringResourceWrapper extends ResourceWrapper { public StringResourceWrapper(String name, EntryType e) { //调用父类构造方法,且默认资源类型为COMMON super(name, e, ResourceTypeConstants.COMMON); } ... } //Utility class to get or create Context in current thread. //Each SphU.entry() should be in a Context. //If we don't invoke ContextUtil.enter() explicitly, DEFAULT context will be used. public class ContextUtil { //Store the context in ThreadLocal for easy access. //存放线程与Context的绑定关系 //每个请求对应一个线程,每个线程绑定一个Context,所以每个请求对应一个Context private static ThreadLocal<Context> contextHolder = new ThreadLocal<>(); //Holds all EntranceNode. Each EntranceNode is associated with a distinct context name. //以Context的name作为key,EntranceNode作为value缓存到HashMap中 private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>(); private static final ReentrantLock LOCK = new ReentrantLock(); private static final Context NULL_CONTEXT = new NullContext(); ... //ContextUtil.trueEnter()方法会尝试从ThreadLocal获取一个Context对象 //如果获取不到,再创建一个Context对象然后放入ThreadLocal中 //入参name其实一般就是默认的Constants.CONTEXT_DEFAULT_NAME=sentinel_default_context //由于当前线程可能会涉及创建多个Entry资源访问对象,所以trueEnter()方法需要注意并发问题 protected static Context trueEnter(String name, String origin) { //从ThreadLocal中获取当前线程绑定的Context对象 Context context = contextHolder.get(); //如果当前线程还没绑定Context对象,则初始化Context对象并且与当前线程进行绑定 if (context == null) { //首先要获取或创建Context对象所需要的EntranceNode对象,EntranceNode会负责统计名字相同的Context下的指标数据 //将全局缓存contextNameNodeMap赋值给一个临时变量localCacheNameMap //因为后续会对contextNameNodeMap的内容进行修改,所以这里需要将原来的contextNameNodeMap复制一份出来 //从而避免后续对contextNameNodeMap的内容进行修改时,可能造成对接下来读取contextNameNodeMap内容的影响 Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap; //从缓存副本localCacheNameMap中获取EntranceNode //这个name其实一般就是默认的sentinel_default_context DefaultNode node = localCacheNameMap.get(name); //如果获取的EntranceNode为空 if (node == null) { //为了防止缓存无限制地增长,导致内存占用过高,需要设置一个上限,只要超过上限,就直接返回NULL_CONTEXT if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { //如果Context还没创建,缓存里也没有当前Context名称对应的EntranceNode,并且缓存数量尚未达到2000 //那么就创建一个EntranceNode,创建EntranceNode时需要加锁,否则会有线程不安全问题 //毕竟需要修改HashMap类型的contextNameNodeMap //通过加锁 + 缓存 + 写时复制更新缓存,避免并发情况下创建出多个EntranceNode对象 //一个线程对应一个Context对象,多个线程对应多个Context对象 //这些Context对象会使用ThreadLocal进行隔离,但它们的name默认都是sentinel_default_context //根据下面的代码逻辑: //多个线程(对应多个Context的name默认都是sentinel_default_context)会共用同一个EntranceNode //于是可知,多个Context对象会共用一个EntranceNode对象 LOCK.lock(); try { //从缓存中获取EntranceNode node = contextNameNodeMap.get(name); //对node进行Double Check //如果没获取到EntranceNode if (node == null) { if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { //创建EntranceNode,缓存到contextNameNodeMap当中 node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null); //Add entrance node. //将新创建的EntranceNode添加到ROOT中,ROOT就是每个Node的根结点 Constants.ROOT.addChild(node); //写时复制,将新创建的EntranceNode添加到缓存中 Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1); newMap.putAll(contextNameNodeMap); newMap.put(name, node); contextNameNodeMap = newMap; } } } finally { //解锁 LOCK.unlock(); } } } //此处可能会有多个线程同时执行到此处,并发创建多个Context对象 //但这是允许的,因为一个请求对应一个Context,一个请求对应一个线程,所以一个线程本来就需要创建一个Context对象 //初始化Context,将刚获取到或刚创建的EntranceNode放到Context的entranceNode属性中 context = new Context(node, name); context.setOrigin(origin); //将创建出来的Context对象放入ThreadLocal变量contextHolder中,实现Context对象与当前线程的绑定 contextHolder.set(context); } return context; } ... } public final class SlotChainProvider { private static volatile SlotChainBuilder slotChainBuilder = null; //The load and pick process is not thread-safe, //but it's okay since the method should be only invoked via CtSph.lookProcessChain() under lock. public static ProcessorSlotChain newSlotChain() { //如果存在,则直接返回 if (slotChainBuilder != null) { return slotChainBuilder.build(); } //Resolve the slot chain builder SPI. //通过SPI机制初始化SlotChainBuilder slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault(); if (slotChainBuilder == null) { //Should not go through here. RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default"); slotChainBuilder = new DefaultSlotChainBuilder(); } else { RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", slotChainBuilder.getClass().getCanonicalName()); } return slotChainBuilder.build(); } private SlotChainProvider() { } } @Spi(isDefault = true) public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { //创建一个DefaultProcessorSlotChain对象实例 ProcessorSlotChain chain = new DefaultProcessorSlotChain(); //通过SPI机制加载责任链的节点ProcessorSlot实现类 //然后按照@Spi注解的order属性进行排序并进行实例化 //最后将ProcessorSlot实例放到sortedSlotList中 List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted(); //遍历已排好序的ProcessorSlot集合 for (ProcessorSlot slot : sortedSlotList) { //安全检查,防止业务系统也写了一个SPI文件,但没按规定继承AbstractLinkedProcessorSlot if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain"); continue; } //调用DefaultProcessorSlotChain.addLast()方法构建单向链表 //将责任链的节点ProcessorSlot实例放入DefaultProcessorSlotChain中 chain.addLast((AbstractLinkedProcessorSlot<?>) slot); } //返回单向链表 return chain; } }
在DefaultSlotChainBuilder的build()方法中,从其初始化ProcessorSlotChain的逻辑可知,Entry的处理链的执行入口就是DefaultProcessorSlotChain的entry()方法。
当一个线程调用SphU的entry()方法创建完与接口相关的Entry对象后,就会调用DefaultProcessorSlotChain的entry()方法执行处理链节点的逻辑。因为NodeSelectorSlot是Entry的处理链ProcessorSlotChain的第一个节点,所以接着会调用NodeSelectorSlot的entry()方法。由于处理链中紧接着NodeSelectorSlot的下一个节点是ClusterBuilderSlot,所以执行完NodeSelectorSlot的entry()方法后,会接着执行ClusterBuilderSlot的entry()方法。
public class DefaultProcessorSlotChain extends ProcessorSlotChain { ... @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { //默认情况下会调用处理链的第一个节点NodeSelectorSlot的transformEntry()方法 first.transformEntry(context, resourceWrapper, t, count, prioritized, args); } ... } public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> { ... void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable { T t = (T)o; entry(context, resourceWrapper, t, count, prioritized, args); } ... }
(2)NodeSelectorSlot的源码
NodeSelectorSlot和ClusterBuilderSlot会一起构建Context的资源调用树,资源调用树的作用其实就是用来统计资源的调用数据。
在一个Context对象实例的资源调用树上主要会有如下三类节点:DefaultNode、EntranceNode、ClusterNode,分别对应于:单机里的资源维度、接口维度、集群中的资源维度。
其中DefaultNode会统计名字相同的Context下的某个资源的调用数据,EntranceNode会统计名字相同的Context下的全部资源的调用数据,ClusterNode会统计某个资源在全部Context下的调用数据。
在执行NodeSelectorSlot的entry()方法时,首先会从缓存(NodeSelectorSlot.map属性)中获取一个DefaultNode对象。如果获取不到,再通过DCL机制创建一个DefaultNode对象并更新缓存。其中缓存的key是Context的name,value是DefaultNode对象。由于默认情况下多个线程对应的Context的name都相同,所以多个线程访问同一资源时使用的DefaultNode对象也一样。
在执行ClusterBuilderSlot的entry()方法时,首先会判断缓存是否为null,若是则创建一个ClusterNode对象,然后再将ClusterNode对象设置到DefaultNode对象的clusterNode属性中。
由DefaultNode、EntranceNode、ClusterNode构成的资源调用树:因为DefaultNode是和资源ResourceWrapper以及Context挂钩的,所以DefaultNode应该添加到EntranceNode中。因为ClusterNode和资源挂钩,而不和Context挂钩,所以ClusterNode应该添加到DefaultNode中。
具体的资源调用树构建源码如下:
//This class will try to build the calling traces via: //adding a new DefaultNode if needed as the last child in the context. //the context's last node is the current node or the parent node of the context. //setting itself to the context current node. //It works as follow: // ContextUtil.enter("entrance1", "appA"); // Entry nodeA = SphU.entry("nodeA"); // if (nodeA != null) { // nodeA.exit(); // } // ContextUtil.exit(); //Above code will generate the following invocation structure in memory: // machine-root // / // / // EntranceNode1 // / // / // DefaultNode(nodeA)- - - - - -> ClusterNode(nodeA); //Here the EntranceNode represents "entrance1" given by ContextUtil.enter("entrance1", "appA"). //Both DefaultNode(nodeA) and ClusterNode(nodeA) holds statistics of "nodeA", which is given by SphU.entry("nodeA"). //The ClusterNode is uniquely identified by the ResourceId; //The DefaultNode is identified by both the resource id and {@link Context}. //In other words, one resource id will generate multiple DefaultNode for each distinct context, //but only one ClusterNode. //the following code shows one resource id in two different context: // ContextUtil.enter("entrance1", "appA"); // Entry nodeA = SphU.entry("nodeA"); // if (nodeA != null) { // nodeA.exit(); // } // ContextUtil.exit(); // ContextUtil.enter("entrance2", "appA"); // nodeA = SphU.entry("nodeA"); // if (nodeA != null) { // nodeA.exit(); // } // ContextUtil.exit(); //Above code will generate the following invocation structure in memory: // machine-root // / // / // EntranceNode1 EntranceNode2 // / // / // DefaultNode(nodeA) DefaultNode(nodeA) // | | // +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA); //As we can see, two DefaultNode are created for "nodeA" in two context, //but only one ClusterNode is created. //We can also check this structure by calling: http://localhost:8719/tree?type=root @Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT) public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> { //DefaultNodes of the same resource in different context. //缓存map以Context的name为key,DefaultNode为value //由于默认情况下多个线程对应的Context的name都相同,所以多个线程访问资源时使用的DefaultNode也一样 private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10); @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { //It's interesting that we use context name rather resource name as the map key. //Remember that same resource will share the same ProcessorSlotChain globally, no matter in which context. //Same resource is that ResourceWrapper#equals(Object). //So if code goes into entry(Context, ResourceWrapper, DefaultNode, int, Object...), //the resource name must be same but context name may not. //If we use SphU.entry(String resource)} to enter same resource in different context, //using context name as map key can distinguish the same resource. //In this case, multiple DefaultNodes will be created of the same resource name, //for every distinct context (different context name) each. //Consider another question. One resource may have multiple DefaultNode, //so what is the fastest way to get total statistics of the same resource? //The answer is all DefaultNodes with same resource name share one ClusterNode. //See ClusterBuilderSlot for detail. //先从缓存中获取 DefaultNode node = map.get(context.getName()); if (node == null) { //使用DCL机制,即Double Check + Lock机制 synchronized (this) { node = map.get(context.getName()); if (node == null) { //每个线程访问Entry时,都会调用CtSph.entry()方法创建一个ResourceWrapper对象 //下面根据ResourceWrapper创建一个DefaultNode对象 node = new DefaultNode(resourceWrapper, null); //写时复制更新缓存map HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size()); cacheMap.putAll(map); cacheMap.put(context.getName(), node); map = cacheMap; //Build invocation tree //首先会调用context.getLastNode()方法,获取到的是Context.entranceNode属性即一个EntranceNode对象 //EntranceNode对象是在执行ContextUtil.trueEnter()方法创建Context对象实例时添加到Context对象中的 //然后会将刚创建的DefaultNode对象添加到EntranceNode对象的childList列表中 ((DefaultNode) context.getLastNode()).addChild(node); } } } //设置Context的curNode属性为当前获取到或新创建的DefaultNode对象 context.setCurNode(node); //触发执行下一个ProcessorSlot,即ClusterBuilderSlot fireEntry(context, resourceWrapper, node, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); } } //This slot maintains resource running statistics (response time, qps, thread count, exception), //and a list of callers as well which is marked by ContextUtil.enter(String origin). //One resource has only one cluster node, while one resource can have multiple default nodes. @Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT) public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> { //Remember that same resource will share the same ProcessorSlotChain globally, no matter in which context. //Same resource is that ResourceWrapper#equals(Object). //So if code goes into entry(Context, ResourceWrapper, DefaultNode, int, boolean, Object...), //the resource name must be same but context name may not. //To get total statistics of the same resource in different context, //same resource shares the same ClusterNode} globally. //All ClusterNodes are cached in this map. //The longer the application runs, the more stable this mapping will become. //so we don't concurrent map but a lock. //as this lock only happens at the very beginning while concurrent map will hold the lock all the time. private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>(); private static final Object lock = new Object(); private volatile ClusterNode clusterNode = null; @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { if (clusterNode == null) { //使用DCL机制,即Double Check + Lock机制 synchronized (lock) { if (clusterNode == null) { //Create the cluster node. //创建ClusterNode对象 clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType()); HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16)); newMap.putAll(clusterNodeMap); newMap.put(node.getId(), clusterNode); clusterNodeMap = newMap; } } } //设置DefaultNode的clusterNode属性为获取到的ClusterNode对象 node.setClusterNode(clusterNode); //if context origin is set, we should get or create a new {@link Node} of the specific origin. if (!"".equals(context.getOrigin())) { Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin()); context.getCurEntry().setOriginNode(originNode); } //执行下一个ProcessorSlot fireEntry(context, resourceWrapper, node, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); } ... }
(3)Context对象中存储的资源调用树总结
其实Context对象的属性entranceNode就代表了一棵资源调用树。
首先,在调用ContextUtil的trueEnter()方法创建Context对象实例时,便会创建一个EntranceNode对象并赋值给Context的entranceNode属性,以及调用Constants.ROOT的addChild()方法,将这个EntranceNode对象放入Constants.ROOT的childList列表中。
然后,执行NodeSelectorSlot的entry()方法时,便会创建一个DefaultNode对象。该DefaultNode对象会被添加到Context.entranceNode的childList列表中,也就是前面创建的EntranceNode对象的childList列表中。
接着,执行ClusterBuilderSlot的entry()方法时,便会创建一个ClusterNode对象,该ClusterNode对象会赋值给前面DefaultNode对象中的clusterNode属性。
至此,便构建完Context下的资源调用树了。Constants.ROOT的childList里会存放多个EntranceNode对象,每个EntranceNode对象的childList里会存放多个DefaultNode对象,而每个DefaultNode对象会指向一个ClusterNode对象。
//This class holds metadata of current invocation: //the EntranceNode: the root of the current invocation tree. //the current Entry: the current invocation point. //the current Node: the statistics related to the Entry. //the origin: The origin is useful when we want to control different invoker/consumer separately. //Usually the origin could be the Service Consumer's app name or origin IP. //Each SphU.entry() or SphO.entry() should be in a Context, //if we don't invoke ContextUtil.enter() explicitly, DEFAULT context will be used. //A invocation tree will be created if we invoke SphU.entry() multi times in the same context. //Same resource in different context will count separately, see NodeSelectorSlot. public class Context { //Context name. private final String name; //The entrance node of current invocation tree. private DefaultNode entranceNode; //Current processing entry. private Entry curEntry; //The origin of this context (usually indicate different invokers, e.g. service consumer name or origin IP). private String origin = ""; ... public Context(DefaultNode entranceNode, String name) { this(name, entranceNode, false); } public Context(String name, DefaultNode entranceNode, boolean async) { this.name = name; this.entranceNode = entranceNode; this.async = async; } //Get the parent Node of the current. public Node getLastNode() { if (curEntry != null && curEntry.getLastNode() != null) { return curEntry.getLastNode(); } else { return entranceNode; } } ... } public class ContextUtil { //以Context的name作为key,EntranceNode作为value缓存所有的EntranceNode到HashMap中 private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>(); ... protected static Context trueEnter(String name, String origin) { ... //从缓存中获取EntranceNode DefaultNode node = contextNameNodeMap.get(name); ... //创建EntranceNode,缓存到contextNameNodeMap当中 node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null); //将新创建的EntranceNode添加到ROOT中,ROOT就是每个Node的根结点 Constants.ROOT.addChild(node); ... //初始化Context,将刚获取到或刚创建的EntranceNode放到Context的entranceNode属性中 context = new Context(node, name); ... } ... } public final class Constants { ... //Global ROOT statistic node that represents the universal parent node. public final static DefaultNode ROOT = new EntranceNode( new StringResourceWrapper(ROOT_ID, EntryType.IN), new ClusterNode(ROOT_ID, ResourceTypeConstants.COMMON) ); ... } //A Node used to hold statistics for specific resource name in the specific context. //Each distinct resource in each distinct Context will corresponding to a DefaultNode. //This class may have a list of sub DefaultNodes. //Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context. public class DefaultNode extends StatisticNode { //The resource associated with the node. private ResourceWrapper id; //The list of all child nodes. private volatile Set<Node> childList = new HashSet<>(); //Associated cluster node. private ClusterNode clusterNode; ... //Add child node to current node. public void addChild(Node node) { if (node == null) { RecordLog.warn("Trying to add null child to node <{}>, ignored", id.getName()); return; } if (!childList.contains(node)) { synchronized (this) { if (!childList.contains(node)) { Set<Node> newSet = new HashSet<>(childList.size() + 1); newSet.addAll(childList); newSet.add(node); childList = newSet; } } RecordLog.info("Add child <{}> to node <{}>", ((DefaultNode)node).id.getName(), id.getName()); } } //Reset the child node list. public void removeChildList() { this.childList = new HashSet<>(); } ... } @Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT) public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> { //DefaultNodes of the same resource in different context. //缓存map以Context的name为key,DefaultNode为value //由于默认情况下多个线程对应的Context的name都相同,所以多个线程访问资源时使用的DefaultNode也一样 private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10); ... @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { ... //先从缓存中获取 DefaultNode node = map.get(context.getName()); ... //下面根据ResourceWrapper创建一个DefaultNode对象 node = new DefaultNode(resourceWrapper, null); ... //Build invocation tree //首先会调用context.getLastNode()方法,获取到的是Context.entranceNode属性即一个EntranceNode对象 //EntranceNode对象是在执行ContextUtil.trueEnter()方法创建Context对象实例时添加到Context对象中的 //然后会将刚创建的DefaultNode对象添加到EntranceNode对象的childList列表中 ((DefaultNode) context.getLastNode()).addChild(node); ... //执行下一个ProcessorSlot fireEntry(context, resourceWrapper, node, count, prioritized, args); } ... } @Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT) public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> { ... private volatile ClusterNode clusterNode = null; @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { ... //创建ClusterNode对象 clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType()); ... //设置DefaultNode的clusterNode属性为获取到的ClusterNode对象 node.setClusterNode(clusterNode); ... //执行下一个ProcessorSlot fireEntry(context, resourceWrapper, node, count, prioritized, args); } ... } //资源调用树的示例如下所示: // machine-root // / // / // EntranceNode1 EntranceNode2 // / // / // DefaultNode(nodeA) DefaultNode(nodeA) // | | // +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA); //其中,machine-root中的childList里会有很多个EntranceNode对象 //EntranceNode对象中的childList里又会有很多个DefaultNode对象 //每个DefaultNode对象下都会指向一个ClusterNode对象
一些对应关系的梳理总结:
一个线程对应一个ResourceWrapper对象实例,一个线程对应一个Context对象实例。如果ResourceWrapper对象相同,则会共用一个ProcessorSlotChain实例。如果ResourceWrapper对象相同,则也会共用一个ClusterNode实例。如果Context对象的名字相同,则会共用一个EntranceNode对象实例。如果Context对象的名字相同,则也会共用一个DefaultNode对象实例。
//每个请求对应一个线程,每个线程绑定一个Context,所以每个请求对应一个Context private static ThreadLocal<Context> contextHolder = new ThreadLocal<>(); //以Context的name作为key,EntranceNode作为value缓存所有的EntranceNode到HashMap中 private static volatile Map<String, EntranceNode> contextNameNodeMap = new HashMap<>(); //Same resource will share the same ProcessorSlotChain}, no matter in which Context. //Same resource is that ResourceWrapper#equals(Object). private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(); //DefaultNodes of the same resource in different context. //以Context的name作为key,DefaultNode作为value //由于默认情况下多个线程对应的Context的name都相同,所以多个线程访问资源时使用的DefaultNode也一样 private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10); //To get total statistics of the same resource in different context, //same resource shares the same ClusterNode globally. //All ClusterNodes are cached in this map. private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
2.LogSlot和StatisticSlot采集资源的数据
(1)LogSlot的源码
(2)StatisticSlot的源码
(3)记录资源在不同维度下的调用数据
(1)LogSlot的源码
LogSlot用于记录异常请求日志,以便于故障排查。也就是当出现BlockException异常时,调用EagleEyeLogUtil的log()方法将日志写到sentinel-block.log文件中。
//A ProcessorSlot that is response for logging block exceptions to provide concrete logs for troubleshooting. @Spi(order = Constants.ORDER_LOG_SLOT) public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args) throws Throwable { try { //调用下一个ProcessorSlot fireEntry(context, resourceWrapper, obj, count, prioritized, args); } catch (BlockException e) { //被流控或者熔断降级后打印log日志 EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(), context.getOrigin(), e.getRule().getId(), count); throw e; } catch (Throwable e) { RecordLog.warn("Unexpected entry exception", e); } } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { try { //调用下一个ProcessorSlot fireExit(context, resourceWrapper, count, args); } catch (Throwable e) { RecordLog.warn("Unexpected entry exit exception", e); } } } public class EagleEyeLogUtil { public static final String FILE_NAME = "sentinel-block.log"; private static StatLogger statLogger; static { String path = LogBase.getLogBaseDir() + FILE_NAME; statLogger = EagleEye.statLoggerBuilder("sentinel-block-log") .intervalSeconds(1) .entryDelimiter('|') .keyDelimiter(',') .valueDelimiter(',') .maxEntryCount(6000) .configLogFilePath(path) .maxFileSizeMB(300) .maxBackupIndex(3) .buildSingleton(); } public static void log(String resource, String exceptionName, String ruleLimitApp, String origin, Long ruleId, int count) { String ruleIdString = StringUtil.EMPTY; if (ruleId != null) { ruleIdString = String.valueOf(ruleId); } statLogger.stat(resource, exceptionName, ruleLimitApp, origin, ruleIdString).count(count); } }
(2)StatisticSlot的源码
StatisticSlot用于统计资源的调用数据,如请求成功数、请求失败数、响应时间等。
注意:开始对请求进行规则验证时,需要调用SphU的entry()方法。完成对请求的规则验证后,也需要调用Entry的exit()方法。
//A processor slot that dedicates to real time statistics. //When entering this slot, we need to separately count the following information: //ClusterNode: total statistics of a cluster node of the resource ID. //Origin node: statistics of a cluster node from different callers/origins. //DefaultNode: statistics for specific resource name in the specific context. //Finally, the sum statistics of all entrances. @Spi(order = Constants.ORDER_STATISTIC_SLOT) public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { //Do some checking. //执行下一个ProcessorSlot,先进行规则验证等 fireEntry(context, resourceWrapper, node, count, prioritized, args); //Request passed, add thread count and pass count. //如果通过了后面ProcessorSlot的验证 //则将处理当前资源resourceWrapper的线程数 + 1 以及 将对当前资源resourceWrapper的成功请求数 + 1 node.increaseThreadNum(); node.addPassRequest(count); if (context.getCurEntry().getOriginNode() != null) { //Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { //Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } //Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (PriorityWaitException ex) { node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { //Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() == EntryType.IN) { //Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); } //Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (BlockException e) {//捕获BlockException //Blocked, set block exception to current entry. context.getCurEntry().setBlockError(e); //Add block count. //如果规则验证失败,则将BlockQps+1 node.increaseBlockQps(count); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseBlockQps(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { //Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseBlockQps(count); } //Handle block event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e; } catch (Throwable e) { //Unexpected internal error, set error to current entry. context.getCurEntry().setError(e); throw e; } } //开始对请求进行规则验证时,需要调用SphU.entry()方法 //完成对请求的规则验证后,也需要调用Entry.exit()方法 @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { Node node = context.getCurNode(); if (context.getCurEntry().getBlockError() == null) { //Calculate response time (use completeStatTime as the time of completion). //获取系统当前时间 long completeStatTime = TimeUtil.currentTimeMillis(); context.getCurEntry().setCompleteTimestamp(completeStatTime); //计算响应时间 = 系统当前事件 - 根据资源resourceWrapper创建Entry资源访问对象时的时间 long rt = completeStatTime - context.getCurEntry().getCreateTimestamp(); Throwable error = context.getCurEntry().getError(); //Record response time and success count. //记录响应时间等信息 recordCompleteFor(node, count, rt, error); recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error); if (resourceWrapper.getEntryType() == EntryType.IN) { recordCompleteFor(Constants.ENTRY_NODE, count, rt, error); } } //Handle exit event with registered exit callback handlers. Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks(); for (ProcessorSlotExitCallback handler : exitCallbacks) { handler.onExit(context, resourceWrapper, count, args); } fireExit(context, resourceWrapper, count, args); } private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) { if (node == null) { return; } node.addRtAndSuccess(rt, batchCount); node.decreaseThreadNum(); if (error != null && !(error instanceof BlockException)) { node.increaseExceptionQps(batchCount); } } }
(3)记录资源在不同维度下的调用数据
一.如何统计单机里某个资源的调用数据
二.如何统计所有资源的调用数据即接口调用数据
三.如何统计集群中某个资源的调用数据
一.如何统计单机里某个资源的调用数据
由于DefaultNode会统计名字相同的Context下的某个资源的调用数据,它是按照单机里的资源维度进行调用数据统计的,所以在StatisticSlot的entry()方法中,会调用DefaultNode的方法来进行统计。
//A Node used to hold statistics for specific resource name in the specific context. //Each distinct resource in each distinct Context will corresponding to a DefaultNode. //This class may have a list of sub DefaultNodes. //Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context. public class DefaultNode extends StatisticNode { //The resource associated with the node. private ResourceWrapper id; //Associated cluster node. private ClusterNode clusterNode; ... @Override public void increaseThreadNum() { super.increaseThreadNum(); this.clusterNode.increaseThreadNum(); } @Override public void addPassRequest(int count) { super.addPassRequest(count); this.clusterNode.addPassRequest(count); } @Override public void increaseBlockQps(int count) { super.increaseBlockQps(count); this.clusterNode.increaseBlockQps(count); } @Override public void addRtAndSuccess(long rt, int successCount) { super.addRtAndSuccess(rt, successCount); this.clusterNode.addRtAndSuccess(rt, successCount); } @Override public void decreaseThreadNum() { super.decreaseThreadNum(); this.clusterNode.decreaseThreadNum(); } ... } public class StatisticNode implements Node { //The counter for thread count. private LongAdder curThreadNum = new LongAdder(); //Holds statistics of the recent INTERVAL milliseconds. //The INTERVAL is divided into time spans by given sampleCount. private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); //Holds statistics of the recent 60 seconds. //The windowLengthInMs is deliberately set to 1000 milliseconds, //meaning each bucket per second, in this way we can get accurate statistics of each second. private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false); ... @Override public void increaseThreadNum() { curThreadNum.increment(); } @Override public void addPassRequest(int count) { rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count); } @Override public void increaseBlockQps(int count) { rollingCounterInSecond.addBlock(count); rollingCounterInMinute.addBlock(count); } @Override public void addRtAndSuccess(long rt, int successCount) { rollingCounterInSecond.addSuccess(successCount); rollingCounterInSecond.addRT(rt); rollingCounterInMinute.addSuccess(successCount); rollingCounterInMinute.addRT(rt); } @Override public void decreaseThreadNum() { curThreadNum.decrement(); } ... }
二.如何统计所有资源的调用数据即接口调用数据
由于EntranceNode会统计名字相同的Context下的全部资源的调用数据,它是按接口维度来统计调用数据的,即统计接口下所有资源的调用情况,所以可以通过遍历EntranceNode的childList来统计接口的调用数据。
//A Node represents the entrance of the invocation tree. //One Context will related to a EntranceNode, //which represents the entrance of the invocation tree. //New EntranceNode will be created if current context does't have one. //Note that same context name will share same EntranceNode globally. public class EntranceNode extends DefaultNode { public EntranceNode(ResourceWrapper id, ClusterNode clusterNode) { super(id, clusterNode); } @Override public double avgRt() { double total = 0; double totalQps = 0; for (Node node : getChildList()) { total += node.avgRt() * node.passQps(); totalQps += node.passQps(); } return total / (totalQps == 0 ? 1 : totalQps); } @Override public double blockQps() { double blockQps = 0; for (Node node : getChildList()) { blockQps += node.blockQps(); } return blockQps; } @Override public long blockRequest() { long r = 0; for (Node node : getChildList()) { r += node.blockRequest(); } return r; } @Override public int curThreadNum() { int r = 0; for (Node node : getChildList()) { r += node.curThreadNum(); } return r; } @Override public double totalQps() { double r = 0; for (Node node : getChildList()) { r += node.totalQps(); } return r; } @Override public double successQps() { double r = 0; for (Node node : getChildList()) { r += node.successQps(); } return r; } @Override public double passQps() { double r = 0; for (Node node : getChildList()) { r += node.passQps(); } return r; } @Override public long totalRequest() { long r = 0; for (Node node : getChildList()) { r += node.totalRequest(); } return r; } @Override public long totalPass() { long r = 0; for (Node node : getChildList()) { r += node.totalPass(); } return r; } }
三.如何统计集群中某个资源的调用数据
由于ClusterNode会统计某个资源在全部Context下的调用数据,它是按照集群中的资源维度进行调用数据统计的,而StatisticSlot的entry()调用DefaultNode的方法统计单机下的资源时,会顺便调用ClusterNode的方法来统计集群下的资源调用,所以通过ClusterNode就可以获取集群中某个资源的调用数据。
//A Node used to hold statistics for specific resource name in the specific context. //Each distinct resource in each distinct Context will corresponding to a DefaultNode. //This class may have a list of sub DefaultNodes. //Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context. public class DefaultNode extends StatisticNode { //The resource associated with the node. private ResourceWrapper id; //Associated cluster node. private ClusterNode clusterNode; ... @Override public void increaseThreadNum() { super.increaseThreadNum(); this.clusterNode.increaseThreadNum(); } @Override public void addPassRequest(int count) { super.addPassRequest(count); this.clusterNode.addPassRequest(count); } @Override public void increaseBlockQps(int count) { super.increaseBlockQps(count); this.clusterNode.increaseBlockQps(count); } @Override public void addRtAndSuccess(long rt, int successCount) { super.addRtAndSuccess(rt, successCount); this.clusterNode.addRtAndSuccess(rt, successCount); } @Override public void decreaseThreadNum() { super.decreaseThreadNum(); this.clusterNode.decreaseThreadNum(); } ... }
3.Sentinel监听器模式的规则对象与规则管理
(1)Sentinel的规则对象
(2)Sentinel的规则管理
(1)Sentinel的规则对象
一.Sentinel中的规则其实就是配置
二.规则接口Rule和抽象父类AbstractRule及其具体实现类
一.Sentinel中的规则其实就是配置
黑白名单控制规则:例如需要设置一份配置,确定哪些请求属于黑名单、哪些请求属于白名单,那么这份配置就是黑白名单控制规则。
系统负载自适应规则:例如需要设置当CPU使用率达到90%时,系统就不再接受新请求以防止系统崩溃,那么这个90%的CPU使用率阈值就是系统负载自适应规则。
流量控制规则:例如需要设置单机QPS最高为100,那么这个单机限流100QPS便是流量控制规则。
熔断降级规则:例如需要设置当错误比例在1秒内超过10次时,系统自动触发熔断降级,那么这个1秒内超过10次的错误比例就是熔断降级规则。
二.规则接口Rule和抽象父类AbstractRule及其具体实现类
首先规则与资源是紧密关联的,规则会对资源起作用,因此规则接口Rule需要一个获取资源的方法getResource()。
然后每一条具体的规则都应继承抽象父类AbstractRule并具备三个字段:规则id、资源name以及针对来源limitApp。其中针对来源指的是诸如黑名单值、白名单值等,默认是default。
//Base interface of all rules. public interface Rule { //Get target resource of this rule. //获取当前规则起作用的目标资源 String getResource(); } //Abstract rule entity. AbstractRule是实现了规则接口Rule的抽象规则类 public abstract class AbstractRule implements Rule { //rule id. 规则id private Long id; //Resource name. 资源名称 private String resource; //针对来源,默认是default //多个来源使用逗号隔开,比如黑名单规则,限制userId是1和3的访问,那么就设置limitApp为"1,3" //Application name that will be limited by origin. //The default limitApp is default, which means allowing all origin apps. //For authority rules, multiple origin name can be separated with comma (','). private String limitApp; public Long getId() { return id; } public AbstractRule setId(Long id) { this.id = id; return this; } @Override public String getResource() { return resource; } public AbstractRule setResource(String resource) { this.resource = resource; return this; } public String getLimitApp() { return limitApp; } public AbstractRule setLimitApp(String limitApp) { this.limitApp = limitApp; return this; } ... } //Authority rule is designed for limiting by request origins. public class AuthorityRule extends AbstractRule { ... } public class SystemRule extends AbstractRule { ... } public class FlowRule extends AbstractRule { ... } public class DegradeRule extends AbstractRule { ... }
