热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark介绍与安装(包含IDEA编写spark程序)

文章目录一、Spark概述1.1、Spark官方介绍1.2.Spark特点1.3、激动人心的Spark发展史1.4、Spark为什么会流行1.4.1.原因1:优秀的数


文章目录

    • 一、Spark概述
        • 1.1、Spark官方介绍
        • 1.2. Spark特点
        • 1.3、激动人心的Spark发展史
        • 1.4、Spark为什么会流行
            • 1.4.1. 原因1:优秀的数据模型和计算抽象
            • 1.4.2. 原因2:完善的生态圈
            • 1.4.3. 扩展阅读:Spark VS Hadoop
        • 1.5、Spark运行模式
    • 第二章 Spark环境搭建
        • 2.1. local本地模式-Spark初体验
            • 2.1.1. 安装
            • 2.1.2. 启动spark-shell
            • 2.1.3. 初体验-读取本地文件
            • 2.1.4. 初体验-读取HDFS文件
        • 2.2. standalone集群模式
            • 2.2.1. 集群角色介绍
            • 2.2.2. 集群规划
            • 2.2.3. 修改配置并分发
            • 2.2.4. 启动和停止
            • 2.2.5. 查看web界面
            • 2.2.6. 测试
        • 2.3. standalone-HA高可用模式
            • 2.3.1. 原理
            • 2.3.2. 配置HA
            • 2.3.3. 启动zk集群
            • 2.3.4. 启动Spark集群
            • 2.3.5. 测试HA
        • 2.4. on yarn集群模式
            • 2.4.2. cluster模式
            • 2.4.3. client模式[了解]
            • 2.4.4. 两种模式的区别
        • 2.5. 两种模式的区别
            • 2.5.1. spark-shell
            • 2.5.2. spark-submit
            • 2.5.3. 参数总结
    • 第三章 IDEA编写Spark程序
        • 3.1. pom.xml
        • 3.2. 本地运行(统计单词数量)
        • 3.3. 集群运行(统计单词数量)


一、Spark概述


1.1、Spark官方介绍

Spark是什么
Apache Spark是用于大规模数据处理的统一分析引擎
Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量硬件之上,形成集群。

●官网
http://spark.apache.org
http://spark.apachecn.org

在这里插入图片描述


1.2. Spark特点


与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。

易用
Spark支持Java、Python、R和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。

通用
Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。

兼容性
Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。


1.3、激动人心的Spark发展史

大数据、人工智能( Artificial Intelligence )像当年的石油、电力一样, 正以前所未有的广度和深度影响所有的行业, 现在及未来公司的核心壁垒是数据, 核心竞争力来自基于大数据的人工智能的竞争。
Spark是当今大数据领域最活跃、最热门、最高效的大数据通用计算平台,
2009年诞生于美国加州大学伯克利分校AMP 实验室,
2010年通过BSD许可协议开源发布,
2013年捐赠给Apache软件基金会并切换开源协议到切换许可协议至 Apache2.0,
2014年2月,Spark 成为 Apache 的顶级项目
2014年11月, Spark的母公司Databricks团队使用Spark刷新数据排序世界记录

Spark 成功构建起了一体化、多元化的大数据处理体系。在任何规模的数据计算中, Spark 在性能和扩展性上都更具优势。
(1) Hadoop 之父Doug Cutting 指出:Use of MapReduce engine for Big Data projects will decline, replaced by Apache Spark (大数据项目的MapReduce 引擎的使用将下降,由Apache Spark 取代)
(2)Hadoop 商业发行版本的市场领导者Cloudera 、HortonWorks 、MapR 纷纷转投Spark,并把Spark 作为大数据解决方案的首选和核心计算引擎。

2014 年的如此Benchmark 测试中, Spark 秒杀Hadoop ,在使用十分之一计算资源的情况下,相同数据的排序上, Spark 比Map Reduce 快3 倍! 在没有官方PB 排序对比的情况下,首次将S park 推到了IPB 数据(十万亿条记录) 的排序,在使用190 个节点的情况下,工作负载在4 小时内完成, 同样远超雅虎之前使用3800 台主机耗时16 个小时的记录。

