用于HadoopRDD的自定义InputFormat的Apache Spark

 咖喱2502894907 发布于 2023-01-03 09:20

我目前正在研究Apache Spark.我已经实现了一个Custom InputFormatfor Apache Hadoop,它通过TCP套接字读取键值记录.我想将此代码移植到Apache Spark并将其与hadoopRDD()函数一起使用.我的Apache Spark代码如下:

public final class SparkParallelDataLoad {

    public static void main(String[] args) {
        int iterations = 100;
        String dbNodesLocations = "";
        if(args.length < 3) {
            System.err.printf("Usage ParallelLoad   \n");
            System.exit(1);
        }
        JobConf jobConf = new JobConf();
        jobConf.set(CustomConf.confCoordinatorIP, args[0]);
        jobConf.set(CustomConf.confCoordinatorPort, args[1]);
        jobConf.set(CustomConf.confDBNodesLocations, dbNodesLocations);

        int numOfSplits = Integer.parseInt(args[2]);

        CustomInputFormat.setCoordinatorIp(args[0]);
        CustomInputFormat.setCoordinatorPort(Integer.parseInt(args[1]));

        SparkConf sparkConf = new SparkConf().setAppName("SparkParallelDataLoad");

        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        JavaPairRDD records = sc.hadoopRDD(jobConf, 
                CustomInputFormat.class, LongWritable.class, Text.class, 
                numOfSplits);

        JavaRDD points = records.map(new Function, LabeledPoint>() {

            private final Log log = LogFactory.getLog(Function.class);
            /**
             * 
             */
            private static final long serialVersionUID = -1771348263117622186L;

            private final Pattern SPACE = Pattern.compile(" ");
            @Override
            public LabeledPoint call(Tuple2 tuple)
                    throws Exception {
                if(tuple == null || tuple._1() == null || tuple._2() == null)
                    return null;
                double y = Double.parseDouble(Long.toString(tuple._1.get()));
                String[] tok = SPACE.split(tuple._2.toString());
                double[] x = new double[tok.length];
                for (int i = 0; i < tok.length; ++i) {
                    if(tok[i].isEmpty() == false)
                        x[i] = Double.parseDouble(tok[i]);
                }
                return new LabeledPoint(y, Vectors.dense(x));
            }

        });

        System.out.println("Number of records: " + points.count());
        LinearRegressionModel model = LinearRegressionWithSGD.train(points.rdd(), iterations);
        System.out.println("Model weights: " + model.weights());

        sc.stop();
    }
}

在我的项目中,我还必须决定哪个Spark Worker将连接到哪个数据源(类似于"matchmake"进程与1:1的关系).因此,我创建了一个InputSplit等于数据源数量的s,以便我的数据并行发送到SparkContext.我的问题如下:

    方法的结果是否InpuSplit.getLength()会影响RecordReader返回的记录数?详细地说,我在我的测试运行中看到,在仅返回一条记录后,Job结束,只是因为我从CustomInputSplit.getLength()函数返回的值为0 .

    在Apache Spark上下文中,工作者的数量是否等于InputSplitsInputFormat至少为执行records.map()函数调用而生成的数量?

上面问题2的答案对我的项目非常重要.

谢谢,尼克

撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有