热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

MapReduce2.0处理机制

MapReduce(分布式计算模型)作为Hadoop家族一重要的家庭成员主要用于搜素领域,海量数据计算等问题。内部模型采用分而治之的思想。MapReduce分为两部分(Map和Reduce)。其中Shuffler是对Reduce的预处理。map和reduce的数据处理方式均采取键对的方式:即[k1

MapReduce(分布式计算模型)作为Hadoop家族一重要的家庭成员主要用于搜素领域,海量数据计算等问题。 内部模型采用分而治之的思想。MapReduce分为两部分(Map和Reduce)。其中Shuffler是对Reduce的预处理。 map和reduce的数据处理方式均采取键对的方式:即 [k1

MapReduce(分布式计算模型)作为Hadoop家族一重要的家庭成员主要用于搜素领域,海量数据计算等问题。

内部模型采用"分而治之"的思想。MapReduce分为两部分(Map和Reduce)。其中Shuffler是对Reduce的预处理。

map和reduce的数据处理方式均采取键值对的方式:即 [k1,v1]->MAP->[K2,V2]->Reduce->[k3,v3]。

MR执行流程
(1).客户端提交一个mr的jar包给JobClient(提交方式:hadoop jar ...)
(2).JobClient通过RPC和JobTracker进行通信,返回一个存放jar包的地址(HDFS)和jobId
(3).client将jar包写入到HDFS当中(path = hdfs上的地址 + jobId)
(4).开始提交任务(任务的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
(5).JobTracker进行初始化任务
(6).读取HDFS上的要处理的文件,开始计算输入分片,每一个分片对应一个MapperTask
(7).TaskTracker通过心跳机制领取任务(任务的描述信息)
(8).下载所需的jar,配置文件等
(9).TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask)
(10).将结果写入到HDFS当中

在hadoop2.0以上版本中JobTracker取名为RM(resourceManage) TastTracker取名为NM(nodeManage)

mapReduce操作实现wordcount功能(即从文本中读取内容,计算出每个单词出现的次数)

程序分为3个类(自定义MAP方法功能实现,自定义REDUCE方法功能实现,最后类拼凑成mapreduce模式导成jar包,在HDFS分布式功能中实现)

1.WCMapper类(实现map)

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
* 给wordcount写mapper
* 定义mapper
* KEYIN:k1的类型
* VALUEIN:v1的类型
*
* 重写map方法
* hadoop没有使用jdk默认的序列化机制(long->longwriteable String->Text)
*/
public class WCMapper extends Mapper {

@Override
protected void map(LongWritable key, Text value,
Mapper.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub

// 接收信息V1
String line = value.toString();
// 切分数据
String[] words = line.split(" ");
// 循环
for (String w : words) {
// 出现一次记一个1,输出
// 构一个新的key,value
context.write(new Text(w), new LongWritable(1));
}
}

}

2.WCReducer类实现reduce功能

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
* KEYIN k2的类型
* VALUEIN v2的类型
*
* 重写reducer方法
*/
public class WCReducer extends Reducer {

@Override
protected void reduce(Text k2, Iterable v2s,
Reducer.Context context)
throws IOException, InterruptedException {
// 接收数据
Text k3 = k2;
// 定义一个计数器
Long count = (long) 0;
// 循环v2s
for (LongWritable i : v2s) {
count += i.get();
}
// 输出
context.write(k3, new LongWritable(count));
}

}

3.wordCount类。拼凑前两个类,符合mapreduce格式


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
* mapReduce
*
* 组装自定义的map和reduce
*/
public class wordCount {
public static void main(String[] args) throws Exception {
// Job job=Job.instance(new Configuration()); //版本hadoop2
Job job = new Job(new Configuration()); // 版本hadoop1

// 4.注意---将main方法中的类设进去
job.setJarByClass(wordCount.class);

// 1.设置自定义Mapper
job.setMapperClass(WCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);

// 设置mapper读入的path(hdfs路径)
FileInputFormat.setInputPaths(job, new Path("/words.txt"));

// 2.设置reduce
job.setReducerClass(WCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job, new Path("/WcountResult"));

// 3.提交
job.waitForCompletion(true); // 打印进度和详情
}
}

推荐阅读
author-avatar
余挺空荡荡_833
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有