热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

大数据之Hadoop(MapReduce)

一MapReduce入门1.1MapReduce定义Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架.Mapreduce核心

一 MapReduce入门

1.1 MapReduce定义

Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架.

Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。

1.2 MapReduce优缺点

1.2.1 优点

1)MapReduce 易于编程。它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。

2)良好的扩展性。当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

3)高容错性。MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop内部完成的。

4)适合PB级以上海量数据的离线处理。这里加红字体离线处理,说明它适合离线处理而不适合在线处理。比如像毫秒级别的返回一个结果,MapReduce很难做到。

1.2.2 缺点

MapReduce不擅长做实时计算、流式计算、DAG(有向图)计算。

1)实时计算。MapReduce无法像Mysql一样,在毫秒或者秒级内返回结果。

2)流式计算。流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。

3)DAG(有向图)计算。多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

大数据之Hadoop(MapReduce)

1)分布式的运算程序往往需要分成至少2个阶段。

2)第一个阶段的maptask并发实例,完全并行运行,互不相干。

3)第二个阶段的reduce task并发实例互不相干,但是他们的数据依赖于上一个阶段的所有maptask并发实例的输出。

4MapReduce编程模型只能包含一个map阶段和一个reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个mapreduce程序,串行运行。

1.4 MapReduce进程

一个完整的mapreduce程序在分布式运行时有三类实例进程:

1MrAppMaster:负责整个程序的过程调度及状态协调。

2MapTask:负责map阶段的整个数据处理流程。

3ReduceTask:负责reduce阶段的整个数据处理流程。

1.5 MapReduce编程规范(八股文)

用户编写的程序分成三个部分:MapperReducerDriver(提交运行mr程序的客户端)

1Mapper阶段

       1)用户自定义的Mapper要继承自己的父类

       2Mapper的输入数据是KV对的形式(KV的类型可自定义)

       3Mapper中的业务逻辑写在map()方法中

       4Mapper的输出数据是KV对的形式(KV的类型可自定义)

       5map()方法(maptask进程)对每一个调用一次

2Reducer阶段

       1)用户自定义的Reducer要继承自己的父类

       2Reducer的输入数据类型对应Mapper的输出数据类型,也是KV

       3Reducer的业务逻辑写在reduce()方法中

       4Reducetask进程对每一组相同k组调用一次reduce()方法

3Driver阶段

整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象

4)案例实操

       详见7.1.1统计一堆文件中单词出现的个数(WordCount案例)。

1.6 MapReduce程序运行流程分析

大数据之Hadoop(MapReduce)

1)在MapReduce程序读取文件的输入目录上存放相应的文件。

2)客户端程序在submit()方法执行前,获取待处理的数据信息,然后根据集群中参数的配置形成一个任务分配规划。

3)客户端提交job.split、jar包、job.xml等文件给yarn,yarn中的resourcemanager启动MRAppMaster

4)MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程。

5maptask利用客户指定的inputformat来读取数据,形成输入KV对。

6maptask将输入KV对传递给客户定义的map()方法,做逻辑运算

7map()运算完毕后将KV对收集到maptask缓存。

8maptask缓存中的KV对按照K分区排序后不断写到磁盘文件

9MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据分区。

10Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同keyKV为一个组,调用客户定义的reduce()方法进行逻辑运算。

11Reducetask运算完毕后,调用客户指定的outputformat将结果数据输出到外部存储。

二 Hadoop序列化

2.1 为什么要序列化?

        一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。

2.2 什么是序列化?

序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。 

反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。

2.3 为什么不用Java的序列化?

        Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,hadoop自己开发了一套序列化机制(Writable),精简、高效。

2.4 为什么序列化对Hadoop很重要?

        因为Hadoop在集群之间进行通讯或者RPC调用的时候,需要序列化,而且要求序列化要快,且体积要小,占用带宽要小。所以必须理解Hadoop的序列化机制。

        序列化和反序列化在分布式数据处理领域经常出现:进程通信和永久存储。然而Hadoop中各个节点的通信是通过远程调用(RPC)实现的,那么RPC序列化要求具有以下特点:

1)紧凑:紧凑的格式能让我们充分利用网络带宽,而带宽是数据中心最稀缺的资

2)快速:进程通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是基本的;

