热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

「Flink」工具人Flink之组件通信

Flink算子之间通过Netty交互,而组件之间的通信是

      最近,工具人突然被拉进了很多工作微信群。这些群有些共性:比如:

1,在这些群中,被动加入的都是工具人,而群主和他的朋友,都是工具的使用人;

2,在这些群中,工具的使用人各不相同,而工具就那么几个;

3,这些群,安静时是逢年过节,活跃时都在讨论用哪个工具人祭天;

4,这些群的名字,大部分都叫:XX需求爆肝群,XX问题背锅群,XX团队马屁群。

     所以从被迫进群的那一刻,工具人们就如同开始了一场绝地求生的游戏,不约而同地开启消息免打扰,尽量保持隐身。于是,工具人很困惑,明明已经有开会这个消磨时间的沟通方式了,为啥还要在微信中打磨工具人呢?

     在Flink中,同样让工具人困惑的是,之前学习了Flink的算子之间的数据传递过程,这部分的数据传递在Flink中是通过netty来实现的(相关阅读可以参见:「Flink」工具人之初学Flink-Task算子通信(一)和「Flink」工具人之初学Flink-Task算子通信(二)),而今天要学习的Flink组件之间的通信(如:JobMaster,TaskManager等)却是通过Akka来实现的。

在整个通信的设计的模型中,主要分为几个概念:


一,RpcService

RpcService的唯一实现就是AkkaRpcService,这就是组件事件通信的akka实体。

下图中,我们可以看到AkkaRpcService持有了进程的地址、服务端口,以及提供的一些核心服务。

在进程启动时,以JobManager为例:

