【深入浅出 Yarn 架构与实现】2-2 Yarn 基础库 – 底层通信库 RPC

RPC(Remote Procedure Call) 是 Hadoop 服务通信的关键库,支撑上层分布式环境下复杂的进程间(Inter-Process Communication, IPC)通信逻辑,是分布式系统的基础。允许运行于一台计算机上的程序像调用本地方法一样,调用另一台计算机的子程序。
由于 RPC 服务整体知识较多,本节仅针对对 Yarn RPC 进行简略介绍,详细内容会后续开专栏介绍。

一、RPC 通信模型介绍

为什么会有 RPC 框架?
在分布式或微服务情境下,会有大量的服务间交互,如果用传统的 HTTP 协议端口来通信,需要耗费大量时间处理网络数据交换上,还要考虑编解码等问题。如下图所示。
【深入浅出 Yarn 架构与实现】2-2 Yarn 基础库 - 底层通信库 RPC

  • 客户端通过 RPC 框架的动态代理得到一个代理类实例,称为 Stub(桩)
  • 客户端调用接口方法(实际是 Stub 对应的方法),Stub 会构造一个请求,包括函数名和参数
  • 服务端收到这个请求后,先将服务名(函数)解析出来,查找是否有对应的服务提供者
  • 服务端找到对应的实现类后,会传入参数调用
  • 服务端 RPC 框架得到返回结果后,再进行封装返回给客户端
  • 客户端的 Stub 收到返回值后,进行解析,返回给调用者,完成 RPC 调用。

二、Hadoop RPC 介绍

一)简介

Hadoop RPC 是 Hadoop 自己实现的一个 RPC 框架,主要有以下几个特点:

  • 透明性:像调用本地方法一样调用远程方法。
  • 高性能:Hadoop 各个系统均采用 Master/Slave 结构,Master 是一个 RPC Server 用于处理各个 Slave 节点发送的请求,需要有高性能。
  • 可控性:由于 JDK 中的 RPC 框架 RMI 重量级过大,且封装度太高,不方便控制和修改。因此实现了自己的 RPC 框架,以保证轻量级、高性能、可控性。

框架原理和整体执行流程与第一节介绍的 RPC 框架一致,感兴趣可深入源码进行了解。

二)总体架构

Hadoop RPC 架构底层依靠 Java 的 nio、反射、动态代理等功能实现「客户端 - 服务器(C/S)」通信模型。
上层封装供程序调用的 RPC 接口。
【深入浅出 Yarn 架构与实现】2-2 Yarn 基础库 - 底层通信库 RPC

三、案例 demo

下面两个案例的 demo 已上传至 github。有帮助的话点个⭐️。
https://github.com/Simon-Ace/hadoop_rpc_demo

一)RPC Writable 案例实现

1、新建一个 maven 工程,添加依赖

<dependency>     <groupId>org.apache.hadoop</groupId>     <artifactId>hadoop-common</artifactId>     <version>2.8.5</version> </dependency> 

2、定义 RPC 协议

public interface BusinessProtocol {     void mkdir(String path);     String getName(String name);     long versionID = 345043000L; } 

3、定义协议实现

public class BusinessIMPL implements BusinessProtocol {     @Override     public void mkdir(String path) {         System.out.println("成功创建了文件夹 :" + path);     }      @Override     public String getName(String name) {         System.out.println("成功打了招呼: hello :" + name);         return "bigdata";     } } 

