热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

第六章数据中台PaaS层离线存储之HDFS源码剖析第一部分NameNode启动流程HadoopRpc协议详述

1、大数据源码解读思路(1)掌握其网络通信架构(2)场景驱动方式HDFS:namenodedatanode

1、大数据源码解读思路

(1)掌握其网络通信架构

(2)场景驱动方式


  • HDFS:

    • namenode datanode启动

    • 写数据得流程

    • 更新原数据流程

    • 读数据流程


2、Hadoop RPC的Demo详述


  • 含义:远程过程调用,即不同进程的方法的调用。

在这里插入图片描述


2.1、创建pom依赖

<dependency><groupId>org.apache.hadoopgroupId><artifactId>hadoop-clientartifactId><version>2.7.0version>
dependency>

2.2、相关源码调试

&#xff08;1&#xff09;创建客户端和服务端通信协议接口

/**
* 协议
* &#64;author Administrator
*
*/

public interface ClientProtocol {long versionID&#61;1234L;void makeDir(String path);
}

&#xff08;2&#xff09;服务端相关源码开发

public class NameNodeRpcServer implements ClientProtocol {/*** 创建⽬录*/&#64;Overridepublic void makeDir(String path) {System.out.println("服务端&#xff1a;"&#43;path);}// 构建服务端public static void main(String[] args) throws Exception {Server server &#61; new RPC.Builder(new Configuration()).setBindAddress("localhost").setPort(9999).setProtocol(ClientProtocol.class).setInstance(new NameNodeRpcServer()).build();//启动服务端server.start();}
}

  • 运行情况&#xff1a;服务端启动会一直等待客户端调用

在这里插入图片描述


  • 查看进程&#xff1a;通过JPS查看进程

在这里插入图片描述

&#xff08;3&#xff09;客户端相关源码开发

/**
* RPC客户端
* &#64;author Administrator
*
*/

public class DFSClient {public static void main(String[] args) throws IOException {ClientProtocol namenode &#61; RPC.getProxy(ClientProtocol.class,ClientProtocol.versionID,new InetSocketAddress("localhost",9999),new Configuration());namenode.makeDir("/user/opt/soft");}
}

  • 查看运行结果&#xff1a;客户端调用服务端方法&#xff0c;方法的执行在服务端

在这里插入图片描述

&#xff08;4&#xff09;Hadoop RPC特性

①RPC其实指的不同进程之间的调用&#xff0c;例如客户端调用服务端方法&#xff0c;方法的执行在服务端&#xff1b;

②协议接口特征&#xff1a;接口里面必须有VersionID&#xff1b;

③接口协议&#xff1a;服务端会实现接口&#xff08;协议里面的方法&#xff09;&#xff1b;

hadoop rpc的服务端&#xff0c;我们在JPS时候是可以看得到的。

⑤服务端和客户端创建


  • 服务端&#xff1a;接受参数&#xff0c;创建方法。
  • 客户端&#xff1a;传递参数&#xff0c;调用服务端方法。

3、源码剖析之namenode启动流程


3.1、第一步 - Name注释概述

&#xff08;1&#xff09;NameNode服务管理了两个重要的表


  • namespace&#xff1a;管理了文件与block之间的关系&#xff0c;存储到磁盘上。

    • 关系固定&#xff0c;不会发生变化。
  • inodes&#xff1a;管理了Block与主机之间的关系&#xff0c;每次重启重新构建。

    • 关系不固定&#xff0c;档一个节点挂掉&#xff0c;文件块就会挪到另外一台服务器上。

&#xff08;2&#xff09;NameNode服务由三个重要类支撑

①NameNode类&#xff1a;管理配置的参数

②NameNode Server


  • IPC Server&#xff1a;开发端口等待别人调用
  • HTTP Server&#xff1a;开放50070界面&#xff0c;可以通过该界面了解HDFS的情况

③FSNameNameSystem&#xff1a;管理了HDFS的元数据


3.2、NameNode的main方法源码剖析

public static void main(String argv[]) throws Exception {// 1、解析參數如果异常则退出if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {System.exit(0);}try {StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);///2、创建NameNodeNameNode namenode &#61; createNameNode(argv, null);// 3、如果NameNode不为空则线程阻塞if (namenode !&#61; null) {namenode.join();}} catch (Throwable e) {LOG.error("Failed to start namenode.", e);terminate(1, e);}
}

3.3、createNameNode源码剖析

