热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark学习笔记(26)在DStream的Action操作之外也可能产生Job操作

本期内容:1.SparkStreaming产生Job的机制2.SparkStreaming的其它产生Job的方式
本期内容:
1. Spark Streaming产生Job的机制
2. Spark Streaming的其它产生Job的方式 

1. Spark Streaming产生Job的机制

Scala程序中,函数可以作为参数传递,因为函数也是对象。有函数对象不意味着函数马上就运行。Spark Streaming中,常利用线程的run来调用函数,从而导致函数的最终运行。
Spark Streaming中,Job对象包含函数成员。

NetworkWordCount程序中,DStream.print导致了Job的产生。
DStream.print:

  def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum =  rdd.take (num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println("Time: " + time)
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
     foreachRDD (context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }

Spark Streaming应用程序中,除了print,saveAsObjectFiles、saveAsTextFiles 等也能调用foreachRDD,生成ForEachDStream,才能在后面产生Job。
DStream.foreachRDD:

 private def foreachRDD(
                          foreachFunc: (RDD[T], Time) => Unit,
                          displayInnerRDDOps: Boolean): Unit = {
     new ForEachDStream (this,
      context.sparkContext.clean( foreachFunc , false), displayInnerRDDOps). register ()
  }

通过register注册,新生成的ForEachDStream加入到DStreamGraph的成员outputDStreams中。
如果没有print、count、saveAsObjectFiles、saveAsTextFiles等这样的代码,DStreamGraph中outputDStreams就为空,那么DStreamGraph.generateJobs就产生结果呢?
DStreamGraph.generateJobs:

  def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
       outputStreams.flatMap  { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }

DStreamGraph.generateJobs就会产生空的Job序列。

通过对DStream(或其子类)定制自己的方法,可以使foreachFunc的定义中不含有RDD.take这样的语句。
这样的话,foreachRDD中的foreachFunc不一定会产生Job。如果其中的函数foreachFunc里面没有Action操作,就不会触发Job。

2. Spark Streaming的其它产生Job的方式 

一定要action才会有Job吗?不是。ForEachDStream.transform就可能产生Job。ForEachDStream.transform有两个定义,是调用关系。

ForEachDStream.transform:

  /**
   * Return a new DStream in which each RDD is generated by applying a function
   * on each RDD of 'this' DStream.
   */
  def transform[U: ClassTag]( transformFunc : RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to SparkContext.clean
    val cleanedF = context.sparkContext.clean( transformFunc , false)
    transform((r: RDD[T], t: Time) => cleanedF(r))
  }

  /**
   * Return a new DStream in which each RDD is generated by applying a function
   * on each RDD of 'this' DStream.
   */
  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to SparkContext.clean
    val cleanedF = context.sparkContext.clean(transformFunc, false)
    val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
      assert(rdds.length == 1)
      cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
    }
     new TransformedDStream [U](Seq(this), realTransformFunc)
  }

其中的函数类型的参数transformFunc是输入RDD并产生一个新的RDD。最终实际会生成TransformedDStream对象。

在第8课中提到过,一般的DStream子类的Compute方法,仅仅是调用父类DStream的getOrCompute,而TransformedDStream的compte方法不是这样。
TransformedDStream.compute:

  override def compute(validTime: Time): Option[RDD[U]] = {
    val parentRDDs = parents.map { parent => parent.getOrCompute(validTime).getOrElse(
      // Guard out against parent DStream that return None instead of Some(rdd) to avoid NPE
      throw new SparkException(s"Couldn't generate RDD from parent at time $validTime"))
    }
    val transformedRDD =  transformFunc (parentRDDs, validTime)
    if (transformedRDD == null) {
      throw new SparkException("Transform function must not return null. " +
        "Return SparkContext.emptyRDD() instead to represent no element " +
        "as the result of transformation.")
    }
    Some(transformedRDD)
  }

和别的DStream子类不同,TransformedDStream的compute方法还调用了transformFunc,函数transformFunc是被马上执行的。这就不会等到JobScheduler调度后再执行。
transformFunc 其中如果有count、print等action操作,就也会触发这个Job的执行。这其实可以理解为是个漏洞。
此前说的各种操作是lazy级别,不能马上拿到结果。而由于transformFunc不接受Spark的统一调度,这样可以根据计算结果做出判断再后续操作。不会因为lazy级别而不能必须做后续的transform。

推荐阅读
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 本文介绍了机器学习手册中关于日期和时区操作的重要性以及其在实际应用中的作用。文章以一个故事为背景,描述了学童们面对老先生的教导时的反应,以及上官如在这个过程中的表现。同时,文章也提到了顾慎为对上官如的恨意以及他们之间的矛盾源于早年的结局。最后,文章强调了日期和时区操作在机器学习中的重要性,并指出了其在实际应用中的作用和意义。 ... [详细]
  • 本博文基于《Amalgamationofproteinsequence,structureandtextualinformationforimprovingprote ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • CSS3选择器的使用方法详解,提高Web开发效率和精准度
    本文详细介绍了CSS3新增的选择器方法,包括属性选择器的使用。通过CSS3选择器,可以提高Web开发的效率和精准度,使得查找元素更加方便和快捷。同时,本文还对属性选择器的各种用法进行了详细解释,并给出了相应的代码示例。通过学习本文,读者可以更好地掌握CSS3选择器的使用方法,提升自己的Web开发能力。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 成功安装Sabayon Linux在thinkpad X60上的经验分享
    本文分享了作者在国庆期间在thinkpad X60上成功安装Sabayon Linux的经验。通过修改CHOST和执行emerge命令,作者顺利完成了安装过程。Sabayon Linux是一个基于Gentoo Linux的发行版,可以将电脑快速转变为一个功能强大的系统。除了作为一个live DVD使用外,Sabayon Linux还可以被安装在硬盘上,方便用户使用。 ... [详细]
  • 本文讨论了如何使用GStreamer来删除H264格式视频文件中的中间部分,而不需要进行重编码。作者提出了使用gst_element_seek(...)函数来实现这个目标的思路,并提到遇到了一个解决不了的BUG。文章还列举了8个解决方案,希望能够得到更好的思路。 ... [详细]
  • 本文介绍了如何使用n3-charts绘制以日期为x轴的数据,并提供了相应的代码示例。通过设置x轴的类型为日期,可以实现对日期数据的正确显示和处理。同时,还介绍了如何设置y轴的类型和其他相关参数。通过本文的学习,读者可以掌握使用n3-charts绘制日期数据的方法。 ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • 本文整理了常用的CSS属性及用法,包括背景属性、边框属性、尺寸属性、可伸缩框属性、字体属性和文本属性等,方便开发者查阅和使用。 ... [详细]
author-avatar
扈菁雄佳纯
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有