热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

大数据计算框架Spark之任务调度

Spark有几种资源调度设施。每个SparkApplication(SparkContext实例)独

Spark有几种资源调度设施。每个Spark Application(SparkContext实例)独立地运行在一组executor进程内。cluster manager为应用间的调度提供设施。在每个Spark应用内,如果将多个job(多个spark action)提交给不同的线程,那么他们会并行运行。

大数据计算框架Spark之任务调度

1 Application间的资源调度

集群上,每个Spark application获得独立的一组executor JVM,这组executor JVM只为那个application运行task和存储数据。如果多个用户要共享集群,有不同的策略管理资源分配,这取决于使用的cluster manager。

资源的静态分区(static partitioning)可被所有的cluster manager获得,这样每个application在他的生命周期内都可获得他能使用的最多资源。standalone、YARN、coarse-grained Mesos mode这三种模式使用的就是这种方式。

1.1控制资源使用

集群类型下,如下配置资源分配:

  1. Standalone mode: application提交到standalone mode集群,将会以FIFO的顺序运行,每个application会尽可能地使用所有可用节点,配置spark.cores.max来限制application使用节点的数目,或者设置spark.deploy.defaultCores。除了可以设置application可用内核数,还可以设置spark.executor.memory来控制内存的使用。
  2. Mesos: 为了使用静态分区(static partitioning)在Mesos集群上,spark.mesos.coarse=true,可以通过设置spark.cores.max来限制每个application的资源共享,通过设置spark.executor.memory来控制executor内存的使用。
  3. YARN: 通过设置--num-executors选项,spark YARN客户端可控制集群上有多少executor被分配(对应的配置属性为spark.executor.instances),--executor-memory(对应的配置属性spark.executor.memory)和--executor-cores(对应的配置属性spark.executor.cores)控制了分配给每个executor的资源。

应用之间无法共享内存。

1.2动态资源分配

Spark提供了依据应用的工作量动态调整资源的机制。这意味着你的application不在使用的资源会返还给集群,当需要的时候再申请分配资源,这种特性对于多应用共享集群特别有用。

这个特性默认失效,但在所有coarse-grained cluster manager上都可用,如:standalone mode, YARN mode, 和Mesos coarse-grained mode。

使用这个特性有两个要求。首先用于必须设置spark.dynamicAllocation.enabled=true,其次要设置external shuffle service在集群上的每个worker node并设置spark.shuffle.service.enabled=true。设置external shuffle service目的是executor可被移除但是不删除他们生成的shuffle文件。

设置这个变量的方式为:

  • 在standalone模式:设置spark.shuffle.service.enabled=true
  • Mesos coarse-grained模式:在所有从节点运行$SPARK_HOME/sbin/start-mesos-shuffle-service.sh设置spark.shuffle.service.enabled=true
  • YARN:详见运行spark与YARN

1.3资源分配策略

当Spark不再使用executor时就出让它,需要的时候再获取它。因为没有一个确定的方式预测将要被移除的executor是否在不久的将来会被使用,或者一个将要被添加的新executor实际上是否是空闲的,所以我们需要一系列试探来确定是移除executor(可能会移除多个)还是请求executor(可能会请求多个)。

请求策略

开启Spark application动态分配资源特性,当pending task等待被调度时,Spark application会请求额外的executor。这就意味着,当前的这些executor无法同时满足所有的task,这些task已经被提交,但是还没有执行完。

Spark轮流请求executor。当task等待的时间大于spark.dynamicAllocation.schedulerBacklogTimeout时,真正的请求(申请executor的请求)被触发,之后,如果未完成task队列存在,那么每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒请求被触发一次。每一轮请求的executor数量以指数级增长。例如,第一轮请求一个executor,第二轮请求2个,第三,四轮分别请求4,8个。

按指数形式增长的动机有两个,首先,起初应用应该慎重地请求executor,以防只需几个executor就能满足需求,这和TCP慢启动类似。其次,当应用确实需要更多的executor时,应用应该能够及时地增加资源的使用。

移除策略