2015年6月, Spark 最大的集群来自腾讯–8000 个节点, 单个Job 最大分别是阿里巴巴和Databricks–1PB ,震撼人心!同时,Spark的Contributor 比2014 年涨了3 倍,达到730 人:总代码行数也比2014 年涨了2 倍多,达到40 万行。

IBM 于2015 年6 月承诺大力推进Apache Spark 项目, 并称该项目为:以数据为主导的,未来十年最重要的新的开源项目。这-承诺的核心是将Spark 嵌入IBM 业内领先的分析和商务平台,并将Spark 作为一项服务,在IBMB平台上提供给客户。IBM 还将投入超过3500 名研究和开发人员在全球10余个实验室开展与Spark 相关的项目,并将为Spark 开源生态系统无偿提供突破性的机器学习技术–IBM SystemML。同时,IBM 还将培养超过100 万名Spark 数据科学家和数据工程师。

2016 年,在有“计算界奥运会”之称的国际著名Sort Benchmark全球数据排序大赛中,由南京大学计算机科学与技术系PASA 大数据实验室、阿里巴巴和Databricks 公司组成的参赛因队NADSort,以144美元的成本完成lOOTB 标准数据集的排序处理,创下了每TB 数据排序1.44美元成本的最新世界纪录,比2014 年夺得冠军的加州大学圣地亚哥分校TritonSort团队每TB 数据4.51美元的成本降低了近70%,而这次比赛依旧使用Apache Spark 大数据计算平台,在大规模并行排序算法以及Spark 系统底层进行了大量的优化,以尽可能提高排序计算性能并降低存储资源开销,确保最终赢得比赛。

在FullStack 理想的指引下,Spark 中的Spark SQL 、SparkStreaming 、MLLib 、GraphX 、R 五大子框架和库之间可以无缝地共享数据和操作, 这不仅打造了Spark 在当今大数据计算领域其他计算框架都无可匹敌的优势, 而且使得Spark 正在加速成为大数据处理中心首选通用计算平台


1.4、Spark为什么会流行


1.4.1. 原因1:优秀的数据模型和计算抽象

Spark 产生之前,已经有MapReduce这类非常成熟的计算系统存在了,并提供了高层次的API(map/reduce),把计算运行在集群中并提供容错能力,从而实现分布式计算。
虽然MapReduce提供了对数据访问和计算的抽象,但是对于数据的复用就是简单的将中间数据写到一个稳定的文件系统中(例如HDFS),所以会产生数据的复制备份,磁盘的I/O以及数据的序列化,所以在遇到需要在多个计算之间复用中间结果的操作时效率就会非常的低。而这类操作是非常常见的,例如迭代式计算,交互式数据挖掘,图计算等。
认识到这个问题后,学术界的 AMPLab 提出了一个新的模型,叫做 RDD。RDD 是一个可以容错且并行的数据结构(其实可以理解成分布式的集合,操作起来和操作本地集合一样简单),它可以让用户显式的将中间结果数据集保存在内存中,并且通过控制数据集的分区来达到数据存放处理最优化.同时 RDD也提供了丰富的 API (map、reduce、foreach、redeceByKey…)来操作数据集。后来 RDD被 AMPLab 在一个叫做 Spark 的框架中提供并开源.
简而言之,Spark 借鉴了 MapReduce 思想发展而来,保留了其分布式并行计算的优点并改进了其明显的缺陷。让中间数据存储在内存中提高了运行速度、并提供丰富的操作数据的API提高了开发速度。
在这里插入图片描述
在这里插入图片描述


1.4.2. 原因2:完善的生态圈

在这里插入图片描述
目前,Spark已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目

Spark Core:实现了 Spark 的基本功能,包含RDD、任务调度、内存管理、错误恢复、与存储系统交互等模块。

Spark SQL:用来操作结构化数据的程序包。通过 Spark SQL,我们可以使用 SQL操作数据。

Spark Streaming:Spark提供的对实时数据进行流式计算的组件。提供了用来操作数据流的API.

