热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark底层通信RPC源码分析

RPC通信:无论是hadoop2.x的Rpc通信方式还是Spark2.x的Rpc通信方式,简单通俗的来说就是两个进程之间的远程通信,比如java一个A项目里面有一个classA,里面有一个wash

RPC通信:无论是hadoop2.x的Rpc通信方式还是Spark2.x的Rpc通信方式,简单通俗的来说就是两个进程之间的远程通信,比如java 一个A项目里面有一个class A,里面有一个washA方法一个B项目里面有一个Class B类,里面有一个方法是washB,B项目通过代理模式以及java的反射机制调用到A项目里面的washA,这种情况下就可以理解为是一个简单的Rpc通信方式。

Spark2.x

Spark2.x使用基于RPC的通信方式,去除了1.x的Akka的实现方式,只保留了netty的实现方式,Spark2.xRpc提供了上层抽象(RpcEndpoint、RpcEnv、RpcEndPointRef),具体的实现方式只要实现了定义的抽象就可以完成Rpc通信,Spark2.x之后目前版本只保留了Netty(NettyRpcEnv、NettyRpcEndpointRef)的实现,定义抽象最大的好处相信开发的朋友都很清楚,以后不管提供了什么方式的实现只要实现了RPCEndpoint,RpcEnv,RpcEndpointRef就可以完成的通信功能。比如自己写一个自己版本的Rpc通信实现。

Spark2.x的Rpc通信方式主要包括一下几个重要方面

RpcEndpoint:消息通信体,主要是用来接收消息、处理消息,实现了RpcEndPoint接口就是一个消息通信体(Master、Work),RpcEndpoint 需要向RpcEnv注册

RpcEnv:Rpc通信的上下文环境,消息发送过来首先经过RpcEnv然后路由给对应的RpcEndPoint,得到RpcEndPoint

RpcEndPointRef:RpcEndPoint的引用如果要想某个RpcEndPoint发送消息,首先要通过RpcEnv得到RpcEndPoint的引用

RpcEndPoint 接口 里面的定义如下

 

val rpcEnv : RpcEnv //得到RpcEnv对象

final def self: RpcEndpointRef = {//返回一个RpcEnpointRef这个方法通常用来自己给自己发送消息

   rpcEnv.endpointRef(this)

  }

def receive: PartialFunction[Any, Unit]//处理RpcEndPointRef.send或者RpcEndPointRef.reply方法,该方法不需要进行响应信息

 

def receiveAndReply(context:RpcCallContext): PartialFunction[Any, Unit]//处理RpcEndPointref.ask发送的消息,处理完之后需要给调用ask的通信端响应消息(reply)

 

def onError(cause: Throwable)//处理消息失败的时候会调用此方法

def onConnected(remoteAddress: RpcAddress)//远程连接的当前节点的时候触发

def onDisconnected(remoteAddress:RpcAddress)//远程连接断开时候触发

def onNetworkError(cause: Throwable,remoteAddress: RpcAddress)//远程连接发生网络异常时触发

def onStop()//停止RpcEndPoint

def onStart()//启动RpcEndPoint,这里不仅仅是网络上说的启动RpcEndPoint处理任何消息,onStart方法里面很多情况下可以写自己的RpcEndPoint的一些实现比如启动端口,或者创建目录

但是RpcEndPoint只有在onStart方法做一些处理之后才可以接受RpcEndPointRef发送的消息

private[spark] trait ThreadSafeRpcEndpointextends RpcEndpoint//因为receive是并发操作如果要现成安全就是用threadSafeRpcEndPoint

 

RpcEndPoint的生命周期 构造-->onStart--> receive -->onStop,注意onStart的方法是在调用setRpcEndPoint注册之后就会执行任何RpcEndPoint的onStart方法都是在注册之后执行的

原因后面的源码的提到

 

RpcEndpointRef:抽象类

 

  defaddress: RpcAddress //根据主机名端口返回一个RppAddress

def name: String//name 一个字符串 暂时不知道干嘛的

def send(message: Any): Unit//向RpcEndPoint发送一个消息 不需要返回结果

 defask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

  defask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout) //向RpcEndPoint发送消息并得到返回结果