ClusterEntrypoint.class

    private void runCluster(Configuration configuration) throws Exception {
    synchronized (lock) {
          //各项服务初始化,其中就包涵了AkkaService的初始化
          initializeServices(configuration);
          .....
    }
    }

    protected void initializeServices(Configuration configuration) throws Exception {


    LOG.info("Initializing cluster services.");


    synchronized (lock) {
    final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
    final String portRange = getRPCPortRange(configuration);
    //创建RpcService
    commOnRpcService= createRpcService(configuration, bindAddress, portRange);
    ......
    }
    }


    @Nonnull
    private RpcService createRpcService(Configuration configuration, String bindAddress, String portRange) throws Exception {
        //创建了AkkaRpcService
    return AkkaRpcServiceUtils.createRpcService(bindAddress, portRange, configuration);
    }

    二,RpcEndpoint是服务提供方,而RpcGateway是服务调用方的代理类。

    如下面的类图中,我们以JobMaster举例:


    JobMaster作为JobManager中的核心服务,他是服务提供方。由于他都受到了高可用的保护,会同时存在多个节点,所以,他都继承于FencedRpcEndpoint,并提供相应的RPC服务调用。

    同时JobMaster也继承了JobMasterGateway,通过继承JobMasterGateway来约束了JobMaster必须提供的接口契约,JobMasterGateway是TaskManager中的JobMaster的代理实例,将会被TaskManager调用。


    每个服务提供方(如:JobMaster)在初始化时,都会在基类RpcEndpoint的构造过程中,调用自动自己的Akka服务,创建ActorRef对象,绑定远端Gateway的动态代理,并返回了RpcServer对象。

    RpcEndpoint.class

      protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
      this.rpcService = checkNotNull(rpcService, "rpcService");
      this.endpointId = checkNotNull(endpointId, "endpointId");
          //调用AkkaRpcService创建本服务的ActorRef,并绑定远端Gateway的动态代理
      this.rpcServer = rpcService.startServer(this);


      this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
      }

      这里我们截取部分rpcService::startServer的源码略作说明:

      AkkaRpcService.class

        @Override
        public RpcServer startServer(C rpcEndpoint) {
        checkNotNull(rpcEndpoint, "rpc endpoint");
        .......
        ActorRef actorRef;
            //在这里创建了actorRef
        synchronized (lock) {
        checkState(!stopped, "RpcService is stopped");
        actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
        actors.put(actorRef, rpcEndpoint);
        }


        LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());


        final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
        final String hostname;
        Option host = actorRef.path().address().host();
        if (host.isEmpty()) {
        hostname = "localhost";
        } else {
        hostname = host.get();
        }


        Set> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));


        implementedRpcGateways.add(RpcServer.class);
        implementedRpcGateways.add(AkkaBasedEndpoint.class);


        final InvocationHandler akkaInvocationHandler;


        if (rpcEndpoint instanceof FencedRpcEndpoint) {
        // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
        akkaInvocatiOnHandler= new FencedAkkaInvocationHandler<>(
        akkaAddress,
        hostname,
        actorRef,
        configuration.getTimeout(),
        configuration.getMaximumFramesize(),
        terminationFuture,
        ((FencedRpcEndpoint) rpcEndpoint)::getFencingToken);


        implementedRpcGateways.add(FencedMainThreadExecutable.class);
        } else {
        akkaInvocatiOnHandler= new AkkaInvocationHandler(
        akkaAddress,
        hostname,
        actorRef,
        configuration.getTimeout(),
        configuration.getMaximumFramesize(),
        terminationFuture);
        }


        // Rather than using the System ClassLoader directly, we derive the ClassLoader
        // from this class . That works better in cases where Flink runs embedded and all Flink
        // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
        ClassLoader classLoader = getClass().getClassLoader();

        //在这里创建了动态代理
        @SuppressWarnings("unchecked")
        RpcServer server = (RpcServer) Proxy.newProxyInstance(
        classLoader,
        implementedRpcGateways.toArray(new Class[implementedRpcGateways.size()]),
        akkaInvocationHandler);


        return server;
        }


        而当JobMaster启动时,则会最终调用rpcServer的start方法

        JobMaster.class

          /**
          * Start the rpc service and begin to run the job.
          *
          * @param newJobMasterId The necessary fencing token to run the job
          * @return Future acknowledge if the job could be started. Otherwise the future contains an exception
          */
          public CompletableFuture start(final JobMasterId newJobMasterId) throws Exception {
          // make sure we receive RPC and async calls
          super.start();


          return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
          }

          这里的rpcEndpoint就是实际的ActorRef对象,向Actor发送了一个匿名的START指令,启动了endpoint。

          RpcEndpoint.class

            @Override
            public void start() {
            rpcEndpoint.tell(Processing.START, ActorRef.noSender());
            }

            三,连接过程

            当TaskManager收到了ResourceManager的创建资源的消息后,会主动通知JobMaster,然后JobMaster会发起到TaskExecutor的连接。

            JobLeaderService.class

              /**
              * Retrying registration for the job manager <--> task manager connection.
              */
              private static final class JobManagerRetryingRegistration
              extends RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> {


              private final String taskManagerRpcAddress;


              private final TaskManagerLocation taskManagerLocation;


              JobManagerRetryingRegistration(
              Logger log,
              RpcService rpcService,
              String targetName,
              Class targetType,
              String targetAddress,
              JobMasterId jobMasterId,
              RetryingRegistrationConfiguration retryingRegistrationConfiguration,
              String taskManagerRpcAddress,
              TaskManagerLocation taskManagerLocation) {
              super(
              log,
              rpcService,
              targetName,
              targetType,
              targetAddress,
              jobMasterId,
              retryingRegistrationConfiguration);


              this.taskManagerRpcAddress = taskManagerRpcAddress;
              this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
              }


              @Override
              protected CompletableFuture invokeRegistration(
              JobMasterGateway gateway,
              JobMasterId jobMasterId,
              long timeoutMillis) throws Exception {

                    //通知JobMaster,来连接TaskManager
              return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation, Time.milliseconds(timeoutMillis));
              }
              }

              这里其实就是通过动态代理AkkaInvocationHandler发起了远程调用。

              在动态代理AkkaInvocationHandler内部,会根据调用的方判断是走本地调用还是远程调用。

                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Class declaringClass = method.getDeclaringClass();


                Object result;


                if (declaringClass.equals(AkkaBasedEndpoint.class) ||
                declaringClass.equals(Object.class) ||
                declaringClass.equals(RpcGateway.class) ||
                declaringClass.equals(StartStoppable.class) ||
                declaringClass.equals(MainThreadExecutable.class) ||
                      declaringClass.equals(RpcServer.class)) {
                result = method.invoke(this, args);
                } else if (declaringClass.equals(FencedRpcGateway.class)) {
                throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" +
                method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " +
                "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +
                "retrieve a properly FencedRpcGateway.");
                    } else {
                result = invokeRpc(method, args);
                    }
                return result;
                }

                在Flink中,有些场景看起来像是发起了一个Rpc调用,其实调用的还是本地服务,比如:Standalone模式下向ResourceManager申请资源等。所以,invokeRpc中将所有的调用参数统一封装为一个叫RpcInvocation的数据结构。

                它的两个子类LocalRpcInvocation和RemoteRpcInvocation 唯一的区别就是RemoteRpcInvocation 中的数据经过了序列化,方便网络传递。

                  protected RpcInvocation createRpcInvocationMessage(
                  final String methodName,
                  final Class[] parameterTypes,
                  final Object[] args) throws IOException {
                  final RpcInvocation rpcInvocation;


                  if (isLocal) {
                  rpcInvocation = new LocalRpcInvocation(
                  methodName,
                  parameterTypes,
                  args);
                  } else {
                  try {
                  RemoteRpcInvocation remoteRpcInvocation = new RemoteRpcInvocation(
                  methodName,
                  parameterTypes,
                  args);


                  if (remoteRpcInvocation.getSize() > maximumFramesize) {
                  throw new IOException("The rpc invocation size exceeds the maximum akka framesize.");
                  } else {
                  rpcInvocation = remoteRpcInvocation;
                  }
                  } catch (IOException e) {
                  LOG.warn("Could not create remote rpc invocation message. Failing rpc invocation because...", e);
                  throw e;
                  }
                  }


                  return rpcInvocation;
                  }


                  收到请求后 JobMaster连接TaskExecutor,并返回TaskExecutorGateway

                  JobMaster.class

                    @Override
                    public CompletableFuture registerTaskManager(
                    final String taskManagerRpcAddress,
                    final TaskManagerLocation taskManagerLocation,
                    final Time timeout) {


                    final ResourceID taskManagerId = taskManagerLocation.getResourceID();


                    if (registeredTaskManagers.containsKey(taskManagerId)) {
                    final RegistrationResponse respOnse= new JMTMRegistrationSuccess(resourceId);
                    return CompletableFuture.completedFuture(response);
                    } else {
                          // 连接TaskManager 并返回TaskExecutor
                    return getRpcService()
                    .connect(taskManagerRpcAddress, TaskExecutorGateway.class)
                    .handleAsync(
                    (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
                    ......
                    },
                    getMainThreadExecutor());
                    }
                    }


                    总结一下,以上整体过程大致如下图所示:



                    最后,工具人表示,在数不清的微信群中疲于奔命,会影响工作效率和质量;工具人们是喝咖啡集中精力思考与创造的工程师,而不是喝着麻辣烫,服务那个伺候这个的保姆。



                    推荐阅读
                    • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
                    • Java太阳系小游戏分析和源码详解
                      本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
                    • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
                    • 如何自行分析定位SAP BSP错误
                      The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
                    • Nginx使用AWStats日志分析的步骤及注意事项
                      本文介绍了在Centos7操作系统上使用Nginx和AWStats进行日志分析的步骤和注意事项。通过AWStats可以统计网站的访问量、IP地址、操作系统、浏览器等信息,并提供精确到每月、每日、每小时的数据。在部署AWStats之前需要确认服务器上已经安装了Perl环境,并进行DNS解析。 ... [详细]
                    • 本文介绍了闭包的定义和运转机制,重点解释了闭包如何能够接触外部函数的作用域中的变量。通过词法作用域的查找规则,闭包可以访问外部函数的作用域。同时还提到了闭包的作用和影响。 ... [详细]
                    • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
                    • Java容器中的compareto方法排序原理解析
                      本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
                    • JavaSE笔试题-接口、抽象类、多态等问题解答
                      本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
                    • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
                    • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
                    • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
                      本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
                    • Java学习笔记之面向对象编程(OOP)
                      本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
                    • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
                    • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
                    author-avatar
                    lucky_笨鸟_660
                    这个家伙很懒,什么也没留下!
                    PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
                    Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有