Spark MLlib:提供了常见的机器学习(ML)功能的程序库。

GraphX(图计算):Spark中用于图计算的API,性能良好,拥有丰富的功能和运算符,能在海量数据上自如地运行复杂的图算法。

集群管理器:Spark 设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。


1.4.3. 扩展阅读:Spark VS Hadoop

hadoopspark
类型基础平台,包含计算,存储,调度分布式计算工具
场景大规模数据集上的批处理迭代计算,交互式计算,流计算
价格对机器要求较低,便宜对内存有要求,相对于较贵
编程范式Map+Reduce,API较为底层,算法适应性差RDD组成的DAG有向无环图,API较为顶层,方便使用
数据存储结构MapReduce中间计算结果存在HDFS 磁盘上,延迟大RDD中间结果存在内存中,延迟小
运行方式Task以进程方式维护,任务启动慢Task以线程的方式维护,任务启动快

★注意:
尽管Spark相对于Hadoop而言具有较大优势,但Spark并不能完全替代Hadoop,Spark主要用于替代Hadoop中的MapReduce计算模型。存储依然可以使用HDFS,但是中间结果可以存放在内存中;调度可以使用Spark内置的,也可以使用更成熟的调度系统YARN等
实际上,Spark已经很好地融入了Hadoop生态圈,并成为其中的重要一员,它可以借助于YARN实现资源调度管理,借助于HDFS实现分布式存储。
此外,Hadoop可以使用廉价的、异构的机器来做分布式存储与计算,但是,Spark对硬件的要求稍高一些,对内存与CPU有一定的要求


1.5、Spark运行模式

1.local本地模式(单机)–开发测试使用,分为local单线程和local-cluster多线程

2.standalone独立集群模式–开发测试使用,典型的Mater/slave模式‘’

3.standalone-HA高可用模式–生产环境使用,基于standalone模式,使用zk搭建高可用,避免Master是有单点故障的
4.on yarn集群模式–生产环境使用
运行在 yarn 集群之上,由 yarn 负责资源管理,Spark 负责任务调度和计算,
好处:计算资源按需伸缩,集群利用率高,共享底层存储,避免数据跨集群迁移。
FIFO
Fair
Capacity
5.on mesos集群模式–国内使用较少
运行在 mesos 资源管理器框架之上,由 mesos 负责资源管理,Spark 负责任务调度和计算
6.on cloud集群模式–中小公司未来会更多的使用云服务
比如 AWS 的 EC2,使用这个模式能很方便的访问 Amazon的 S3


第二章 Spark环境搭建

■我们选择目前企业中使用最多的稳定版Spark2.2.0
■使用Apache版还是CDH版?
1.Apache版直接下载官方编译好的基于Apache Hadoop的Spark即可
2.自己下载Spark源码基于CDH Hadoop重新编译
因为CDH5.14版 Spark基于Spark1.6版本较低,且为了推广自家的Impala对Spark SQL进行了阉割,所以要重新编译
课程资料中已经给大家提供了编译好的CHD Spark,当然也可以根据资料自己编译
(如果自己编译要求网络环境较好,使用提供的软件、仓库,细心耐心操作,耗时1个半小时左右)


2.1. local本地模式-Spark初体验


2.1.1. 安装

●下载Spark安装包
下载地址:http://spark.apache.org/downloads.html

●解压重命名=
cd /export/servers
tar spark-2.2.0-bin-2.6.0-cdh5.14.0.tgz
mv spark-2.2.0-bin-2.6.0-cdh5.14.0 spark

●注意:
如果有权限问题,可以修改为root,方便学习时操作,实际中使用运维分配的用户和权限即可
chown -R root /export/servers/spark
chgrp -R root /export/servers/spark

●解压目录说明:
bin 可执行脚本
conf 配置文件
data 示例程序使用数据
examples 示例程序
jars 依赖 jar 包
python pythonAPI
R R 语言 API
sbin 集群管理命令
yarn 整合yarn需要的东东


2.1.2. 启动spark-shell

●开箱即用
直接启动bin目录下的spark-shell:
./spark-shell

