热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SQOOP源码分析1ToolRunner实现Window平台向Hadoop集群提交任务

SQOOP源码系列文章是把SQOOP源码详细、简单地介绍给大家,本系列文章分为3个部分,一是ToolRunner从Window本地提交MapReduce任务到HDFS,二是Sqoo

SQOOP源码系列文章是把SQOOP源码详细、简单地介绍给大家,本系列文章分为3个部分,一是ToolRunner从Window本地提交MapReduce任务到HDFS,二是Sqoop从读取配置文件到提交任务的过程分析,三是Sqoop中Map切割表数据到导入表的过程。

导读:SQOOP通过生成的MapReduce向hadoop集群提交任务,然而这个过程是怎样的呢,我们通过Hadoop提供的WordCount来模拟实现这一个过程,在往后的文章会详细地分析这些过程。

1.WordCount实现

package master.hadooptool;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

	public static class TokenizerMapper extends Mapper {
		
		private final IntWritable One= new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Mapper.Context context)
				throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				this.word.set(itr.nextToken());
				context.write(this.word, one);
			}
		}
	}

	public static class IntSumReducer extends Reducer {
		
		private IntWritable result = new IntWritable();
		
		public void reduce(Text key, Iterable values,
				Reducer.Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			this.result.set(sum);
			context.write(key, this.result);
		}
	}
}

WordCount写好后,把其打包成wordcount.jar,放到本地

2.运行Tool实现
SqoopTool继承了Configured并且实现了Tool类,因此我们的运行工具类也需要实现。改类实现了以后,ToolRunner运行的时候,就会把本地的Jar包上传到HDFS上,不需要我们操作

package master.hadooptool;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.PropertyConfigurator;

public class WordCountTool extends Configured implements Tool {