4、通过 Hadoop RPC 构建一个 RPC 服务端

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC;  import java.io.IOException;  public class MyServer {     public static void main(String[] args) {         try {             // 构建一个 RPC server 端,提供了一个 BussinessProtocol 协议的 BusinessIMPL 服务实现             RPC.Server server = new RPC.Builder(new Configuration())                     .setProtocol(BusinessProtocol.class)                     .setInstance(new BusinessIMPL())                     .setBindAddress("localhost")                     .setPort(6789)                     .build();              server.start();         } catch (IOException e) {             e.printStackTrace();         }     } } 

5、构建一个 RPC 客户端

import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.conf.Configuration;  import java.io.IOException; import java.net.InetSocketAddress;  public class MyClient {     public static void main(String[] args) {         try {         	// 获取代理类实例,也就是 Stub             BusinessProtocol proxy = RPC.getProxy(BusinessProtocol.class, BusinessProtocol.versionID,                     new InetSocketAddress("localhost", 6789), new Configuration());              // 通过 Stub 发送请求,实际使用就像调用本地方法一样             proxy.mkdir("/tmp/ABC");             String res = proxy.getName("Simon");             System.out.println("从 RPC 服务端接收到的返回值:" + res);         } catch (IOException e) {             e.printStackTrace();         }     } } 

6、测试,先启动服务端,再启动客户端
服务端输出

成功创建了文件夹 :/tmp/ABC 成功打了招呼: hello :Simon 

客户端输出

从 RPC 服务端接收到的返回值:bigdata 

二)RPC Protobuf 案例实现

项目结构如下
【深入浅出 Yarn 架构与实现】2-2 Yarn 基础库 - 底层通信库 RPC

对 proto 文件格式不熟悉的同学,参考上一篇文章《2-1 Yarn 基础库概述》

MyResourceTrackerMessage.proto 定义数据格式

syntax = "proto3"; option java_package = "com.shuofxz.protobuf_rpc.proto"; option java_outer_classname = "MyResourceTrackerMessageProto"; option java_generic_services = true; option java_generate_equals_and_hash = true;  message MyRegisterNodeManagerRequestProto {     string hostname = 1;     int32 cpu = 2;     int32 memory = 3; }  message MyRegisterNodeManagerResponseProto {     string flag = 1; } 

MyResourceTracker.proto 定义 rpc 接口

syntax = "proto3";  import "com/shuofxz/protobuf_rpc/proto/MyResourceTrackerMessage.proto"; option java_package = "com.shuofxz.protobuf_rpc.proto"; option java_outer_classname = "MyResourceTrackerProto"; option java_generic_services = true; option java_generate_equals_and_hash = true;  service MyResourceTrackerService {     rpc registerNodeManager(MyRegisterNodeManagerRequestProto) returns (MyRegisterNodeManagerResponseProto); } 

2、对 proto 文件编译,生成 java 类

# 在项目根目录执行,路径按照自己的进行修改 protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResource.proto  protoc -I=src/main/java --java_out=src/main/java src/main/java/com/shuofxz/protobuf_rpc/proto/MyResourceTracker.proto 

3、定义调用方法接口 MyResourceTracker

import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto; import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto;  public interface MyResourceTracker {     MyRegisterNodeManagerResponseProto registerNodeManager(MyRegisterNodeManagerRequestProto request) throws Exception; }  

4、对调用方法接口的实现(服务端)

import com.shuofxz.protobuf_rpc.interf.MyResourceTracker; import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto;  public class MyResourceTrackerImpl implements MyResourceTracker {     @Override     public MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(             MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) {          // 输出注册的消息         String hostname = request.getHostname();         int cpu = request.getCpu();         int memory = request.getMemory();         System.out.println("NodeManager 的注册消息: hostname = " + hostname + ", cpu = " + cpu + ", memory = " + memory);          // 省略处理逻辑         // 构建一个响应对象,用于返回         MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.Builder builder =                 MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto.newBuilder();         // 直接返回 True         builder.setFlag("true");         MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = builder.build();         return response;     } } 

5、编写 proto 的协议接口

import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto; import org.apache.hadoop.ipc.ProtocolInfo;  @ProtocolInfo(protocolName = "com.shuofxz.blablabla", protocolVersion = 1) public interface MyResourceTrackerPB extends MyResourceTrackerProto.MyResourceTrackerService.BlockingInterface { } 

6、编写 proto 的协议接口实现(服务端)

import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import com.shuofxz.protobuf_rpc.interf.MyResourceTracker; import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto; import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;  public class MyResourceTrackerServerSidePB implements MyResourceTrackerPB {     final private MyResourceTracker server;      public MyResourceTrackerServerSidePB(MyResourceTracker server) {         this.server = server;     }      @Override     public MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto registerNodeManager(             RpcController controller, MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto request) throws ServiceException {         try {             return server.registerNodeManager(request);         } catch (Exception e) {             e.printStackTrace();         }         return null;     } } 

7、RPC Server 的实现

import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerProto;  import java.io.IOException;  public class ProtobufRpcServer {     public static void main(String[] args) throws IOException {         Configuration conf = new Configuration();          RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);          // 构建 Rpc Server         RPC.Server server = new RPC.Builder(conf)                 .setProtocol(MyResourceTrackerPB.class)                 .setInstance(MyResourceTrackerProto.MyResourceTrackerService                         .newReflectiveBlockingService(new MyResourceTrackerServerSidePB(new MyResourceTrackerImpl())))                 .setBindAddress("localhost")                 .setPort(9998)                 .setNumHandlers(1)                 .setVerbose(true)                 .build();          // Rpc Server 启动         server.start();     } } 

8、RPC Client 的实现

import com.google.protobuf.ServiceException; import com.shuofxz.protobuf_rpc.proto.MyResourceTrackerMessageProto; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import com.shuofxz.protobuf_rpc.interf.MyResourceTrackerPB;  import java.io.IOException; import java.net.InetSocketAddress;  public class ProtobufRpcClient {     public static void main(String[] args) throws IOException {         // 设置 RPC 引擎为 ProtobufRpcEngine         Configuration conf = new Configuration();         String hostname = "localhost";         int port = 9998;         RPC.setProtocolEngine(conf, MyResourceTrackerPB.class, ProtobufRpcEngine.class);          // 获取代理         MyResourceTrackerPB protocolProxy = RPC                 .getProxy(MyResourceTrackerPB.class, 1, new InetSocketAddress(hostname, port), conf);          // 构建请求对象         MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.Builder builder =                 MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto.newBuilder();         MyResourceTrackerMessageProto.MyRegisterNodeManagerRequestProto bigdata02 =                 builder.setHostname("bigdata02").setCpu(64).setMemory(128).build();          // 发送 RPC 请求,获取响应         MyResourceTrackerMessageProto.MyRegisterNodeManagerResponseProto response = null;         try {             response = protocolProxy.registerNodeManager(null, bigdata02);         } catch (ServiceException e) {             e.printStackTrace();         }          // 处理响应         String flag = response.getFlag();         System.out.println("最终注册结果: flag = " + flag);     } } 

9、测试
先启动服务端,在启动客户端。

四、总结

本节介绍了 Hadoop 底层通信库 RPC。首先介绍了 RPC 的框架和原理,之后对 Hadoop 自己实现的 RPC 进行了介绍,并给出了两个 demo 实践。
强烈建议了解基础知识后,跟着 demo 实现一个案例出来,可以更好的帮助你理解。
文中 Demo:https://github.com/Simon-Ace/hadoop_rpc_demo


参考文章:
YARN-RPC网络通信架构设计
YARN-高并发RPC源码实现
Hadoop3.2.1 【 HDFS 】源码分析 : RPC原理 [八] Client端实现&源码
Hadoop RPC机制详解
Hadoop2源码分析-RPC探索实战
《Hadoop 技术内幕 - 深入解析 Yarn 结构设计与实现原理》3.3 节

发表评论

评论已关闭。

相关文章