●spark-shell说明
1.直接使用./spark-shell
表示使用local 模式启动,在本机启动一个SparkSubmit进程
2.还可指定参数 --master,如:
spark-shell --master local[N] 表示在本地模拟N个线程来运行当前任务
spark-shell --master local[] 表示使用当前机器上所有可用的资源
3.不携带参数默认就是
spark-shell --master local[
]
4.后续还可以使用–master指定集群地址,表示把任务提交到集群上运行,如
./spark-shell --master spark://node01:7077
5.退出spark-shell
使用 :quit


2.1.3. 初体验-读取本地文件

●准备数据
vim /root/words.txt

hello me you her
hello you her
hello her
hello

val textFile = sc.textFile("file:///root/words.txt")
val counts = textFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.collect//收集结果
// Array[(String, Int)] = Array((you,2), (hello,4), (me,1), (her,3))

2.1.4. 初体验-读取HDFS文件

●准备数据
上传文件到hdfs
hadoop fs -put /root/words.txt /wordcount/input/words.txt
目录如果不存在可以创建
hadoop fs -mkdir -p /wordcount/input
结束后可以删除测试文件夹
hadoop fs -rm -r /wordcount

val textFile = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
val counts = textFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://node01:8020/wordcount/output")

2.2. standalone集群模式


2.2.1. 集群角色介绍

Spark是基于内存计算的大数据并行计算框架,实际中运行计算任务肯定是使用集群模式,那么我们先来学习Spark自带的standalone集群模式了解一下它的架构及运行机制。

Standalone集群使用了分布式计算中的master-slave模型,
master是集群中含有master进程的节点
slave是集群中的worker节点含有Executor进程

●Spark架构图如下(先了解):
在这里插入图片描述


2.2.2. 集群规划

node01:master
node02:slave/worker
node03:slave/worker


2.2.3. 修改配置并分发

●修改Spark配置文件
cd /export/servers/spark/conf
mv spark-env.sh.template spark-env.sh
vim spark-env.sh

#配置java环境变量
export JAVA_HOME=/export/servers/jdk1.8
#指定spark Master的IP
export SPARK_MASTER_HOST=node01
#指定spark Master的端口
export SPARK_MASTER_PORT=7077

mv slaves.template slaves
vim slaves

node02
node03

●通过scp 命令将配置文件分发到其他机器上
scp -r /export/servers/spark node02:/export/servers
scp -r /export/servers/spark node03:/export/servers


2.2.4. 启动和停止

●集群启动和停止
在主节点上启动spark集群
/export/servers/spark/sbin/start-all.sh

在主节点上停止spark集群
/export/servers/spark/sbin/stop-all.sh

●单独启动和停止
在 master 安装节点上启动和停止 master:
start-master.sh
stop-master.sh
在 Master 所在节点上启动和停止worker(work指的是slaves 配置文件中的主机名)
start-slaves.sh
stop-slaves.sh


2.2.5. 查看web界面

正常启动spark集群后,查看spark的web界面,查看相关信息。
http://node01:8080/


2.2.6. 测试

●需求
使用集群模式运行Spark程序读取HDFS上的文件并执行WordCount

●集群模式启动spark-shell
/export/servers/spark/bin/spark-shell --master spark://node01:7077

●运行程序

sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
.saveAsTextFile("hdfs://node01:8020/wordcount/output2")

●注意
集群模式下程序是在集群上运行的,不要直接读取本地文件,应该读取hdfs上的
因为程序运行在集群上,具体在哪个节点上我们运行并不知道,其他节点可能并没有那个数据文件


2.3. standalone-HA高可用模式


2.3.1. 原理

Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master单点故障的问题。
如何解决这个单点故障的问题,Spark提供了两种方案:
1.基于文件系统的单点恢复(Single-Node Recovery with Local File System)–只能用于开发或测试环境。
2.基于zookeeper的Standby Masters(Standby Masters with ZooKeeper)–可以用于生产环境。
在这里插入图片描述


2.3.2. 配置HA