3)可扩展:协议为了满足新的需求变化,所以控制客户端和服务器过程中,需要直接引进相应的协议,这些是新协议,原序列化方式能支持新的协议报文;

4)互操作:能支持不同语言写的客户端和服务端进行交互; 

2.5 常用数据序列化类型

常用的数据类型对应的hadoop数据序列化类型

Java类型

Hadoop Writable类型

boolean

BooleanWritable

byte

ByteWritable

int

IntWritable

float

FloatWritable

long

LongWritable

double

DoubleWritable

string

Text

map

MapWritable

array

ArrayWritable

2.6 自定义bean对象实现序列化接口(Writable)

1)自定义bean对象要想序列化传输,必须实现序列化接口,需要注意以下7项。

1)必须实现Writable接口

2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

3)重写序列化方法

4)重写反序列化方法

5)注意反序列化的顺序和序列化的顺序完全一致

6)要想把结果显示在文件中,需要重写toString(),且用”\t”分开,方便后续用

7)如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序

// 1 必须实现Writable接口

public class FlowBean implements Writable {

 

       private long upFlow;

       private long downFlow;

       private long sumFlow;

 

       //2 反序列化时,需要反射调用空参构造函数,所以必须有

       public FlowBean() {

              super();

       }

 

       /**

        * 3重写序列化方法

        *

        * @param out

        * @throws IOException

        */

       @Override

       public void write(DataOutput out) throws IOException {

              out.writeLong(upFlow);

              out.writeLong(downFlow);

              out.writeLong(sumFlow);

       }

 

       /**

        * 4 重写反序列化方法

5 注意反序列化的顺序和序列化的顺序完全一致

        *

        * @param in

        * @throws IOException

        */

       @Override

       public void readFields(DataInput in) throws IOException {

              upFlow = in.readLong();

              downFlow = in.readLong();

              sumFlow = in.readLong();

       }

 

    // 6要想把结果显示在文件中,需要重写toString(),且用”\t”分开,方便后续用

       @Override

       public String toString() {

              return upFlow + "\t" + downFlow + "\t" + sumFlow;

       }

 

    //7 如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序

       @Override

       public int compareTo(FlowBean o) {

              // 倒序排列,从大到小

              return this.sumFlow > o.getSumFlow() ? -1 : 1;

       }

}

2)案例实操

       详见7.2.1统计每一个手机号耗费的总上行流量、下行流量、总流量(序列化)。

三 MapReduce框架原理

3.1 MapReduce工作流程

1)流程示意图

大数据之Hadoop(MapReduce)

大数据之Hadoop(MapReduce)

2)流程详解

上面的流程是整个mapreduce最全工作流程,但是shuffle过程只是从第7步开始到第16步结束,具体shuffle过程详解,如下:

1maptask收集我们的map()方法输出的kv对,放到内存缓冲区中

2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

3)多个溢出文件会被合并成大的溢出文件

4)在溢出过程中,及合并的过程中,都要调用partitioner进行分区和针对key进行排序

5reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据

6reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)

7)合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)

3)注意

Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。

缓冲区的大小可以通过参数调整,参数:io.sort.mb  默认100M

3.2 InputFormat数据输入

MapReduce任务的输入文件一般是存储在HDFS里面。输入的文件格式包括:基于行的日志文件、二进制格式文件等。这些文件一般会很大,达到数十GB,甚至更大。那么MapReduce是如何读取这些数据的呢?下面我们首先学习InputFormat接口。

3.2.1 InputFormat接口实现类

InputFormat常见的接口实现类包括:TextInputFormatKeyValueTextInputFormatNLineInputFormatCombineTextInputFormat和自定义InputFormat等。

1TextInputFormat

TextInputFormat是默认的InputFormat。每条记录是一行输入。键是LongWritable类型,存储该行在整个文件中的字节偏移量值是这行的内容,不包括任何行终止符(换行符和回车符)

以下是一个示例,比如,一个分片包含了如下4条文本记录。

Rich learning form

Intelligent learning engine

Learning more convenient

From the real demand for more close to the enterprise

每条记录表示为以下键/值对:

(0,Rich learning form)

(19,Intelligent learning engine)

(47,Learning more convenient)

(72,From the real demand for more close to the enterprise)

很明显,键并不是行号。一般情况下,很难取得行号,因为文件按字节而不是按行切分为分片。

2KeyValueTextInputFormat