  • 集群启动参数传递&#xff1a;hadoop-daemon.sh start namenode

public static NameNode createNameNode(String argv[], Configuration conf) throws IOException {LOG.info("createNameNode " &#43; Arrays.asList(argv));if (conf &#61;&#61; null)conf &#61; new HdfsConfiguration();// 1、解析参数StartupOption startOpt &#61; parseArguments(argv);// 2、判断传入参数则执行相关枚举操作if (startOpt &#61;&#61; null) {printUsage(System.err);return null;}setStartupOption(conf, startOpt);// 3、NameNode初始化方法switch (startOpt) {......default: {DefaultMetricsSystem.initialize("NameNode");return new NameNode(conf);}}
}// NameNode初始化入口
public NameNode(Configuration conf) throws IOException {this(conf, NamenodeRole.NAMENODE);
}

3.4、NameNode初始化源码剖析

protected NameNode(Configuration conf, NamenodeRole role) try {initializeGenericKeys(conf, nsId, namenodeId);// 初始化方法initialize(conf);} catch (IOException e) {this.stop();throw e;} catch (HadoopIllegalArgumentException e) {this.stop();throw e;}this.started.set(true);
}

3.5、启动代码

protected void initialize(Configuration conf) throws IOException {// 1、启动HTTP服务if (NamenodeRole.NAMENODE &#61;&#61; role) {startHttpServer(conf);}this.spanReceiverHost &#61; SpanReceiverHost.getInstance(conf);// 2、加载元数据loadNamesystem(conf);// 3、hadoop RPC配置// NameNodeRPCServer中有两个主要的RPC服务// (1)ClientRPCServer&#xff1a;hdfs客户端去操作HDFS方法// (2)ServiceRPCServer&#xff1a;服务之间相互进行方法调用(注册、心跳等)rpcServer &#61; createRpcServer(conf);pauseMonitor &#61; new JvmPauseMonitor(conf);pauseMonitor.start();metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);startCommonServices(conf);
}

&#xff08;1&#xff09;查看httpserver相关源码


  • 相关示意图如下

在这里插入图片描述

private void startHttpServer(final Configuration conf) throws IOException {httpServer &#61; new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));httpServer.start();httpServer.setStartupProgress(startupProgress);
}// 向上追溯
protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {InetSocketAddress bindAddress &#61; getHttpServerAddress(conf);
}//向上追溯
protected InetSocketAddress getHttpServerAddress(Configuration conf) {return getHttpAddress(conf);
}//向上追溯
public static InetSocketAddress getHttpAddress(Configuration conf) {return NetUtils.createSocketAddr(conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
}
// 查看相关参数
public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT &#61; 50070;
public static final String DFS_NAMENODE_HTTP_ADDRESS_KEY &#61; "dfs.namenode.http-address";
public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT &#61; "0.0.0.0:" &#43; DFS_NAMENODE_HTTP_PORT_DEFAULT;

  • httpserver启动方法查看

private void startHttpServer(final Configuration conf) throws IOException {httpServer &#61; new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));// 启动httpserver服务httpServer.start();}// 1、JDK中本来有httpserver服务&#xff0c;hadoop自己封装了一个httpserver2服务,便于hadoop自己使用
void start() throws IOException {HttpConfig.Policy policy &#61; DFSUtil.getHttpPolicy(conf);HttpServer2.Builder builder &#61; DFSUtil.httpServerTemplateForNNAndJN(conf,httpAddr, httpsAddr, "hdfs",DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);httpServer &#61; builder.build();// 1.1、进行httpserver相关服务配置httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);// 1.2、设置ServletssetupServlets(httpServer, conf);httpServer.start();// 1.3、启动httpServer服务,对外开放50070端口httpServer.start();
}// 2、servlet相关配置,根据不同的servlet进行相关功能操作。
private static void setupServlets(HttpServer2 httpServer, Configuration conf) {httpServer.addInternalServlet("startupProgress",StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);...httpServer.addInternalServlet("contentSummary", "/contentSummary/*",ContentSummaryServlet.class, false);
}

&#xff08;2&#xff09;加载元数据


  • 相关示意图如下&#xff1a;

在这里插入图片描述

// 1、从磁盘上加载元数据文件
protected void loadNamesystem(Configuration conf) throws IOException {this.namesystem &#61; FSNamesystem.loadFromDisk(conf);
}// 2、加载元数据源码分析
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {checkConfiguration(conf);// 2.1、新建FSImage文件FSImage fsImage &#61; new FSImage(conf,FSNamesystem.getNamespaceDirs(conf),FSNamesystem.getNamespaceEditsDirs(conf));FSNamesystem namesystem &#61; new FSNamesystem(conf, fsImage, false);StartupOption startOpt &#61; NameNode.getStartupOption(conf);long loadStart &#61; monotonicNow();try {// 2.2、加载元数据namesystem.loadFSImage(startOpt);} catch (IOException ioe) {LOG.warn("Encountered exception loading fsimage", ioe);fsImage.close();throw ioe;}
}// 3、加载FSImage文件
private void loadFSImage(StartupOption startOpt) throws IOException {final FSImage fsImage &#61; getFSImage();try {// We shouldn&#39;t be calling saveNamespace if we&#39;ve come up in standby state.MetaRecoveryContext recovery &#61; startOpt.createRecoveryContext();// 3.1、合并元数据 : (fsimage &#43; editlog &#61; new FSImage)final boolean staleImage&#61; fsImage.recoverTransitionRead(startOpt, this, recovery);if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt) ||RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {rollingUpgradeInfo &#61; null;}final boolean needToSave &#61; staleImage && !haEnabled && !isRollingUpgrade(); LOG.info("Need to save fs image? " &#43; needToSave&#43; " (staleImage&#61;" &#43; staleImage &#43; ", haEnabled&#61;" &#43; haEnabled&#43; ", isRollingUpgrade&#61;" &#43; isRollingUpgrade() &#43; ")");if (needToSave) {// 3.2、将合并后的FSImage写到磁盘文件上fsImage.saveNamespace(this);} else {updateStorageVersionForRollingUpgrade(fsImage.getLayoutVersion(),startOpt);// No need to save, so mark the phase done.StartupProgress prog &#61; NameNode.getStartupProgress();prog.beginPhase(Phase.SAVING_CHECKPOINT);prog.endPhase(Phase.SAVING_CHECKPOINT);}// This will start a new log segment and write to the seen_txid file, so// we shouldn&#39;t do it when coming up in standby stateif (!haEnabled || (haEnabled && startOpt &#61;&#61; StartupOption.UPGRADE)|| (haEnabled && startOpt &#61;&#61; StartupOption.UPGRADEONLY)) {// 3.3、打开一个新的EditLog文件写入数据fsImage.openEditLogForWrite();}}

&#xff08;3&#xff09;Hadoop RPC相关操作


  • 相关示意图如下
    • ClientRPCServer&#xff1a;hdfs客户端去操作HDFS方法
    • ServiceRPCServer&#xff1a;NameNode和DataNode服务之间相互进行方法调用(注册、心跳等)

在这里插入图片描述

// 1、创建NameNodeRpcServer服务
protected NameNodeRpcServer createRpcServer(Configuration conf)throws IOException {return new NameNodeRpcServer(conf, this);
}// 2、启动serviceRpcServer服务,用来监听DataNode发送来的请求
class NameNodeRpcServer implements NamenodeProtocols {this.serviceRpcServer &#61; new RPC.Builder(conf).setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class).setInstance(clientNNPbService).setBindAddress(bindHost).setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount).setVerbose(false).setSecretManager(namesystem.getDelegationTokenSecretManager()).build();
}// 3、接口继承相关协议,namenode中的不同方法对应给不同的协议
public interface NamenodeProtocolsextends ClientProtocol,DatanodeProtocol,NamenodeProtocol,RefreshAuthorizationPolicyProtocol,RefreshUserMappingsProtocol,RefreshCallQueueProtocol,GenericRefreshProtocol,GetUserMappingsProtocol,HAServiceProtocol,TraceAdminProtocol {
}

  • 协议的方法实现&#xff1a;

// 1、查看接口相关方法
public interface ClientProtocol {public boolean mkdirs(String src, FsPermission masked, boolean createParent)throws AccessControlException, FileAlreadyExistsException,FileNotFoundException, NSQuotaExceededException,ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,SnapshotAccessControlException, IOException;
}// 2、真正创建mkdirs方法是RPC的服务端
public boolean mkdirs(String src, FsPermission masked, boolean createParent)throws IOException {checkNNStartup();if(stateChangeLog.isDebugEnabled()) {stateChangeLog.debug("*DIR* NameNode.mkdirs: " &#43; src);}if (!checkPathLength(src)) {throw new IOException("mkdirs: Pathname too long. Limit " &#43; MAX_PATH_LENGTH &#43; " characters, " &#43; MAX_PATH_DEPTH &#43; " levels.");}return namesystem.mkdirs(src,new PermissionStatus(getRemoteUser().getShortUserName(),null, masked), createParent);
}

推荐阅读
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 本文介绍了在rhel5.5操作系统下搭建网关+LAMP+postfix+dhcp的步骤和配置方法。通过配置dhcp自动分配ip、实现外网访问公司网站、内网收发邮件、内网上网以及SNAT转换等功能。详细介绍了安装dhcp和配置相关文件的步骤,并提供了相关的命令和配置示例。 ... [详细]
  • 本文介绍了RPC框架Thrift的安装环境变量配置与第一个实例,讲解了RPC的概念以及如何解决跨语言、c++客户端、web服务端、远程调用等需求。Thrift开发方便上手快,性能和稳定性也不错,适合初学者学习和使用。 ... [详细]
  • HDFS2.x新特性
    一、集群间数据拷贝scp实现两个远程主机之间的文件复制scp-rhello.txtroothadoop103:useratguiguhello.txt推pushscp-rr ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • centos安装Mysql的方法及步骤详解
    本文介绍了centos安装Mysql的两种方式:rpm方式和绿色方式安装,详细介绍了安装所需的软件包以及安装过程中的注意事项,包括检查是否安装成功的方法。通过本文,读者可以了解到在centos系统上如何正确安装Mysql。 ... [详细]
  • Nginx使用AWStats日志分析的步骤及注意事项
    本文介绍了在Centos7操作系统上使用Nginx和AWStats进行日志分析的步骤和注意事项。通过AWStats可以统计网站的访问量、IP地址、操作系统、浏览器等信息,并提供精确到每月、每日、每小时的数据。在部署AWStats之前需要确认服务器上已经安装了Perl环境,并进行DNS解析。 ... [详细]
  • 安装mysqlclient失败解决办法
    本文介绍了在MAC系统中,使用django使用mysql数据库报错的解决办法。通过源码安装mysqlclient或将mysql_config添加到系统环境变量中,可以解决安装mysqlclient失败的问题。同时,还介绍了查看mysql安装路径和使配置文件生效的方法。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 本文介绍了C#中数据集DataSet对象的使用及相关方法详解,包括DataSet对象的概述、与数据关系对象的互联、Rows集合和Columns集合的组成,以及DataSet对象常用的方法之一——Merge方法的使用。通过本文的阅读,读者可以了解到DataSet对象在C#中的重要性和使用方法。 ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • 解决VS写C#项目导入MySQL数据源报错“You have a usable connection already”问题的正确方法
    本文介绍了在VS写C#项目导入MySQL数据源时出现报错“You have a usable connection already”的问题,并给出了正确的解决方法。详细描述了问题的出现情况和报错信息,并提供了解决该问题的步骤和注意事项。 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • 本文介绍了一个适用于PHP应用快速接入TRX和TRC20数字资产的开发包,该开发包支持使用自有Tron区块链节点的应用场景,也支持基于Tron官方公共API服务的轻量级部署场景。提供的功能包括生成地址、验证地址、查询余额、交易转账、查询最新区块和查询交易信息等。详细信息可参考tron-php的Github地址:https://github.com/Fenguoz/tron-php。 ... [详细]
author-avatar
我只记得她
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有