该HA方案使用起来很简单,首先启动一个ZooKeeper集群,然后在不同节点上启动Master,注意这些节点需要具有相同的zookeeper配置。
●先停止Sprak集群
/export/servers/spark/sbin/stop-all.sh

●在node01上配置:
vim /export/servers/spark/conf/spark-env.sh

●注释掉Master配置
#export SPARK_MASTER_HOST=node01
●在spark-env.sh添加SPARK_DAEMON_JAVA_OPTS,内容如下:

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181 -Dspark.deploy.zookeeper.dir=/spark"

参数说明
spark.deploy.recoveryMode:恢复模式
spark.deploy.zookeeper.url:ZooKeeper的Server地址
spark.deploy.zookeeper.dir:保存集群元数据信息的文件、目录。包括Worker、Driver、Application信息。

●scp到其他节点
scp /export/servers/spark/conf/spark-env.sh node02:/export/servers/spark/conf/
scp /export/servers/spark/conf/spark-env.sh node03:/export/servers/spark/conf/


2.3.3. 启动zk集群

zkServer.sh status
zkServer.sh stop
zkServer.sh start


2.3.4. 启动Spark集群

●node01上启动Spark集群执行
/export/servers/spark/sbin/start-all.sh

●在node02上再单独只起个master:
/export/servers/spark/sbin/start-master.sh

●注意:
在普通模式下启动spark集群
只需要在主节点上执行start-all.sh 就可以了
在高可用模式下启动spark集群
先需要在任意一台主节点上执行start-all.sh
然后在另外一台主节点上单独执行start-master.sh

●查看node01和node02
http://node01:8080/
http://node02:8080/
可以观察到有一台状态为StandBy


2.3.5. 测试HA

●测试主备切换
1.在node01上使用jps查看master进程id
2.使用kill -9 id号强制结束该进程
3.稍等片刻后刷新node02的web界面发现node02为Alive
在这里插入图片描述
●测试集群模式提交任务
1.集群模式启动spark-shell
/export/servers/spark/bin/spark-shell --master spark://node01:7077,node02:7077

2.运行程序

sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
.saveAsTextFile("hdfs://node01:8020/wordcount/output3")

2.4. on yarn集群模式

2.4.1. 准备工作
1.安装启动Hadoop(需要使用HDFS和YARN,已经ok)

2.安装单机版Spark(已经ok)
注意:不需要集群,因为把Spark程序提交给YARN运行本质上是把字节码给YARN集群上的JVM运行,但是得有一个东西帮我去把任务提交上个YARN,所以需要一个单机版的Spark,里面的有spark-shell命令,spark-submit命令

3.修改配置:
在spark-env.sh ,添加HADOOP_CONF_DIR配置,指明了hadoop的配置文件的位置
vim /export/servers/spark/conf/spark-env.sh

export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop

2.4.2. cluster模式

●说明
在企业生产环境中大部分都是cluster部署模式运行Spark应用
Spark On YARN的Cluster模式 指的是Driver程序运行在YARN集群上

●补充Driver是什么:
运行应用程序的main()函数并创建SparkContext的进程

●图解
在这里插入图片描述
●运行示例程序
spark-shell是一个简单的用来测试的交互式窗口
spark-submit用来提交打成jar包的任务

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue default \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
10

●查看界面
http://node01:8088/cluster
在这里插入图片描述


2.4.3. client模式[了解]

●说明
学习测试时使用,开发不用,了解即可
Spark On YARN的Client模式 指的是Driver程序运行在提交任务的客户端

●图解
在这里插入图片描述

●运行示例程序

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue default \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
10

2.4.4. 两种模式的区别

Cluster和Client模式最本质的区别是:Driver程序运行在哪里!
运行在YARN集群中就是Cluster模式,
运行在客户端就是Client模式
当然还有由本质区别延伸出来的区别,面试的时候能简单说出几点就行
●cluster模式:生产环境中使用该模式
1.Driver程序在YARN集群中
2.应用的运行结果不能在客户端显示
3.该模式下Driver运行ApplicattionMaster这个进程中,如果出现问题,yarn会重启ApplicattionMaster(Driver)