每一行均为一条记录,被分隔符(缺省是tab\t))分割为keyText,valueText)。可以通过mapreduce.input.keyvaluelinerecordreader.key.value,separator属性来设定分隔符。它的默认值是一个制表符。

以下是一个示例,输入是一个包含4条记录的分片。其中——>表示一个(水平方向的)制表符。

line1 ——>Rich learning form

line2 ——>Intelligent learning engine

line3 ——>Learning more convenient

line4 ——>From the real demand for more close to the enterprise

每条记录表示为以下键/值对:

(line1,Rich learning form)

(line2,Intelligent learning engine)

(line3,Learning more convenient)

(line4,From the real demand for more close to the enterprise)

 此时的键是每行排在制表符之前的Text序列。

 3NLineInputFormat

默认情况下在对输入文件进行拆分时,会按block块的大小分成多个InputSplitInputSplit的数量取决于block的大小。每个map进程处理一个InputSplitInputSplit中有多少行记录就会调用多少次map函数。

如果使用NlineInputFormat,代表每个map进程处理的InputSplit不再按block块去划分,而是按NlineInputFormat指定的行数N来划分。即,每个InputSplit中只有N行记录数。同样InputSplit中有多少行记录就会调用多少次map函数。

以下是一个示例,仍然以上面的4行输入为例。

Rich learning form

Intelligent learning engine

Learning more convenient

From the real demand for more close to the enterprise

 例如,如果N2,则每个输入分片包含两行。一个mapper收到前两行键值对:

(0,Rich learning form)

(19,Intelligent learning engine)

另一个mapper 则收到后两行:

(47,Learning more convenient)

(72,From the real demand for more close to the enterprise)

        这里的键和值与TextInputFormat生成的一样。

3.2.2 自定义InputFormat

1)概述

(1)自定义一个类继承FileInputFormat

(2)改写RecordReader,实现一次读取一个完整文件封装为KV

(3)在输出时使用SequenceFileOutPutFormat输出合并文件

2)案例实操

       详见7.5小文件处理(自定义InputFormat)。

3.2.3 FileInputFormat切片机制

1)job提交流程源码详解

waitForCompletion()

submit();

// 1建立连接

       connect();      

              // 1)创建提交job的代理

              new Cluster(getConfiguration());

                     // (1)判断是本地yarn还是远程

                     initialize(jobTrackAddr,conf);

       // 2 提交job

submitter.submitJobInternal(Job.this, cluster)

       // 1)创建给集群提交数据的Stag路径

       Path jobStagingArea =JobSubmissionFiles.getStagingDir(cluster, conf);

       // 2)获取jobid ,并创建job路径

       JobID jobId =submitClient.getNewJobID();

       // 3)拷贝jar包到集群

copyAndConfigureFiles(job, submitJobDir);

       rUploader.uploadFiles(job,jobSubmitDir);

// 4)计算切片,生成切片规划文件

writeSplits(job, submitJobDir);

       maps = writeNewSplits(job, jobSubmitDir);

              input.getSplits(job);

// 5)向Stag路径写xml配置文件

writeConf(conf, submitJobFile);

       conf.writeXml(out);

// 6)提交job,返回提交状态

status = submitClient.submitJob(jobId,submitJobDir.toString(), job.getCredentials());

大数据之Hadoop(MapReduce)

2)FileInputFormat源码解析(input.getSplits(job))

(1)找到你数据存储的目录。

       (2)开始遍历处理(规划切片)目录下的每一个文件

       (3)遍历第一个文件ss.txt

              a)获取文件大小fs.sizeOf(ss.txt);

              b)计算切片大小computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M

c)默认情况下,切片大小=blocksize

              d)开始切,形成第1个切片:ss.txt—0:128M第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片

              e)将切片信息写到一个切片规划文件中

              f)整个切片的核心过程在getSplit()方法中完成。

g)数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。

h)注意:blockHDFS上物理上存储的存储的数据,切片是对数据逻辑上的划分。

       4)提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启maptask个数。

3)FileInputFormat中默认的切片机制:

1)简单地按照文件的内容长度进行切片

2)切片大小,默认等于block大小

3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

比如待处理数据有两个文件:

file1.txt    320M

file2.txt    10M

经过FileInputFormat的切片机制运算后,形成的切片信息如下: 

file1.txt.split1--  0~128

file1.txt.split2--  128~256

