1,SparkContext和sparkSession的区别
1-1、#构建SparkContext,读取文件 from pyspark import * import os os.environ["YARN_CONF_DIR"] = "/etc/spark2/conf/yarn-conf" conf = SparkConf().setAppName("appname").setSparkHome("/opt/cloudera/parcels/SPARK2/lib/spark2").setMaster("yarn") sc = SparkContext(cOnf=conf) rdd = sc.textFile("hdfs://part*",10) 1-2、#构建SparkSession 执行hive语句(这个也能完成1中的功能,所以能用这个尽量用这个) from pyspark.sql import * spark = SparkSession.builder.appName("appName").master("yarn").enableHiveSupport().getOrCreate() get_duration_sql=\'\'\' select id,duration from *_db.da_* where day=\'20210126\' and duration is not null \'\'\' rdd=spark.sql(get_duration_sql).rdd
SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。
在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。
SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的
资料链接:https://blog.csdn.net/beautiful_huang/article/details/103820534
2,sparkSession的使用
if len(sys.argv) <4: print(\'input error\') directory = sys.argv[1] day = sys.argv[2] hour = sys.argv[3] if hour=="24": hour="*" # 配置环境 os.environ["YARN_CONF_DIR"] = "/etc/spark2/conf/yarn-conf" # 实例化spark spark = SparkSession.builder. \ appName("spark_slot_eva"). \ config("spark.sql.shuffle.partitions", 10). \ config("spark.default.parallelism", 1600). \ config("hive.warehouse.subdir.inherit.perms", "false"). \ config("spark.executor.cores", "4"). \ #spark.executor.cores:顾名思义这个参数是用来指定executor的cpu内核个数,分配更多的内核意味着executor并发能力越强,能够同时执行更多的task config("spark.executor.instances", "200"). \ #能够启动的executor的个数 config("spark.executor.memory", "4g"). \ #executor memory是每个节点上占用的内存。每一个节点都可使用内存 config("spark.port.maxRetries","100").\ #最大重试次数,不设置的话默认4,容易启动spark失败 config("spark.yarn.executor.memoryOverhead","26g").\ enableHiveSupport(). \ getOrCreate() sc = spark.sparkContext # 读取样本数据集和slot.conf dir_list=[directory,day,hour,"part*"] final_dir="/".join(dir_list) print(final_dir) rdd = sc.textFile(final_dir) # print(\'------------------rdd----------------\', rdd.first()) # 2.计算数据的总行数 line_num = rdd.count() print(\'-------------------------------------line_num--------------\', line_num) # 计算特征覆盖率 dict1 = get_slot_cover(rdd,line_num) dict2 = get_slot_num(rdd) dict3 = get_slot_num_avg(rdd,line_num) res2hive(spark,day,hour,dict1,dict2,dict3)
spark.executor.memory + spark.yarn.executor.memoryOverhead <30G 在运行的有这个限制 30G为集群内存限制(公司环境限制)
spark.default.parallelism 并行度问题,如果不设置这个参数,Spark 会跟据 HDFS 中 Block 的个数去设置这一个数量,
原理是默应每个 Block 会对应一个 Task,默应情况下,如果数据量不是太多就不可以充份利用 executor 设置的资源,
就会浪费了资源。建义设置为 100个,最好 700个左右。Spark官方的建义是每一个 Core 负责 2-3 个 Task
4*200 *2(ccores*instances)