I am currently working on Apache Spark. I have implemented a custom InputFormat for Apache Hadoop that reads key-value records over TCP sockets. I want to port this code to Apache Spark and use it with the hadoopRDD() function. My Apache Spark code is the following:
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.LinearRegressionModel;
import org.apache.spark.mllib.regression.LinearRegressionWithSGD;

import scala.Tuple2;

public final class SparkParallelDataLoad {

    public static void main(String[] args) {
        int iterations = 100;
        String dbNodesLocations = "";
        if (args.length < 3) {
            System.err.printf("Usage: ParallelLoad <coordinator-ip> <coordinator-port> <num-splits>\n");
            System.exit(1);
        }

        // Pass the coordinator location to my custom InputFormat via the JobConf.
        JobConf jobConf = new JobConf();
        jobConf.set(CustomConf.confCoordinatorIP, args[0]);
        jobConf.set(CustomConf.confCoordinatorPort, args[1]);
        jobConf.set(CustomConf.confDBNodesLocations, dbNodesLocations);

        int numOfSplits = Integer.parseInt(args[2]);

        CustomInputFormat.setCoordinatorIp(args[0]);
        CustomInputFormat.setCoordinatorPort(Integer.parseInt(args[1]));

        SparkConf sparkConf = new SparkConf().setAppName("SparkParallelDataLoad");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // Read the (LongWritable, Text) records through my custom InputFormat.
        JavaPairRDD<LongWritable, Text> records = sc.hadoopRDD(
                jobConf, CustomInputFormat.class,
                LongWritable.class, Text.class, numOfSplits);

        // Turn each record into a LabeledPoint: the key is the label,
        // the space-separated value tokens are the features.
        JavaRDD<LabeledPoint> points = records.map(
                new Function<Tuple2<LongWritable, Text>, LabeledPoint>() {

            private final Log log = LogFactory.getLog(Function.class);

            private static final long serialVersionUID = -1771348263117622186L;

            private final Pattern SPACE = Pattern.compile(" ");

            @Override
            public LabeledPoint call(Tuple2<LongWritable, Text> tuple)
                    throws Exception {
                if (tuple == null || tuple._1() == null || tuple._2() == null)
                    return null;
                double y = Double.parseDouble(Long.toString(tuple._1.get()));
                String[] tok = SPACE.split(tuple._2.toString());
                double[] x = new double[tok.length];
                for (int i = 0; i < tok.length; ++i) {
                    if (!tok[i].isEmpty())
                        x[i] = Double.parseDouble(tok[i]);
                }
                return new LabeledPoint(y, Vectors.dense(x));
            }
        });

        System.out.println("Number of records: " + points.count());

        LinearRegressionModel model =
                LinearRegressionWithSGD.train(points.rdd(), iterations);
        System.out.println("Model weights: " + model.weights());

        sc.stop();
    }
}
In my project, I also have to decide which Spark Worker will connect to which data source (something like a "matchmaking" process with a 1:1 relationship). For that reason, I create a number of InputSplits equal to the number of data sources, so that my data are sent in parallel to the SparkContext; a stripped-down sketch of that getSplits() logic follows below.
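For reference, this is roughly how my getSplits() binds one split to one data source (the comma-separated source list and the CustomInputSplit constructor shown here are simplified placeholders, not my exact code):

// Inside CustomInputFormat (org.apache.hadoop.mapred.InputFormat):
// one InputSplit per data source, ignoring the suggested numSplits.
@Override
public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
    String[] sources = conf.get(CustomConf.confDBNodesLocations).split(",");
    InputSplit[] splits = new InputSplit[sources.length];
    for (int i = 0; i < sources.length; i++) {
        // 1:1 "matchmaking": the i-th split reads only from the i-th source.
        splits[i] = new CustomInputSplit(sources[i]);
    }
    return splits;
}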
My questions are the following:
1. Does the result of the InputSplit.getLength() method affect how many records the RecordReader returns? In detail, I have seen in my test runs that the job ends after returning only one record, apparently just because the value I return from the CustomInputSplit.getLength() function is 0.
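To make the symptom concrete, here is a stripped-down view of my split class; everything except getLength() is simplified, and estimatedBytes is a placeholder for the positive size estimate I could return instead of 0:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.mapred.InputSplit;

// Simplified sketch of CustomInputSplit (org.apache.hadoop.mapred.InputSplit).
public class CustomInputSplit implements InputSplit {

    private String source = "";        // host:port of the bound data source
    private long estimatedBytes = 0L;  // placeholder for a size estimate

    public CustomInputSplit() { }      // required for Writable deserialization

    public CustomInputSplit(String source) {
        this.source = source;
    }

    @Override
    public long getLength() throws IOException {
        // I currently return 0 here, and the job stops after one record.
        // Returning a positive (even rough) estimate is the variant I want
        // to understand: does this value drive how many records are read?
        return estimatedBytes;
    }

    @Override
    public String[] getLocations() throws IOException {
        return new String[0];          // no locality preference
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(source);
        out.writeLong(estimatedBytes);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        source = in.readUTF();
        estimatedBytes = in.readLong();
    }
}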
2. In the Apache Spark context, is the number of workers equal to the number of InputSplits that my InputFormat produces, at least for the execution of the records.map() function call?
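What I can verify so far is only the partition side: hadoopRDD() creates one RDD partition per InputSplit, which I can print right after creating the RDD in the main() above. Whether each partition also gets its own worker is exactly the part I am asking about:

// One partition per InputSplit is created by hadoopRDD(), so this should
// print my number of data sources; it says nothing about worker assignment.
System.out.println("Number of partitions: " + records.partitions().size());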
The answer to question 2 above is very important for my project.
Thanks, Nick