●client模式:
1.Driver运行在Client上的SparkSubmit进程中
2.应用程序运行结果会在客户端显示


2.5. 两种模式的区别


2.5.1. spark-shell

spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下可以用scala编写spark程序,适合学习测试时使用!

●示例
spark-shell可以携带参数
spark-shell --master local[N] 数字N表示在本地模拟N个线程来运行当前任务

spark-shell --master local[*] 表示使用当前机器上所有可用的资源
默认不携带参数就是–master local[
]

spark-shell --master spark://node01:7077,node02:7077 表示运行在集群上


2.5.2. spark-submit

spark-submit命令用来提交jar包给spark集群/YARN
spark-shell交互式编程确实很方便我们进行学习测试,但是在实际中我们一般是使用IDEA开发Spark应用程序打成jar包交给Spark集群/YARN去执行。
spark-submit命令是我们开发时常用的!!!

示例:计算π
cd /export/servers/spark

/export/servers/spark/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node01:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
/export/servers/spark/examples/jars/spark-examples_2.11-2.2.0.jar \
10

2.5.3. 参数总结

●Master参数形式


Master形式解释
local本地以一个worker线程运行(例如非并行的情况).
local[N]本地以K worker 线程 (理想情况下, N设置为你机器的CPU核数).
local[*]本地以本机同样核数的线程运行.
spark://HOST:PORT连接到指定的Spark standalone cluster master. 端口是你的master集群配置的端口,缺省值为7077.
mesos://HOST:PORT连接到指定的Mesos 集群. Port是你配置的mesos端口, 默认5050. 或者使用ZK,格式为 mesos://zk://…
yarn-client以client模式连接到YARN cluster. 集群的位置基于HADOOP_CONF_DIR 变量找到.
yarn-cluster以cluster模式连接到YARN cluster. 集群的位置基于HADOOP_CONF_DIR 变量找到.

其他参数示例

–master spark://node01:7077 指定 Master 的地址
–name “appName” 指定程序运行的名称
–class 程序的main方法所在的类
–jars xx.jar 程序额外使用的 jar 包
–driver-memory 512m Driver运行所需要的内存, 默认1g
–executor-memory 2g 指定每个 executor 可用内存为 2g, 默认1g
–executor-cores 1 指定每一个 executor 可用的核数
–total-executor-cores 2 指定整个集群运行任务使用的 cup 核数为 2 个
–queue default 指定任务的对列
–deploy-mode 指定运行模式(client/cluster)

●注意:
如果 worker 节点的内存不足,那么在启动 spark-submit的时候,就不能为 executor分配超出 worker 可用的内存容量。
如果–executor-cores超过了每个 worker 可用的 cores,任务处于等待状态。
如果–total-executor-cores即使超过可用的 cores,默认使用所有的。以后当集群其他的资源释放之后,就会被该程序所使用。
如果内存或单个 executor 的 cores 不足,启动 spark-submit 就会报错,任务处于等待状态,不能正常执行。


第三章 IDEA编写Spark程序


3.1. pom.xml


4.0.0cn.itcastSparkDemo1.0-SNAPSHOTaliyunhttp://maven.aliyun.com/nexus/content/groups/public/clouderahttps://repository.cloudera.com/artifactory/cloudera-repos/jbosshttp://repository.jboss.com/nexus/content/groups/public1.81.8UTF-82.11.82.112.7.42.2.0org.scala-langscala-library${scala.version}org.apache.sparkspark-core_2.11${spark.version}org.apache.sparkspark-sql_2.11${spark.version}org.apache.sparkspark-hive_2.11${spark.version}org.apache.sparkspark-hive-thriftserver_2.11${spark.version}org.apache.sparkspark-streaming_2.11${spark.version}org.apache.sparkspark-streaming-kafka-0-10_2.11${spark.version}org.apache.sparkspark-sql-kafka-0-10_2.11${spark.version}org.apache.hadoophadoop-client2.7.4org.apache.hbasehbase-client1.3.1org.apache.hbasehbase-server1.3.1com.typesafeconfig1.3.3mysqlmysql-connector-java5.1.38src/main/scalasrc/test/scalaorg.apache.maven.pluginsmaven-compiler-plugin3.5.1net.alchim31.mavenscala-maven-plugin3.2.2compiletestCompile-dependencyfile${project.build.directory}/.scala_dependenciesorg.apache.maven.pluginsmaven-surefire-plugin2.18.1falsetrue**/*Test.***/*Suite.*org.apache.maven.pluginsmaven-shade-plugin2.3packageshade*:*META-INF/*.SFMETA-INF/*.DSAMETA-INF/*.RSA