当executor闲置超过spark.dynamicAllocation.executorIdleTimeout秒时,就将他移除。注意,大多数情况下,executor的移除条件和请求条件是互斥的,这样如果仍然有待调度的task的情况下executor是不会被移除的。

executor优雅地退役

非动态分配资源情况下,一个Spark executor或者是由于失败而退出,或者是因相关application退出而退出。这两种情况下,不在需要与executor相关联的状态并且这些状态可以被安全地丢弃。动态分配资源的情况下,当executor被明确移除时,application仍然在运行。如果application要想使用这些由executor存储和写下的状态,就必须重新计算状态。这样就需要一种优雅的退役机制,即在executor退役前保留他的状态。

这个机制对于shuffles特别重要。shuffle期间,executor自己的map输出写入本地磁盘。当其他的executor要获取这些文件的时候,这个executor充当了文件服务器的角色。对于那些落后的executor,他们的task执行时间比同辈要长,在shuffle完成之前,动态资源分配可能移除了一个executor,这种情形下,那个executor写入本地的文件(即executor的状态)不必重新计算。

保留shuffle文件的办法就是使用外部的shuffle服务,这是在Spark 1.2中引入的。这个外部的shuffle服务指的是长时间运行的进程,它运行与集群的每个节点上,独立于application和executor。如果这个服务可用,executor就从这个服务获shuffle file,而不是彼此之间获取shuffle file。这意味着executor生成的任何shuffle文件都可能被服务包含,即使在executor生命周期之外也是如此。

executor除了写shuffle 文件到本地硬盘,还缓存数据到硬盘或内存中。但是,当executor被移除后,缓存到内存中的数据将不可用。为了解决这一问题,默认地缓存数据到内存的executor永远不会被删除。可以通过spark.dynamicAllocation.cachedExecutorIdleTimeout配置这一行为,

2 Application内的资源调度

概述

给定的application内部(SparkContext 实例),如果多个并行的job被提交到不同的线程上,那么这些job可以同时执行。这里的job指的是Spark action及Spark action触发的计算task。Spark scheduler是线程安全的,支持spark application服务于多个请求。

默认地Spark scheduler以FIFO的顺序执行job,每个job被切分为一到多个stage(例如,map和reduce),当第一个job的stage的task启动后,这个job优先获得所有可用资源,然后才是第二,三个job......。如果队头的job不必使用整个集群,之后的job就能立即启动。如果队头的job较大,那么之后的job启动延迟会比较明显。

从Spark 0.8开始,也可以通过配置实现队列间的公平调度。Job间的task资源分配采用单循环的方式。所有job都会获得大致相同的集群资源。这就意味着,当有长job存在时,提交的短job可以立即获得资源启动运行而不必等到长job执行完毕。可以设置spark.scheduler.mode为FAIR

val conf = new SparkConf().setMaster(...).setAppName(...)  
conf.set("spark.scheduler.mode", "FAIR")  
val sc = new SparkContext(conf) 

公平调度池(可能多个)

公平调度器也支持在池中对job分组并给每个池配置不同的选项。这有助于为更重要的job设置高优先级池,例如把每个用户的job分到一组,并且给这些用户相等的资源不论有多少并行task,而不是给每个job相等的资源。

不需要任何干预,新job会进入默认池,但是可以使用spark.scheduler.pool设置job池。

sc.setLocalProperty("spark.scheduler.pool", "pool1") 

设置完后,这个线程(通过调用RDD.save, count, collect)提交的所有job都会使用这个资源池的名称。设置是针对每一个线程的,这样更容易实现一个线程运行一个用户的多个job。如果想清除与一个线程相关的池,调用:sc.setLocalProperty("spark.scheduler.pool", null)

池默认行为

默认地每个池都能获得相等的资源(在默认池中每个job都能获得相等的资源),但在每个池内部,job以FIFO 的顺序运行。例如如果为每一个用户创建一个池,这就意味着每一个用户将获得相等的资源,并且每个用户的查询都会按顺序运行而不会出现后来的查询抢占了前面查询的资源