def askWithRetry[T: ClassTag](message:Any): T = askWithRetry(message, defaultAskTimeout)// 想RpcEndPoint发送消息并在一定时间内返回结果 失败的时候并且进行一定次数的重试

 

 

RpcEnv

 

 private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef//传入RpcEndPoint得到RpcEndPointref对象

  defaddress: RpcAddress//根据主机名端口返回一个RppAddress

def setupEndpoint(name: String, endpoint:RpcEndpoint): RpcEndpointRef//注册RpcEndPoint返回对应的RpcEndPointRef

def asyncSetupEndpointRefByURI(uri:String): Future[RpcEndpointRef]//通过uri一步获取 RpcEndPointRef

 defstop(endpoint: RpcEndpointRef): Unit//停止RpcEndPoint根据RpcEndPointRef

  defshutdown(): Unit//关闭RpcEndPoint

 defawaitTermination(): Unit//等待RpcEndPoint退出

 

object RpcEnv

 defcreate(

     name: String,

     host: String,

     port: Int,

     conf: SparkConf,

     securityManager: SecurityManager,

     clientMode: Boolean = false): RpcEnv = {

   val cOnfig= RpcEnvConfig(conf, name, host, port, securityManager,clientMode)

   new NettyRpcEnvFactory().create(config)

  }

//通过RpcEnvFactory.create创建RpcEnv环境

RpcEnvConfig

 

private[spark] case class RpcEnvConfig(

   conf: SparkConf,

   name: String,

   host: String,

   port: Int,

   securityManager: SecurityManager,

   clientMode: Boolean)

case类 里面包括SparkConf,name,host,port等

 

NettyRpcEnv NettyRpcEnv通过NettyRpcEnvFactory的create方法创建

 

 valnettyEnv =

      new NettyRpcEnv(sparkConf,javaSerializerInstance, config.host, config.securityManager)// 创建nettyEnv

 private val dispatcher: Dispatcher = newDispatcher(this)

 

Dispatcher负责RPC消息的路由,它能够将消息路由到对应的RpcEndpoint进行处理,同时存放RpcEndPoint与RpcEndPointRef的映射

NettyStreamManager 负责提供文件服务(文件、JAR文件、目录)

TransportContext负责管理网路传输上下文信息:创建MessageEncoder、MessageDecoder、TransportClientFactory、TransportServer

NettyRpcHandler负责处理网络IO事件,接收RPC调用请求,并通过Dispatcher派发消息

这里说一下Dispatcher 该类主要负责Rpc消息路由里面有一个内部累EndPointData 但是有一个现成安全的Inbox这里面存放的时候收到的消息,非常重要后面会做具体分析

private class EndpointData(

     val name: String,

     val endpoint: RpcEndpoint,

     val ref: NettyRpcEndpointRef) {

   val inbox = new Inbox(ref, endpoint)

  }

 

 private val endpoints = new ConcurrentHashMap[String, EndpointData]//存放name->对应的EndPoint的信息

 private val endpointRefs = new ConcurrentHashMap[RpcEndpoint,RpcEndpointRef]//存放RpcEndpoint, RpcEndpointRef的映射关系

 

 private val receivers = new LinkedBlockingQueue[EndpointData]//队列下面会有一个现成不断的从里面取出来处理

 

 

 defregisterRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef ={

   val addr = RpcEndpointAddress(nettyEnv.address, name)

   val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)

   synchronized {

     if (stopped) {

       throw new IllegalStateException("RpcEnv has been stopped")

     }

     if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint,endpointRef)) != null) {

       throw new IllegalArgumentException(s"There is already anRpcEndpoint called $name")

      }

     val data = endpoints.get(name)

     endpointRefs.put(data.endpoint, data.ref)

     receivers.offer(data)  // for theOnStart message

    }

   endpointRef

  }

//注册RpcEndPoint在这里面发生 同时将data put到receivers 

在NettyRpcEndPoint里面有一个threadpool

private val threadpool: ThreadPoolExecutor= {

   val numThreads =nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",

     math.max(2, Runtime.getRuntime.availableProcessors()))

   val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")

   for (i <- 0 until numThreads) {

     pool.execute(new MessageLoop)

    }

   pool

  }