3.2. 本地运行(统计单词数量)

package cn.itcast.sparkhelloimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {//1.创建SparkContextval config = new SparkConf().setAppName("wc").setMaster("local[*]")val sc = new SparkContext(config)
sc.setLogLevel("WARN")//2.读取文件//A Resilient Distributed Dataset (RDD)弹性分布式数据集//可以简单理解为分布式的集合,但是spark对它做了很多的封装,//让程序员使用起来就像操作本地集合一样简单,这样大家就很happy了val fileRDD: RDD[String] = sc.textFile("D:\\授课\\190429\\资料\\data\\words.txt")//3.处理数据//3.1对每一行按空切分并压平形成一个新的集合中装的一个个的单词//flatMap是对集合中的每一个元素进行操作,再进行压平val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))//3.2每个单词记为1val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_,1))//3.3根据key进行聚合,统计每个单词的数量//wordAndOneRDD.reduceByKey((a,b)=>a+b)//第一个_:之前累加的结果//第二个_:当前进来的数据val wordAndCount: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_+_)//4.收集结果val result: Array[(String, Int)] = wordAndCount.collect()result.foreach(println)}
}

3.3. 集群运行(统计单词数量)

●修改代码

package cn.itcast.sparkhelloimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {//1.创建SparkContextval config = new SparkConf().setAppName("wc")//.setMaster("local[*]") val sc = new SparkContext(config)sc.setLogLevel("WARN")//2.读取文件//A Resilient Distributed Dataset (RDD)弹性分布式数据集//可以简单理解为分布式的集合,但是spark对它做了很多的封装,//让程序员使用起来就像操作本地集合一样简单,这样大家就很happy了val fileRDD: RDD[String] = sc.textFile(args(0)) //文件输入路径//3.处理数据//3.1对每一行按空切分并压平形成一个新的集合中装的一个个的单词//flatMap是对集合中的每一个元素进行操作,再进行压平val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))//3.2每个单词记为1val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_,1))//3.3根据key进行聚合,统计每个单词的数量//wordAndOneRDD.reduceByKey((a,b)=>a+b)//第一个_:之前累加的结果//第二个_:当前进来的数据val wordAndCount: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_+_)wordAndCount.saveAsTextFile(args(1))//文件输出路径//4.收集结果//val result: Array[(String, Int)] = wordAndCount.collect()//result.foreach(println)}
}

●打包
在这里插入图片描述

●上传
在这里插入图片描述

●执行命令提交到Spark-HA集群

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \
--class cn.itcast.sparkhello.WordCount \
--master spark://node01:7077,node02:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
/root/wc.jar \
hdfs://node01:8020/aa.txt \
hdfs://node01:8020/cc

●执行命令提交到YARN集群

/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \
--class cn.itcast.sparkhello.WordCount \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue default \
/root/wc.jar \
hdfs://node01:8020/wordcount/input/words.txt \
hdfs://node01:8020/wordcount/output5


好了,以上内容就到这里了。不知道小编本篇内容有没有帮助到你呢。欢迎路过的朋友关注小编哦。各位朋友关注点赞是小编坚持下去的动力。小编会继续为大家分享更多的知识哦~~~。

我是小哪吒,我是一名互联网行业的工具人,小编的座右铭:“我不生产代码,我只做代码的搬运工”…哈哈哈,我们下期见哦,Bye~


当你的才华还撑不起你的野心时,你就该努力。

推荐阅读
author-avatar
无正道
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有