file1.txt.split3--  256~320

file2.txt.split1--  0~10M

4FileInputFormat切片大小的参数配置

1)通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize,Math.min(maxSize, blockSize)); 

切片主要由这几个值来运算决定

mapreduce.input.fileinputformat.split.minsize=1默认值为1

mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默认值Long.MAXValue

因此,默认情况下,切片大小=blocksize

maxsize(切片最大值):参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值。

minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blocksize还大。

5)获取切片信息API

// 根据文件类型获取切片信息

FileSplit inputSplit = (FileSplit) context.getInputSplit();

// 获取切片的文件名称

String name = inputSplit.getPath().getName();

3.2.4 CombineTextInputFormat切片机制

关于大量小文件的优化策略

1)默认情况下TextInputformat对任务的切片机制是按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个maptask,这样如果有大量小文件,就会产生大量的maptask,处理效率极其低下。

2)优化策略

       (1)最好的办法,在数据处理系统的最前端(预处理/采集),将小文件先合并成大文件,再上传到HDFS做后续分析。

       (2)补救措施:如果已经是大量小文件在HDFS中了,可以使用另一种InputFormat来做切片(CombineTextInputFormat),它的切片逻辑跟TextFileInputFormat不同:它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个maptask。

       (3)优先满足最小切片大小,不超过最大切片大小

              CombineTextInputFormat.setMaxInputSplitSize(job,4194304);// 4m

              CombineTextInputFormat.setMinInputSplitSize(job,2097152);// 2m

       举例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m

3)具体实现步骤

// 9 如果不设置InputFormat,它默认用的是TextInputFormat.class

job.setInputFormatClass(CombineTextInputFormat.class)

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

4)案例实操

大量小文件的切片优化(CombineTextInputFormat)。


推荐阅读
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • Metasploit攻击渗透实践
    本文介绍了Metasploit攻击渗透实践的内容和要求,包括主动攻击、针对浏览器和客户端的攻击,以及成功应用辅助模块的实践过程。其中涉及使用Hydra在不知道密码的情况下攻击metsploit2靶机获取密码,以及攻击浏览器中的tomcat服务的具体步骤。同时还讲解了爆破密码的方法和设置攻击目标主机的相关参数。 ... [详细]
  • 本文介绍了Python语言程序设计中文件和数据格式化的操作,包括使用np.savetext保存文本文件,对文本文件和二进制文件进行统一的操作步骤,以及使用Numpy模块进行数据可视化编程的指南。同时还提供了一些关于Python的测试题。 ... [详细]
  • 开发笔记:Python之路第一篇:初识Python
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Python之路第一篇:初识Python相关的知识,希望对你有一定的参考价值。Python简介& ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • VScode格式化文档换行或不换行的设置方法
    本文介绍了在VScode中设置格式化文档换行或不换行的方法,包括使用插件和修改settings.json文件的内容。详细步骤为:找到settings.json文件,将其中的代码替换为指定的代码。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • 本文介绍了在SpringBoot中集成thymeleaf前端模版的配置步骤,包括在application.properties配置文件中添加thymeleaf的配置信息,引入thymeleaf的jar包,以及创建PageController并添加index方法。 ... [详细]
  • 背景应用安全领域,各类攻击长久以来都危害着互联网上的应用,在web应用安全风险中,各类注入、跨站等攻击仍然占据着较前的位置。WAF(Web应用防火墙)正是为防御和阻断这类攻击而存在 ... [详细]
  • Hadoop源码解析1Hadoop工程包架构解析
    1 Hadoop中各工程包依赖简述   Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。   GoogleCluster:ht ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • 对于开源的东东,尤其是刚出来不久,我认为最好的学习方式就是能够看源代码和doc,測试它的样例为了方便查看源代码,关联导入源代 ... [详细]
  • MapReduce工作流程最详细解释
    MapReduce是我们再进行离线大数据处理的时候经常要使用的计算模型,MapReduce的计算过程被封装的很好,我们只用使用Map和Reduce函数,所以对其整体的计算过程不是太 ... [详细]
  • MapReduce 切片机制源码分析
     总体来说大概有以下2个大的步骤1.连接集群(yarnrunner或者是localjobrunner)2.submitter.submitJobInternal()在该方法中会创建 ... [详细]
author-avatar
49897801g9Iq
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有