Netty 学习(十):ChannelPipeline源码说明

Netty 学习(十):ChannelPipeline源码说明

作者: Grey

原文地址:

博客园:Netty 学习(十):ChannelPipeline源码说明

CSDN:Netty 学习(十):ChannelPipeline源码说明

ChannelPipeline可以看作一条流水线,原料(字节流)进来,经过加工,形成一个个Java对象,然后基于这些对象进行处理,最后输出二进制字节流。

ChannelPipeline 在创建 NioSocketChannel 的时候创建, 其默认实现是 DefaultChannelPipeline

    final AbstractChannelHandlerContext head;     final AbstractChannelHandlerContext tail;     protected DefaultChannelPipeline(Channel channel) {         this.channel = ObjectUtil.checkNotNull(channel, "channel");         succeededFuture = new SucceededChannelFuture(channel, null);         voidPromise =  new VoidChannelPromise(channel, true);          tail = new TailContext(this);         head = new HeadContext(this);          head.next = tail;         tail.prev = head;     } 

ChannelPipeline 中保存了 Channel 的引用,且其中每个节点都是一个 ChannelHandlerContext 对象。每个 ChannelHandlerContext 节点都保存了执行器(即:ChannelHandler)。

ChannelPipeline里有两种不同的节点,一种是 ChannelInboundHandler,处理 inbound 事件(例如:读取数据流),还有一种是 ChannelOutboundHandler,处理 Outbound 事件,比如调用writeAndFlush()类方法时,就会调用该 handler。

添加 handler 的逻辑如下

    @Override     public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {         final AbstractChannelHandlerContext newCtx;         synchronized (this) {             // 检查是否有重复的 handler             checkMultiplicity(handler);             // 创建 节点             newCtx = newContext(group, filterName(name, handler), handler);             // 添加节点             addLast0(newCtx);              // If the registered is false it means that the channel was not registered on an eventLoop yet.             // In this case we add the context to the pipeline and add a task that will call             // ChannelHandler.handlerAdded(...) once the channel is registered.             if (!registered) {                 newCtx.setAddPending();                 callHandlerCallbackLater(newCtx, true);                 return this;             }              EventExecutor executor = newCtx.executor();             if (!executor.inEventLoop()) {                 callHandlerAddedInEventLoop(newCtx, executor);                 return this;             }         }         // 回调用户方法         callHandlerAdded0(newCtx);         return this;     } 

如上代码,整个添加过程见注释说明,其实主要就是四步:

第一步:检查是否有重复的 handler,核心逻辑见

    private static void checkMultiplicity(ChannelHandler handler) {         if (handler instanceof ChannelHandlerAdapter) {             ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;             if (!h.isSharable() && h.added) {                 // 非共享的且添加过,就抛出异常,反之,如果一个 handler 支持共享,就可以无限次被添加到 ChannelPipeline 中                 throw new ChannelPipelineException(                         h.getClass().getName() +                         " is not a @Sharable handler, so can't be added or removed multiple times.");             }             h.added = true;         }     } 

第二步:创建节点,即把 handler 包裹成 ChannelHandlerContext,核心逻辑如下

    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =             new FastThreadLocal<Map<Class<?>, String>>() {         @Override         protected Map<Class<?>, String> initialValue() {             return new WeakHashMap<Class<?>, String>();         }     };     private String generateName(ChannelHandler handler) {         Map<Class<?>, String> cache = nameCaches.get();         Class<?> handlerType = handler.getClass();         String name = cache.get(handlerType);         if (name == null) {             name = generateName0(handlerType);             cache.put(handlerType, name);         }          // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid         // any name conflicts.  Note that we don't cache the names generated here.         if (context0(name) != null) {             String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.             for (int i = 1;; i ++) {                 String newName = baseName + i;                 if (context0(newName) == null) {                     name = newName;                     break;                 }             }         }         return name;     } 

注:Netty 使用 FastThreadLocal 变量来缓存 Handler 的类和名称的映射关系,在生成 name 的时候,首先看缓存中有没有生成过默认 name,如果没有生成,就调用generateName0()生成默认名称,加入缓存。

第三步:把 ChannelHandlerContext 作为节点添加到 pipeline 中,核心代码如下

    private void addLast0(AbstractChannelHandlerContext newCtx) {         AbstractChannelHandlerContext prev = tail.prev;         newCtx.prev = prev;         newCtx.next = tail;         prev.next = newCtx;         tail.prev = newCtx;     } 

其本质就是一个双向链表的插入节点过程,而且,ChannelPipeline 删除 ChannelHandler 的方法,本质就是把这个双向链表的某个节点删掉!

第四步:回调用户方法,核心代码如下

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {         try {             ctx.callHandlerAdded();         } catch (Throwable t) {             boolean removed = false;             try {                 atomicRemoveFromHandlerList(ctx);                 ctx.callHandlerRemoved();                 removed = true;             } catch (Throwable t2) {                 if (logger.isWarnEnabled()) {                     logger.warn("Failed to remove a handler: " + ctx.name(), t2);                 }             }              if (removed) {                 fireExceptionCaught(new ChannelPipelineException(                         ctx.handler().getClass().getName() +                         ".handlerAdded() has thrown an exception; removed.", t));             } else {                 fireExceptionCaught(new ChannelPipelineException(                         ctx.handler().getClass().getName() +                         ".handlerAdded() has thrown an exception; also failed to remove.", t));             }         }     }     final void callHandlerRemoved() throws Exception {         try {             // Only call handlerRemoved(...) if we called handlerAdded(...) before.             if (handlerState == ADD_COMPLETE) {                 handler().handlerRemoved(this);             }         } finally {             // Mark the handler as removed in any case.             setRemoved();         }     } 

其中ctx.callHandlerAdded();就是回调用户的handlerAdded方法,然后通过 CAS 方式修改节点的状态为 REMOVE_COMPLETE (说明该节点已经被移除),或者 ADD_COMPLETE (添加完成)。

完整代码见:hello-netty

本文所有图例见:processon: Netty学习笔记

更多内容见:Netty专栏

参考资料

跟闪电侠学 Netty:Netty 即时聊天实战与底层原理

深度解析Netty源码

发表评论

相关文章