配置池属性

可以通过修改配置文件改变池属性。每个池都支持三种属性:

  • schedulingMode: 可以是FIFO或FAIR,控制池中的job排队等候或公平地分享集群资源。
  • weight: 控制资源分配的比例。默认所有池分配资源比重都是1。如果指定一个池的比重为2,那么他获得的资源是其他池的2倍。如果将一个池的比重设的很高,比如1000,那么不论他是否有活跃的job,他总是第一个开始执行task。
  • minShare: 除了设置总体的占比之外,还可以对每个池设定一个最小资源分配(例如CPU核数)。在根据比重重新分配资源之前,公平调度器总是试图满足所有活跃池的最小资源需求。minShare属性能以另一种方式确保一个池快速地获得一定数量的资源(10个核)而不必给他更高的优先级。默认地minShare=0。

调用SparkConf.set,可以通过XML文件配置池属性:

conf.set("spark.scheduler.allocation.file", "/path/to/file") 

每个池一个,在XML文件中没有配置的池使用默认配置(调度模式 FIFO, weight 1, minShare 0),例如:

  
  
FAIR  
1  
2  
 
 
  
FIFO  
2  
3  
 

以上所述就是小编给大家介绍的《大数据计算框架Spark之任务调度》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 我们 的支持!


推荐阅读
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • 本文介绍了Java高并发程序设计中线程安全的概念与synchronized关键字的使用。通过一个计数器的例子,演示了多线程同时对变量进行累加操作时可能出现的问题。最终值会小于预期的原因是因为两个线程同时对变量进行写入时,其中一个线程的结果会覆盖另一个线程的结果。为了解决这个问题,可以使用synchronized关键字来保证线程安全。 ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • 本文介绍了RxJava在Android开发中的广泛应用以及其在事件总线(Event Bus)实现中的使用方法。RxJava是一种基于观察者模式的异步java库,可以提高开发效率、降低维护成本。通过RxJava,开发者可以实现事件的异步处理和链式操作。对于已经具备RxJava基础的开发者来说,本文将详细介绍如何利用RxJava实现事件总线,并提供了使用建议。 ... [详细]
  • 上图是InnoDB存储引擎的结构。1、缓冲池InnoDB存储引擎是基于磁盘存储的,并将其中的记录按照页的方式进行管理。因此可以看作是基于磁盘的数据库系统。在数据库系统中,由于CPU速度 ... [详细]
  • 深入解析Linux下的I/O多路转接epoll技术
    本文深入解析了Linux下的I/O多路转接epoll技术,介绍了select和poll函数的问题,以及epoll函数的设计和优点。同时讲解了epoll函数的使用方法,包括epoll_create和epoll_ctl两个系统调用。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • Android中高级面试必知必会,积累总结
    本文介绍了Android中高级面试的必知必会内容,并总结了相关经验。文章指出,如今的Android市场对开发人员的要求更高,需要更专业的人才。同时,文章还给出了针对Android岗位的职责和要求,并提供了简历突出的建议。 ... [详细]
  • Centos7.6安装Gitlab教程及注意事项
    本文介绍了在Centos7.6系统下安装Gitlab的详细教程,并提供了一些注意事项。教程包括查看系统版本、安装必要的软件包、配置防火墙等步骤。同时,还强调了使用阿里云服务器时的特殊配置需求,以及建议至少4GB的可用RAM来运行GitLab。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 2021最新总结网易/腾讯/CVTE/字节面经分享(附答案解析)
    本文分享作者在2021年面试网易、腾讯、CVTE和字节等大型互联网企业的经历和问题,包括稳定性设计、数据库优化、分布式锁的设计等内容。同时提供了大厂最新面试真题笔记,并附带答案解析。 ... [详细]
  • JVM:33 如何查看JVM的Full GC日志
    1.示例代码packagecom.webcode;publicclassDemo4{publicstaticvoidmain(String[]args){byte[]arr ... [详细]
  • 初识java关于JDK、JRE、JVM 了解一下 ... [详细]
author-avatar
fvcvb_974
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有