MessageLoop 是一个实现了Runnable的类,里面的run方法里面不断从receivers取出来进行处理

重要代码 data.inbox.process(Dispatcher.this)

这个里面有一个非常重要的点就是什么时候调用onStart的方法因为receivers里面存放的是EndPoint的信息同时创建EndPointData对象

进入Inbox里面看一下

 inbox =>  // Give this an aliasso we can use it more clearly in closures.

 

 @GuardedBy("this")

 protected val messages = new java.util.LinkedList[InboxMessage]()

 inbox.synchronized {

   messages.add(OnStart)

  }

创建这个类的时候会有一个messagelinkedList的list集合在创建这个结合之后就会将onStart方法添加到里面,并且是现成安全的

然后process 方法里面会不断的拿到集合的数据来进行对应的操作

 caseOnStart=>

           endpoint.onStart()

           if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {

              inbox.synchronized {

                if (!stopped) {

                  enableCOncurrent= true

                }

              }

           }

这个时候就会调用onStart方法

这个时候相当于RpcEndPoint可以接受消息并且处理了

Spark Rpc通信方式 分为本地消息和远程消息,本地消息相当于调用的方法直接存放到Index(中文收件箱),远程消息需要走NettyRpcHandler


推荐阅读
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • java线程池的实现原理源码分析
    这篇文章主要介绍“java线程池的实现原理源码分析”,在日常操作中,相信很多人在java线程池的实现原理源码分析问题上存在疑惑,小编查阅了各式资 ... [详细]
  • 开发笔记:Spark Java API 之 CountVectorizer
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了SparkJavaAPI之CountVectorizer相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • VScode格式化文档换行或不换行的设置方法
    本文介绍了在VScode中设置格式化文档换行或不换行的方法,包括使用插件和修改settings.json文件的内容。详细步骤为:找到settings.json文件,将其中的代码替换为指定的代码。 ... [详细]
  • 本文介绍了一个Java猜拳小游戏的代码,通过使用Scanner类获取用户输入的拳的数字,并随机生成计算机的拳,然后判断胜负。该游戏可以选择剪刀、石头、布三种拳,通过比较两者的拳来决定胜负。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • Android中高级面试必知必会,积累总结
    本文介绍了Android中高级面试的必知必会内容,并总结了相关经验。文章指出,如今的Android市场对开发人员的要求更高,需要更专业的人才。同时,文章还给出了针对Android岗位的职责和要求,并提供了简历突出的建议。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文介绍了在Mac上搭建php环境后无法使用localhost连接mysql的问题,并通过将localhost替换为127.0.0.1或本机IP解决了该问题。文章解释了localhost和127.0.0.1的区别,指出了使用socket方式连接导致连接失败的原因。此外,还提供了相关链接供读者深入了解。 ... [详细]
  • 本文介绍了作者在开发过程中遇到的问题,即播放框架内容安全策略设置不起作用的错误。作者通过使用编译时依赖注入的方式解决了这个问题,并分享了解决方案。文章详细描述了问题的出现情况、错误输出内容以及解决方案的具体步骤。如果你也遇到了类似的问题,本文可能对你有一定的参考价值。 ... [详细]
  • switch语句的一些用法及注意事项
    本文介绍了使用switch语句时的一些用法和注意事项,包括如何实现"fall through"、default语句的作用、在case语句中定义变量时可能出现的问题以及解决方法。同时也提到了C#严格控制switch分支不允许贯穿的规定。通过本文的介绍,读者可以更好地理解和使用switch语句。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • 2018深入java目标计划及学习内容
    本文介绍了作者在2018年的深入java目标计划,包括学习计划和工作中要用到的内容。作者计划学习的内容包括kafka、zookeeper、hbase、hdoop、spark、elasticsearch、solr、spring cloud、mysql、mybatis等。其中,作者对jvm的学习有一定了解,并计划通读《jvm》一书。此外,作者还提到了《HotSpot实战》和《高性能MySQL》等书籍。 ... [详细]
author-avatar
G路过的彩虹
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有