	public static void main(String[] args) throws Exception {

		PropertyConfigurator.configure("log4j.properties");
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		
		//加载Hadoop配置文件到Configuration中
		String HADOOP_CONFS[] = { "core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml",
				"hive2-site.xml", "hbase-site.xml" };
		Configuration cOnf= new Configuration();
		for (String name : HADOOP_CONFS) {
			File file = new File(name);
			if (!file.exists()) {
				continue;
			}
			FileInputStream in = null;
			try {
				in = new FileInputStream(file);
			} catch (FileNotFoundException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			conf.addResource(in);
		}

		WordCountTool tool = new WordCountTool();
		tool.setConf(conf);

		//调用ToolRunner来运行文件
		ToolRunner.run(tool.getConf(), tool, new String[] {});
	}


	public int run(String[] args) throws Exception {
	
		Configuration cOnf=super.getConf();
		
		conf.setBoolean("mapreduce.app-submission.cross-platform", true);//设置跨平台提交
		conf.set("tmpjars", "file:/E:/hadoop/wordcount.jar"); // 加载wordcontjar文件,注意路径格式
		
		// job.getConfiguration().set("mapred.jar","E:/hadoop/wordcount.jar");
		//mapred.jar是MapReduce所在的文件,tmpjars是MapReduce依赖库,我们没有依赖库,选择其中一个就可以,注意路径格式
		
		//构建Job
		Job job = Job.getInstance(conf, "wordcount");
		job.setJarByClass(WordCountTool.class);
		job.setMapperClass(WordCount.TokenizerMapper.class);
		job.setCombinerClass(WordCount.IntSumReducer.class);
		job.setReducerClass(WordCount.IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		//设置输入输出路径,需要准备一个文本格式的文件放到HDFS中
		FileInputFormat.addInputPath(job, new Path("/wordcount/wordcount.txt"));
		FileOutputFormat.setOutputPath(job, new Path("/wordcount/result"));

		int n=job.waitForCompletion(true)?0:-1;
		
		return n;
	}

}

3.依赖库和配置文件
MavenDependency的pom.xml依赖


		
			junit
			junit
			3.8.1
			test
		
		
			org.apache.hadoop
			hadoop-common
			2.8.1
		
		
			org.apache.hadoop
			hadoop-mapreduce-client-common
			2.8.1
		
		
			org.apache.hadoop
			hadoop-mapreduce-client-jobclient
			2.8.1
			provided
		
		
			org.apache.hadoop
			hadoop-hdfs-client
			2.8.1
			provided
		
	

core-site.xml


	
		fs.defaultFS
		hdfs://hadoopmaster:9000
	
	
		fs.hdfs.impl
		org.apache.hadoop.hdfs.DistributedFileSystem
		The FileSystem for hdfs: uris.
	
	
		hadoop.tmp.dir
		/soft/hadoop_data/tmp
	
	
		hadoop.proxyuser.root.hosts
		*
	
	
		hadoop.proxyuser.root.groups
		*
	
		    
		hadoop.proxyuser.hadoop.hosts
		*
	
	
		hadoop.proxyuser.hadoop.groups
		*
	

mapred-site.xml

 
  
    mapreduce.framework.name
    yarn
  

yarn-site.xml


 
    yarn.resourcemanager.hostname
    hadoopmaster
  
  
    yarn.nodemanager.aux-services
    mapreduce_shuffle
  
  
    yarn.nodemanager.aux-services.mapreduce_shuffle.class
    org.apache.hadoop.mapred.ShuffleHandler
  
  
    yarn.nodemanager.vmem-check-enabled
    false
    Whether virtual memory limits will be enforced for containers
  
  
    yarn.nodemanager.vmem-pmem-ratio
    4
    Ratio between virtual memory to physical memory when setting memory limits for containers
  

在以上的配置文件中,需要把hadoopmaster改成自己Hadoop集群的IP

4.运行结果
SQOOP源码分析1----ToolRunner实现Window平台向Hadoop集群提交任务

5.总结
Hadoop提供ToolRunner远程向Hadoop集群提交MapReduce任务,在Job中配置好tmpjars和mapred.jar后,集群会把Jar上传到HDFS,然后运行任务。
在现实的操作中还会遇到很多问题,需要我们冷静地分析,然后解决问题,不断学习、不断进步。
最后,有什么问题可以给我留言。。。


推荐阅读
  • 我们在之前的文章中已经初步介绍了Cloudera。hadoop基础----hadoop实战(零)-----hadoop的平台版本选择从版本选择这篇文章中我们了解到除了hadoop官方版本外很多 ... [详细]
  • 本文介绍了在Mac上搭建php环境后无法使用localhost连接mysql的问题,并通过将localhost替换为127.0.0.1或本机IP解决了该问题。文章解释了localhost和127.0.0.1的区别,指出了使用socket方式连接导致连接失败的原因。此外,还提供了相关链接供读者深入了解。 ... [详细]
  • 本文介绍了在Windows环境下如何配置php+apache环境,包括下载php7和apache2.4、安装vc2015运行时环境、启动php7和apache2.4等步骤。希望对需要搭建php7环境的读者有一定的参考价值。摘要长度为169字。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 本文介绍了在sqoop1.4.*版本中,如何实现自定义分隔符的方法及步骤。通过修改sqoop生成的java文件,并重新编译,可以满足实际开发中对分隔符的需求。具体步骤包括修改java文件中的一行代码,重新编译所需的hadoop包等。详细步骤和编译方法在本文中都有详细说明。 ... [详细]
  • Hadoop源码解析1Hadoop工程包架构解析
    1 Hadoop中各工程包依赖简述   Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。   GoogleCluster:ht ... [详细]
  • 对于开源的东东,尤其是刚出来不久,我认为最好的学习方式就是能够看源代码和doc,測试它的样例为了方便查看源代码,关联导入源代 ... [详细]
  • Hadoop之Yarn
    目录1Hadoop1.x和Hadoop2.x架构区别2Yarn概述3Yarn基本架构4Yarn工作机制5作业提交全过程6资源调度器7任务的推测执行1Hadoop1.x和Hadoo ... [详细]
  •     这里使用自己编译的hadoop-2.7.0版本部署在windows上,记得几年前,部署hadoop需要借助于cygwin,还需要开启ssh服务,最近发现,原来不需要借助cy ... [详细]
  • Maven构建Hadoop,
    Maven构建Hadoop工程阅读目录序Maven安装构建示例下载系列索引 序  上一篇,我们编写了第一个MapReduce,并且成功的运行了Job,Hadoop1.x是通过ant ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • 《Spark核心技术与高级应用》——1.2节Spark的重要扩展
    本节书摘来自华章社区《Spark核心技术与高级应用》一书中的第1章,第1.2节Spark的重要扩展,作者于俊向海代其锋马海平,更多章节内容可以访问云栖社区“华章社区”公众号查看1. ... [详细]
  • MapReduce工作流程最详细解释
    MapReduce是我们再进行离线大数据处理的时候经常要使用的计算模型,MapReduce的计算过程被封装的很好,我们只用使用Map和Reduce函数,所以对其整体的计算过程不是太 ... [详细]
  • MapReduce 切片机制源码分析
     总体来说大概有以下2个大的步骤1.连接集群(yarnrunner或者是localjobrunner)2.submitter.submitJobInternal()在该方法中会创建 ... [详细]
  • 毕设做到后半部分,需要实现将用户在一段时间(比如1天)内产生的新数据导入HDFS的功能,这样数据仓库中的数据才能和数据库中的数据同步在新建了一个PyDev项目后,需要如下操作(拣最 ... [详细]
author-avatar
乃_黄包11_753
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有