在http://spark.apache.org/docs/0.8.0/cluster-overview.html上阅读了一些文档后,我得到了一些我想澄清的问题.
以Spark为例:
JavaSparkContext spark = new JavaSparkContext( new SparkConf().setJars("...").setSparkHome....); JavaRDDfile = spark.textFile("hdfs://..."); // step1 JavaRDD words = file.flatMap(new FlatMapFunction () { public Iterable call(String s) { return Arrays.asList(s.split(" ")); } }); // step2 JavaPairRDD pairs = words.map(new PairFunction () { public Tuple2 call(String s) { return new Tuple2 (s, 1); } }); // step3 JavaPairRDD counts = pairs.reduceByKey(new Function2 () { public Integer call(Integer a, Integer b) { return a + b; } }); counts.saveAsTextFile("hdfs://...");
因此,假设我有3个节点集群,节点1作为主节点运行,并且上面的驱动程序已被正确判断(比如application-test.jar).所以现在我在主节点上运行此代码,我相信在SparkContext
创建之后,application-test.jar文件将被复制到工作节点(并且每个工作人员将为该应用程序创建一个目录).
那么现在我的问题是:示例任务中的step1,step2和step3是否会被发送给工作人员?如果是,那么工人如何执行呢?喜欢java -cp "application-test.jar" step1
等等?
为了清楚地了解如何创建和调度任务,我们必须了解执行模型在Spark中的工作原理.简而言之,spark中的应用程序分三步执行:
创建RDD图
根据RDD图创建执行计划.在此步骤中创建阶段
根据计划生成任务,并在工作人员之间安排任务
在你的单词计数示例中,RDD图非常简单,如下所示:
file - > lines - > words - > per-word count - > global word count - > output
基于该图,创建了两个阶段.阶段创建规则基于管理尽可能多的窄变换的想法.在您的示例中,窄变换按字数统计.因此,你得到两个阶段
file - > lines - > words - > per-word count
全局字数 - >输出
一旦计算出阶段,spark就会从阶段产生任务.第一阶段将创建ShuffleMapTasks,最后一个阶段将创建ResultTasks,因为在最后阶段,包含一个动作操作以产生结果.
要生成的任务数取决于文件的分发方式.假设您在三个不同的节点中有3个不同的文件,第一个阶段将生成3个任务:每个分区一个任务.
因此,您不应直接将步骤映射到任务.任务属于某个阶段,与分区相关.
通常,为一个阶段运行的任务数正好是最终RDD的分区数,但由于RDD可以共享(因此ShuffleMapStages
)它们的数量因RDD /阶段共享而异.请参阅RDD如何在DAG下工作?
创建时SparkContext
,每个worker都会启动一个执行程序.这是一个单独的进程(JVM),它也会加载你的jar.执行程序连接回驱动程序.现在,司机可以向他们发送命令,像flatMap
,map
并reduceByKey
在你的榜样.当驱动程序退出时,执行程序关闭.
RDD有点像分裂成分区的大数组,每个执行程序都可以容纳其中的一些分区.
一个任务是序列化您从驱动程序发送到执行命令Function
的对象.执行程序反序列化命令(这是可能的,因为它已经加载了你的jar),并在分区上执行它.
(这是一个概念性的概述.我正在浏览一些细节,但我希望它有用.)
回答您的具体问题:不,每个步骤都没有启动新流程.SparkContext
构建时,每个工作程序都会启动一个新进程.