Hadoop的RPC设计分析 光环大数据hadoop培训

编辑:光环大数据 来源: 互联网 时间: 2018-02-11 17:14 阅读:

光环大数据作为国内知名的hadoop培训的机构,聘请专业讲师面对面授课,与时俱进及时更新课程体系,为保障学员就业与多家单位进行合作,保障学员就业。光环大数据所有项目都由阿里云真实项目数据,光环大数据成为阿里云授权认证中心,毕业通过相关考试就可以获得阿里云的证书。

之前鼓捣Hbase的时候,觉得单机和伪分布式模式太low了,就在笔记本上用三个虚拟机搭建了一个“完全分布式”的Hbase环境(心疼破本子一秒钟)。刚好趁这个元旦假期,我就研究了一下hadoop。

Hadoop也算是个巨无霸了,涉及了很多方面的功能。个人工作中有多个RPC client管理以及交互的场景,一直觉得设计的不太好。所以心里一直想研究一下优秀项目的多路RPC是如何实现的,然后计划一直搁置到现在。难得小假期,就拿手上的Hadoop开刀吧!

1. 宏观背景

Hadoop的RPC确实挺复杂的,就单单以HDFS为例,client与NameNode, client与DataNode, NameNode与DataNode以及DataNode与其他DataNode。如果要提到Hadoop map/reduce,那么事情就更不简单了。虽然Hadoop的RPC如此复杂,但是这些RPC都是基于同一个RPC框架,这个RPC框架是Hadoop自己实现的。不同的RPC只需要在这个RPC框架上实现自己的通信协议即可。这篇文章里,我打算主要分析这个底层的RPC框架是如何实现的。

2. Client实现

RPC的client端实现在org.apache.hadoop.ipc这个包里面。至于这个包为啥叫ipc,我也不太明白,这个ipc也该不是inter process communication的缩写。前文已经说过了,一个Client(不仅仅是Hadoop的客户端,也可能是DataNode等等)会存在多个客户端连接。这个情况下,Hadoop的Client的内部会持有多个连接。Client有Connection、ConnectionId这样的一些内部类。其中ConnectionId包含IntetSocketAddress和一些配置信息;而Connection则就是一个Thread的子类,负责接收和发送消息。

private ConcurrentMap<ConnectionId, Connection> connections =    new ConcurrentHashMap<>();

这个connections就是Client的成员变量,代表着Client所建立的所有连接。此外Client还有一个叫Call内部类。Call代表一次RCP调用,虽然Hadoop的RPC是直接基于TCP的,但上层使用起来和REST之类的RPC还是非常相似的。Call的代码片段如下:

