要求:统计出每个单词的数目,显示结果为单词 单词的长度 单词的数目
分析:由于MapReduce中的数据传输只能以键值对(Key-Value)的形式进行,
我们可以将单词与单词对应的长度拼接成一列数据进行传输
代码实现:
package com.miao.wordcount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.io.IOException;/*** @ClassName WordConcat* @Description TODO 统计每个单词出现的次数,并且单词与对应长度拼接* @Date 2021-04-27 20:08:50* @Create By Miao*/
/**
 * Driver for the word-concat job: reads the input text file, and (per the
 * surrounding tutorial) the mapper emits each word concatenated with its
 * length so both travel in one key.  Run via ToolRunner.
 */
public class WordConcat extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        // Build the Job
        Job job = Job.getInstance(this.getConf(),"wordConcat");
        job.setJarByClass(WordConcat.class);
        // Configure the Job
        job.setInputFormatClass(TextInputFormat.class);
        // Specify the input source (hard-coded local path)
        TextInputFormat.setInputPaths(job,new Path("D:\\Study\\idea\\MavenProject\\count.txt"));
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Specify the output destination; delete it first if it already
        // exists, since Hadoop refuses to write into an existing directory
        Path outputPath = new Path("D:\\Study\\idea\\MavenProject\\output\\five");
        FileSystem fs = FileSystem.get(this.getConf());
        if(fs.exists(outputPath)){
            fs.delete(outputPath,true);
        }
        TextOutputFormat.setOutputPath(job,outputPath);
        // Submit the Job and block until it finishes
        return job.waitForCompletion(true) ? 0 : -1;
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new WordConcat(), args);
        System.exit(status);
    }
    // NOTE(review): the generic type parameters of Mapper and the bodies of
    // the WCMapper (and presumably WCReducer) inner classes were lost when
    // this code was extracted (the "<...>" generics were stripped as HTML
    // tags) — restore them from the original source before compiling.
    public static class WCMapper extends Mapper
}
我们可以通过定义一个类型,存储一个String属性和一个Int属性,用于实现MapReduce中的三列数据的传输
代码实现:
自定义数据类型:
package com.miao.bean;import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class WordCountBean implements Writable {private String firstKey;private int secondkey;public WordCountBean() {}public String getFirstKey() {return firstKey;}public void setFirstKey(String firstKey) {this.firstKey = firstKey;}public int getSecondkey() {return secondkey;}public void setSecondkey(int secondkey) {this.secondkey = secondkey;}@Overridepublic String toString() {return this.firstKey + "\t" + this.secondkey;}@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(this.firstKey);dataOutput.writeInt(this.secondkey);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.firstKey = dataInput.readUTF();this.secondkey = dataInput.readInt();}
}
测试:
package com.miao.wordcount;import com.miao.bean.WordCountBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;

/**
 * Driver that demonstrates using the custom WordCountBean type as the
 * job's output key (key3), so the word and its length travel as one value.
 */
public class WordCount extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        // Build the Job
        // NOTE(review): job name "wordConcat" looks copy-pasted from the
        // previous example; consider renaming to "wordCount".
        Job job = Job.getInstance(this.getConf(),"wordConcat");
        job.setJarByClass(WordCount.class);
        // Configure the Job
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job,new Path("D:\\Study\\idea\\MavenProject\\count.txt"));
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(WCReducer.class);
        // Output key is the custom Writable bean (word + length)
        job.setOutputKeyClass(WordCountBean.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Delete the output directory first; Hadoop refuses to overwrite it
        Path outputPath = new Path("D:\\Study\\idea\\MavenProject\\output\\five");
        FileSystem fs = FileSystem.get(this.getConf());
        if(fs.exists(outputPath)){
            fs.delete(outputPath,true);
        }
        TextOutputFormat.setOutputPath(job,outputPath);
        // Submit the Job and block until it finishes
        return job.waitForCompletion(true) ? 0 : -1;
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new WordCount(), args);
        System.exit(status);
    }
    // NOTE(review): the generic type parameters of Mapper and the bodies of
    // the WCMapper (and presumably WCReducer) inner classes were lost when
    // this code was extracted (the "<...>" generics were stripped as HTML
    // tags) — restore them from the original source before compiling.
    public static class WCMapper extends Mapper
}
运行结果:
当WordCountBean这个类型数据作为key2时需要实现比较器接口
代码实现:
package com.miao.bean;import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Variant of WordCountBean that implements WritableComparable so it can be
 * used as a map-output key (key2), which Hadoop must be able to sort.
 */
// NOTE(review): the generic parameter of WritableComparable and the entire
// class body (fields, write/readFields, compareTo, etc.) were lost when this
// code was extracted (the "<...>" generics were stripped as HTML tags) —
// restore them from the original source before compiling.
public class WCBean2 implements WritableComparable
}
测试:
package com.miao.bean;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;

/**
 * Driver that uses the sortable WCBean2 as the map-output key, so Hadoop's
 * shuffle phase sorts by the custom bean's compareTo.
 */
public class WordCount2 extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        // Build the Job
        // NOTE(review): job name "wordConcat" looks copy-pasted from the
        // first example; consider renaming.
        Job job = Job.getInstance(this.getConf(),"wordConcat");
        job.setJarByClass(WordCount2.class);
        // Configure the Job
        job.setInputFormatClass(TextInputFormat.class);
        // Original comment said "use the program's first argument as input",
        // but the path is actually hard-coded — args[0] is never read.
        TextInputFormat.setInputPaths(job,new Path("D:\\Study\\idea\\MavenProject\\count.txt"));
        job.setMapperClass(WCMapper.class);
        // Map-output key is the sortable custom bean
        job.setMapOutputKeyClass(WCBean2.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(WCBean2.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Original comment said "use the program's second argument as the
        // output path", but the path is hard-coded here as well.
        Path outputPath = new Path("D:\\Study\\idea\\MavenProject\\output\\seven");
        FileSystem fs = FileSystem.get(this.getConf());
        if(fs.exists(outputPath)){
            fs.delete(outputPath,true);
        }
        TextOutputFormat.setOutputPath(job,outputPath);
        // Submit the Job and block until it finishes
        return job.waitForCompletion(true) ? 0 : -1;
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new WordCount2(), args);
        System.exit(status);
    }
    // NOTE(review): the generic type parameters of Mapper and the bodies of
    // the WCMapper (and presumably WCReducer) inner classes were lost when
    // this code was extracted (the "<...>" generics were stripped as HTML
    // tags) — restore them from the original source before compiling.
    public static class WCMapper extends Mapper
}
默认排序方式为升序,如果想要实现降序排序,我们需要自定义排序器
package com.miao.sort;import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class WCSort extends WritableComparator {//step1:注册public WCSort(){super(Text.class,true);}//step2:实现比较@Overridepublic int compare(WritableComparable a, WritableComparable b) {//将两个比较器对象强转为要比较的Text类型Text t1 = (Text) a;Text t2 = (Text) b;//实现两个Text类型的比较,降序排序return -t1.toString().compareTo(t2.toString());}
}
最后需要在测试类中添加:
job.setSortComparatorClass(WCSort.class); //指定排序器