Processing JSON with Java MapReduce

I am new to Hadoop MapReduce.

I have an input text file in which the data is stored as shown below; here are just a few of the tuples (data.txt):

{"author":"Shari?f Qa?sim","book":"al- Rabi?? al-manshu?d"}
{"author":"Na?s?ir Nimri?","book":"Adi?b ?Abba?si?"}
{"author":"Muz?affar ?Abd al-Maji?d Kammu?nah","book":"Asma?? Alla?h al-h?usna? al-wa?ridah fi? muh?kam kita?bih"}
{"author":"H?asan Mus?t?afa? Ah?mad","book":"al- Jabhah al-sharqi?yah wa-ma?a?rikuha? fi? h?arb Ramad?a?n"}
{"author":"Rafi?qah Sali?m H?ammu?d","book":"Ta?li?m fi? al-Bah?rayn"}

Here is the Java file in which I am supposed to write my code (CombineBooks.java):

package org.hwone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;

//TODO import necessary components

/*
*  Modify this file to combine books from the same author into a
*  single JSON object.
*  i.e. {"author": "Tobias Wells", "books": [{"book":"A die in the country"},{"book": "Dinky died"}]}
*  Be aware that this may run on any number of nodes!
*
*/

public class CombineBooks {

  //TODO define variables and implement necessary components

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: CombineBooks  ");
      System.exit(2);
    }

    //TODO implement CombineBooks

    Job job = new Job(conf, "CombineBooks");

    //TODO implement CombineBooks

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

My task is to create a Hadoop program in "CombineBooks.java", to be returned in the "question-2" directory. The program should do the following: given the input author-book tuples, the map-reduce program should produce a JSON object which contains all the books from the same author in a JSON array, i.e.

{"author": "Tobias Wells", "books":[{"book":"A die in the country"},{"book": "Dinky died"}]} 

Any idea how this can be done?

1 Answer
  • First of all, the JSON classes you are trying to use are not available to you by default. To solve this:

      Download them as a zip from here: https://github.com/douglascrockford/JSON-java

      Extract them into your sources folder, under the subdirectory org/json/*

    Next, the first line of that code declares the package "org.json", which is incorrect; you should create a separate package, for instance "my.books".

    Third, using a combiner here is useless: the map output types are (Text, Text) while the reducer emits (NullWritable, Text), so the Reduce class cannot double as a combiner, and merging partial JSON arrays would only complicate the code.

    Here is the code I ended up with, and it solves your problem:

    package my.books;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.json.*;
    
    public class CombineBooks {
    
        public static class Map extends Mapper<LongWritable, Text, Text, Text>{

            // Emit one (author, book) pair for every JSON tuple in the input.
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{

                String author;
                String book;
                String line = value.toString();
                // TextInputFormat delivers one line per call; the split is
                // just a safety net in case a value spans several lines.
                String[] tuple = line.split("\\n");
                try{
                    for(int i=0;i<tuple.length; i++){
                        JSONObject obj = new JSONObject(tuple[i]);
                        author = obj.getString("author");
                        book = obj.getString("book");
                        context.write(new Text(author), new Text(book));
                    }
                }catch(JSONException e){
                    e.printStackTrace();
                }
            }
        }
    
        public static class Reduce extends Reducer<Text,Text,NullWritable,Text>{

            // Collect all books of one author into a JSON array and emit the
            // combined JSON object as a single text line.
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
    
                try{
                    JSONObject obj = new JSONObject();
                    JSONArray ja = new JSONArray();
                    for(Text val : values){
                        JSONObject jo = new JSONObject().put("book", val.toString());
                        ja.put(jo);
                    }
                    obj.put("books", ja);
                    obj.put("author", key.toString());
                    context.write(NullWritable.get(), new Text(obj.toString()));
                }catch(JSONException e){
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            if (args.length != 2) {
                System.err.println("Usage: CombineBooks <in> <out>");
                System.exit(2);
            }
    
            Job job = new Job(conf, "CombineBooks");
            job.setJarByClass(CombineBooks.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    

Here is the folder structure of my project:

    src
    src/my
    src/my/books
    src/my/books/CombineBooks.java
    src/org
    src/org/json
    src/org/json/zip
    src/org/json/zip/BitReader.java
    ...
    src/org/json/zip/None.java
    src/org/json/JSONStringer.java
    src/org/json/JSONML.java
    ...
    src/org/json/JSONException.java
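
    In case you build it from the command line instead of a GUI IDE, one possible way to compile the sources and package them into bookparse.jar is sketched below (this assumes the hadoop command is on your PATH; the classes output directory name is arbitrary):

    mkdir classes
    javac -cp $(hadoop classpath) -d classes src/my/books/CombineBooks.java src/org/json/*.java src/org/json/zip/*.java
    jar cf bookparse.jar -C classes .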
    

Here is the input:

    [localhost:CombineBooks]$ hdfs dfs -cat /example.txt
    {"author":"author1", "book":"book1"}
    {"author":"author1", "book":"book2"}
    {"author":"author1", "book":"book3"}
    {"author":"author2", "book":"book4"}
    {"author":"author2", "book":"book5"}
    {"author":"author3", "book":"book6"}
    

The command to run:

    hadoop jar ./bookparse.jar my.books.CombineBooks /example.txt /test_output
    

And here is the output (note that org.json's JSONObject does not preserve key insertion order, which is why "books" appears before "author"):

    [pivhdsne:CombineBooks]$ hdfs dfs -cat /test_output/part-r-00000
    {"books":[{"book":"book3"},{"book":"book2"},{"book":"book1"}],"author":"author1"}
    {"books":[{"book":"book5"},{"book":"book4"}],"author":"author2"}
    {"books":[{"book":"book6"}],"author":"author3"}
    

You can use one of three options to put the org.json.* classes onto your cluster:

      Pack the org.json.* classes into your job's jar file (this can easily be done using a GUI IDE). This is the option I used in this answer.

      Put a jar file containing the org.json.* classes into one of the CLASSPATH directories on each cluster node (see yarn.application.classpath).

      Put the jar file containing org.json.* into HDFS (hdfs dfs -put <org.json jar> <hdfs path>) and use job.addFileToClassPath to make this jar file available to all of the tasks executing your job on the cluster. With my code you would add the job.addFileToClassPath(new Path("<jar_file_on_hdfs_location>")); call to main, as shown in the sketch below.
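
    For illustration, a minimal sketch of that third option; the HDFS path /libs/json-java.jar is a hypothetical placeholder for wherever you uploaded the jar:

    // In main(), after the Job is created and before waitForCompletion():
    // "/libs/json-java.jar" is a hypothetical HDFS location; replace it
    // with the path you uploaded the org.json jar to.
    job.addFileToClassPath(new Path("/libs/json-java.jar"));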
