热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

pysparkspark客户端设置

1,SparkContext和sparkSession的区别1-1、#构建SparkContext,读取文件frompysparkimport*importosos.environ

1,SparkContext和sparkSession的区别

1-1、#构建SparkContext,读取文件

    from pyspark import *
    import os
    os.environ["YARN_CONF_DIR"] = "/etc/spark2/conf/yarn-conf"

    conf = SparkConf().setAppName("appname").setSparkHome("/opt/cloudera/parcels/SPARK2/lib/spark2").setMaster("yarn")
    sc = SparkContext(cOnf=conf)
    rdd = sc.textFile("hdfs://part*",10)
1-2、#构建SparkSession  执行hive语句(这个也能完成1中的功能,所以能用这个尽量用这个)

    from pyspark.sql import *
    spark = SparkSession.builder.appName("appName").master("yarn").enableHiveSupport().getOrCreate()
    get_duration_sql=\'\'\'
    select
    id,duration
    from
    *_db.da_*
    where
    day=\'20210126\' and duration is not null
\'\'\'
rdd=spark.sql(get_duration_sql).rdd

SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。
 在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。
 SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的
资料链接:https://blog.csdn.net/beautiful_huang/article/details/103820534

2,sparkSession的使用

    if len(sys.argv) <4:
        print(\'input error\')
    directory = sys.argv[1]
    day = sys.argv[2]
    hour = sys.argv[3]
    if hour=="24":
        hour="*"
    # 配置环境
    os.environ["YARN_CONF_DIR"] = "/etc/spark2/conf/yarn-conf"
    # 实例化spark
    spark = SparkSession.builder. \
        appName("spark_slot_eva"). \
        config("spark.sql.shuffle.partitions", 10). \
        config("spark.default.parallelism", 1600). \
        config("hive.warehouse.subdir.inherit.perms", "false"). \
        config("spark.executor.cores", "4"). \  #spark.executor.cores:顾名思义这个参数是用来指定executor的cpu内核个数,分配更多的内核意味着executor并发能力越强,能够同时执行更多的task
        config("spark.executor.instances", "200"). \ #能够启动的executor的个数
        config("spark.executor.memory", "4g"). \  #executor memory是每个节点上占用的内存。每一个节点都可使用内存
        config("spark.port.maxRetries","100").\   #最大重试次数,不设置的话默认4,容易启动spark失败
        config("spark.yarn.executor.memoryOverhead","26g").\
        enableHiveSupport(). \
        getOrCreate()
    sc = spark.sparkContext
    # 读取样本数据集和slot.conf
    dir_list=[directory,day,hour,"part*"]
    final_dir="/".join(dir_list)
    print(final_dir)
    rdd = sc.textFile(final_dir)
    # print(\'------------------rdd----------------\', rdd.first())
    # 2.计算数据的总行数
    line_num = rdd.count()
    print(\'-------------------------------------line_num--------------\', line_num)
     # 计算特征覆盖率
    dict1 = get_slot_cover(rdd,line_num)
    dict2 = get_slot_num(rdd)
    dict3 = get_slot_num_avg(rdd,line_num)
    res2hive(spark,day,hour,dict1,dict2,dict3)
spark.executor.memory  + spark.yarn.executor.memoryOverhead  <30G  在运行的有这个限制 30G为集群内存限制(公司环境限制)

 

   

spark.default.parallelism    并行度问题,如果不设置这个参数,Spark 会跟据 HDFS 中 Block 的个数去设置这一个数量,
原理是默应每个 Block 会对应一个 Task,默应情况下,如果数据量不是太多就不可以充份利用 executor 设置的资源,
就会浪费了资源。建义设置为 100个,最好 700个左右。Spark官方的建义是每一个 Core 负责 2-3 个 Task
4*200 *2(ccores*instances)

 


推荐阅读
  • 探索MLlib机器学习
    公众号后台回复关键词:pyspark,获取本项目github地址。MLlib是Spark的机器学习库,包括以下主要功能。实用工具ÿ ... [详细]
  • 《Spark核心技术与高级应用》——1.2节Spark的重要扩展
    本节书摘来自华章社区《Spark核心技术与高级应用》一书中的第1章,第1.2节Spark的重要扩展,作者于俊向海代其锋马海平,更多章节内容可以访问云栖社区“华章社区”公众号查看1. ... [详细]
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • 本文介绍了Hyperledger Fabric外部链码构建与运行的相关知识,包括在Hyperledger Fabric 2.0版本之前链码构建和运行的困难性,外部构建模式的实现原理以及外部构建和运行API的使用方法。通过本文的介绍,读者可以了解到如何利用外部构建和运行的方式来实现链码的构建和运行,并且不再受限于特定的语言和部署环境。 ... [详细]
  • 开发笔记:Spark Java API 之 CountVectorizer
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了SparkJavaAPI之CountVectorizer相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • Android Studio Bumblebee | 2021.1.1(大黄蜂版本使用介绍)
    本文介绍了Android Studio Bumblebee | 2021.1.1(大黄蜂版本)的使用方法和相关知识,包括Gradle的介绍、设备管理器的配置、无线调试、新版本问题等内容。同时还提供了更新版本的下载地址和启动页面截图。 ... [详细]
  • 本文介绍了使用PHP实现断点续传乱序合并文件的方法和源码。由于网络原因,文件需要分割成多个部分发送,因此无法按顺序接收。文章中提供了merge2.php的源码,通过使用shuffle函数打乱文件读取顺序,实现了乱序合并文件的功能。同时,还介绍了filesize、glob、unlink、fopen等相关函数的使用。阅读本文可以了解如何使用PHP实现断点续传乱序合并文件的具体步骤。 ... [详细]
  • Voicewo在线语音识别转换jQuery插件的特点和示例
    本文介绍了一款名为Voicewo的在线语音识别转换jQuery插件,该插件具有快速、架构、风格、扩展和兼容等特点,适合在互联网应用中使用。同时还提供了一个快速示例供开发人员参考。 ... [详细]
  • 利用Visual Basic开发SAP接口程序初探的方法与原理
    本文介绍了利用Visual Basic开发SAP接口程序的方法与原理,以及SAP R/3系统的特点和二次开发平台ABAP的使用。通过程序接口自动读取SAP R/3的数据表或视图,在外部进行处理和利用水晶报表等工具生成符合中国人习惯的报表样式。具体介绍了RFC调用的原理和模型,并强调本文主要不讨论SAP R/3函数的开发,而是针对使用SAP的公司的非ABAP开发人员提供了初步的接口程序开发指导。 ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • Spark Streaming和Kafka整合之路(最新版本)
    2019独角兽企业重金招聘Python工程师标准最近完成了SparkStreaming和Kafka的整合工作,耗时虽然不长,但是当中还是遇到了不少 ... [详细]
  • packagecom.bjsxt.spark.others;importorg.apache.spark.SparkConf;importorg.apache.spark.api. ... [详细]
  • 开发笔记:大三上寒假15天第5天
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了大三上寒假15天--第5天相关的知识,希望对你有一定的参考价值。昨天的下载完成后运行报错,应该是下载的spark版本和教 ... [详细]
author-avatar
陈佩儒61473
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有