作者:李林1108_965 | 来源:互联网 | 2023-06-06 20:05
我使用hadoop 3.1.2迈出了第一步,我拥有像这样的数据集:
id station; city; temperature
1; New York; 14
3; New York; 20
2; Bristol; 29
8; Rome; -10
30; Bristol; 2
10; Rome; 0
1; New York; 10
8; Rome; 10
通过Hadoop,使用mapreduce,我应该得到:ID站的分组以及平均温度。
但是,我对所有城市都不感兴趣,例如,仅对站号为1、8的城市不感兴趣。
场景:计算城市的总/平均温度
地图(键,值)->键:ID站,名称为city,值:温度。
减少:按ID站+名称城市分组,并获取每个站的平均温度。
得到类似的东西
City - Station; Average Temperature
New York - 1; 7
Rome - 8; 0
代码如下:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class SingleMapperReducer
{
public static void main(String[] args) throws Exception {
Configuration cOnf= new Configuration();
Job job = new Job(conf,"City Temperature Job");
job.setMapperClass(TemperatureMapper.class);
job.setReducerClass(TemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputvalueClass(IntWritable.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
system.exit(job.waitForCompletion(true) ? 0 : 1);
}
/*
Id,City,Temperature
1; New York; 14
3; New York; 20
2; Bristol; 29
1; Rome; 20
2; Rome; -10
2; Bristol; 2
3; New YOrk; 10
1; Rome; 10
*/
private static class TemperatureMapper
extends Mapper {
public void map(Object key,Text value,Context context)
throws IOException,InterruptedException {
String txt = value.toString();
String[] tokens = txt.split(";");
String id = tokens[0].trim();
String temperature = tokens[2].trim();
if(id.equals("1"))
{
id="New York - 1";
}
else if(cat.equals("8"))
{
id="Rome - 8";
}
if (temperature.compareTo("Temperature") != 0)
context.write(new Text(id),new IntWritable(Integer.parseInt(temperature)));
}
}
private static class TemperatureReducer //on id city
extends Reducer {
private IntWritable result = new IntWritable();
public void reduce(Text key,Iterable values,Context context) throws IOException,InterruptedException {
int sum = 0;
int n = 0;
for (IntWritable val : values) {
sum += val.get();
n +=1;
}
result.set(sum/n);
context.write(key,result);
}
}
}
您认为它会起作用吗?
我在ID站上开发过滤器的代码部分,是否正确?
还有其他方法可以应用此过滤器吗?
感谢那些想帮助我的人!
更新26/11
@ cricket_007 @ amey-shirke
谢谢!我试图执行代码,进行建议的更改:
if (id.equals ("1") || id.equals ("8")) {
id = id + "-" + tokens [1];
context.write (new Text (id),new IntWritable (Integer.parseInt (temperature)));
}
更多这些
Configuration cOnf= new Configuration ();
Job job = Job.getInstance (conf,"word count");
job.setJarByClass(SingleMapperReducer.class);
-
系统执行了该过程,但是给了我一个空的输出文件。
p.s我在一个简单的“ wordcount”情况下尝试了hadoop的mapreduce框架,它可以工作。
可能会发生什么?
需要注意的几点:
- 如果仅关注ID 1和8,只需context.write()即可。符合
(temperature.compareTo("Temperature") != 0)
- 这段代码没有帮助,不需要
BASE64