/**    * Class that represents an RPC call   */  static class Call {    final int id;               // call id    final int retry;           // retry count    final Writable rpcRequest;  // the serialized rpc request    Writable rpcResponse;       // null if rpc has error    IOException error;          // exception, null if success    final RPC.RpcKind rpcKind;      // Rpc EngineKind    boolean done;               // true when call is done    private final Object externalHandler;    private Call(RPC.RpcKind rpcKind, Writable param) {      this.rpcKind = rpcKind;      this.rpcRequest = param;      final Integer id = callId.get();      if (id == null) {        this.id = nextCallId();      } else {        callId.set(null);        this.id = id;      }            final Integer rc = retryCount.get();      if (rc == null) {        this.retry = 0;      } else {        this.retry = rc;      }      this.externalHandler = EXTERNAL_CALL_HANDLER.get();    }}

id表示这次RPC的调用的编号,因为这里的TCP RPC是全双工的,所以需要一个序列标识。为了保证Call的id在单个连接中唯一,Client定义了几个AtomicInteger变量。每个RPC Call都会把这个id带上,call的response里面也会带上这个id,这样客户端可以分发消息了。一个Client主要的数据结构如下图所示:

其实这个逻辑结构显得挺简单的,主要的工作还是在Connection类中完成的。Connection作为一个Thread的子类,它的run()方法其实就是不断的read,然后根据Response中的Call id分发返回消息。在具体实现中,Connection的run方法就是在while循环中不断receiveRpcResponse()。

private void receiveRpcResponse() {  if (shouldCloseConnection.get()) {    return;  }  touch();    try {    ByteBuffer bb = ipcStreams.readResponse();    RpcWritable.Buffer packet = RpcWritable.Buffer.wrap(bb);    RpcResponseHeaderProto header =        packet.getValue(RpcResponseHeaderProto.getDefaultInstance());    checkResponse(header);    int callId = header.getCallId();    if (LOG.isDebugEnabled())      LOG.debug(getName() + " got value #" + callId);    RpcStatusProto status = header.getStatus();    if (status == RpcStatusProto.SUCCESS) {      Writable value = packet.newInstance(valueClass, conf);      final Call call = calls.remove(callId);      call.setRpcResponse(value);    }    // verify that packet length was correct    if (packet.remaining() > 0) {      throw new RpcClientException("RPC response length mismatch");    }    if (status != RpcStatusProto.SUCCESS) { // Rpc Request failed      final String exceptionClassName = header.hasExceptionClassName() ?            header.getExceptionClassName() :               "ServerDidNotSetExceptionClassName";      final String errorMsg = header.hasErrorMsg() ?             header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;      final RpcErrorCodeProto erCode =                 (header.hasErrorDetail() ? header.getErrorDetail() : null);      if (erCode == null) {         LOG.warn("Detailed error code not set by server on rpc error");      }      RemoteException re = new RemoteException(exceptionClassName, errorMsg, erCode);      if (status == RpcStatusProto.ERROR) {        final Call call = calls.remove(callId);        call.setException(re);      } else if (status == RpcStatusProto.FATAL) {        // Close the connection        markClosed(re);      }    }  } catch (IOException e) {    markClosed(e);  }}

看了接收逻辑,那么发送RPC call的逻辑也必不可少。有一点值得注意的是,发送RPC call都不是connection线程,所以这里需要一些线程同步方法。一般来说,会使用消息队列的方式来缓存call,然后一个发送线程不断发送call。不过Hadoop不是这样做的,它使用的是一个线程池,然后传输给线程池的是一个包装发送Call的Runnable。为什么采用这种完全task base的方法,我也没太明白。不过话说回来,也没有明显的缺点,反而是把消息队列的工作扔给线程池了,减少了一定工作量。这里简单的贴一点代码:

public void sendRpcRequest(final Call call)    throws InterruptedException, IOException {  if (shouldCloseConnection.get()) {    return;  }  // Serialize the call to be sent. This is done from the actual  // caller thread, rather than the sendParamsExecutor thread,  RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(      call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,      clientId);  final ResponseBuffer buf = new ResponseBuffer();  header.writeDelimitedTo(buf);  RpcWritable.wrap(call.rpcRequest).writeTo(buf);  synchronized (sendRpcRequestLock) {    Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {      @Override      public void run() {        try {          synchronized (ipcStreams.out) {            if (shouldCloseConnection.get()) {              return;            }            if (LOG.isDebugEnabled()) {              LOG.debug(getName() + " sending #" + call.id);            }            // RpcRequestHeader + RpcRequest            ipcStreams.sendRequest(buf.toByteArray());            ipcStreams.flush();          }        } catch (IOException e) {          // exception at this point would leave the connection in an          // unrecoverable state (eg half a call left on the wire).          // So, close the connection, killing any outstanding calls          markClosed(e);        } finally {          //the buffer is just an in-memory buffer, but it is still polite to          // close early          IOUtils.closeStream(buf);        }      }    });  }}

3. Server实现

前面大致分析了一遍Client,然后这里就轮到了Server的实现了。Server和Client在一个包,不过这个Server是个抽象类。Server唯一的一个抽象方法就是call方法,这个方法就是处理具体请求的。不同功能的Server会有不同的业务逻辑,所以它们需要实现这个函数。

通过之前的Client分析,Server的实现也应该能猜出一二了。Server类的逻辑结构图如下:

Server类的Connection、Call与Client的非常相似,所以这里就不再赘述。相对Client来说,Server的线程模型更复杂一些。Server类有很多内部类,Listener, Responder, Handler这几个内部类都是Thread的子类。

private CallQueueManager<Call> callQueue;  // maintains the set of client connections and handles idle timeouts  private Listener listener = null;  private Responder responder = null;  private Handler[] handlers = null;

从代码片段中可以看出,一个Server类会存在一个Listener线程,一个Responder线程以及多个Handler线程。其中Listener线程是一个使用NIO的线程,接收所有的连接请求都是由Listener线程处理的。其实Listener线程内部还有多个Reader线程,Reader线程的功能是处理Accept之后的连接,构造出RpcCall请求,然后扔到CallQueueManager<Call> callQueue这个队列中。

然后Handler线程们从callQueue中取出Call并执行具体的RPC。Handler处理完之后,会以NIO channel的方式发送给Responder,Responder再实际发送给Client端。

由于Hadoop实际的通信协议有很多种,这里也就不探讨RpcInvoker的具体逻辑了。主要就是通过反射调用对应的call方法实现,也不是很难理解。

OK!这篇博客就到这了。再过几个小时就是2018年了,真的是时光如梭啊!未来的路通向哪里,我不知道,但我会加快脚步追寻光明。

 


大数据培训、人工智能培训、Python培训、大数据培训机构、大数据培训班、数据分析培训、大数据可视化培训,就选光环大数据!光环大数据,聘请专业的大数据领域知名讲师,确保教学的整体质量与教学水准。讲师团及时掌握时代潮流技术,将前沿技能融入教学中,确保学生所学知识顺应时代所需。通过深入浅出、通俗易懂的教学方式,指导学生更快的掌握技能知识,成就上万个高薪就业学子。 更多问题咨询,欢迎点击------>>>>在线客服

你可能也喜欢这些

在线客服咨询

领取资料

X
立即免费领取

请准确填写您的信息

点击领取
#第三方统计代码(模版变量) '); })();
'); })();