
SparkSQL on Hive: two ways to connect Spark to Hive


Add the following dependencies to pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>emg</groupId>
    <artifactId>emg.spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>emg.branchs.EmgFilterDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
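Because the shade plugin is bound to the package phase and the manifest main class is set to emg.branchs.EmgFilterDemo, running mvn clean package produces a single self-contained jar that can then be handed to spark-submit.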

Method 1: use SparkSQL to connect to Hive directly

In my own testing, the Hive metastore would not start and only HiveServer2 was running; this approach kept failing with errors that Hive's metastore database could not be found.
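That failure mode makes sense: this approach goes through the metastore service (normally started with hive --service metastore, listening on port 9083 by default), so running HiveServer2 alone is not enough; without the metastore service, Spark cannot look up table metadata.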

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val Array(inpath, dt, hour) = args

  val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    //.setMaster("local[*]")
    .setMaster("spark://192.168.40.52:7077")

  val session = SparkSession.builder()
    .config(conf)
    // Hive metastore URI; the default port is 9083, see hive-site.xml
    .config("hive.metastore.uris", "thrift://192.168.40.51:9083")
    // Hive warehouse directory
    .config("spark.sql.warehouse.dir", "hdfs://192.168.40.51:9000/user/hive/warehouse")
    // connect to Hive directly
    .enableHiveSupport()
    .getOrCreate()

  import session.implicits._

  val df1 = session.read.parquet(inpath)
  //df1.write.saveAsTable(s"tmp.tmp_app_log_1")
  df1.createOrReplaceTempView("tmp_app_log_test")

  // the real query is omitted here
  val sql1 =
    s"""
       |select *
       |from tmp_app_log_test
     """.stripMargin

  val hive_table = "dwb2.fact_mbk_offline_log_mbk_app_action_event_v2_i_h"

  // make sure the target partition exists
  val sql2 = s"alter table $hive_table add if not exists partition (dt='$dt', hour='$hour')"
  session.sql(sql2)

  // drop any leftover temporary table from a previous run
  val tmp_table = s"tmp.app_log_${dt}_${hour}"
  val sql3 = s"drop table if exists $tmp_table"
  session.sql(sql3)

  val df2 = session.sql(sql1)

  // write the result to a temporary table first
  df2.write.saveAsTable(tmp_table)

  // then copy it from the temporary table into the partitioned table
  val sql4 =
    s"""INSERT OVERWRITE TABLE $hive_table
       |PARTITION (dt='$dt', hour='$hour')
       |select * from $tmp_table""".stripMargin
  session.sql(sql4)

  // clean up the temporary table
  val sql5 = s"drop table if exists $tmp_table"
  session.sql(sql5)
}
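After the job finishes, a quick sanity check is to list the table's partitions from the same session and look for the new dt/hour entry; a minimal sketch, not part of the original job:

// hypothetical check: the freshly written dt/hour partition should show up in this list
session.sql(s"show partitions $hive_table").show(false)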

Method 2: use JDBC to connect to Hive

In my own testing, with the Hive metastore down and only HiveServer2 running, the JDBC connection worked fine.

import java.sql.{Connection, DriverManager}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    //.setMaster("local[*]")
    .setMaster("spark://192.168.40.**:7077")

  val session = SparkSession.builder()
    .config(conf)
    .getOrCreate()

  // Note: the URL format has reportedly changed in newer Hive versions; check the docs for the version you run
  val url = "jdbc:hive2://192.168.40.**:10000/emg"
  val username = "root"
  val password = "123456"
  val driverName = "org.apache.hive.jdbc.HiveDriver"

  try {
    Class.forName(driverName)
  } catch {
    case e: ClassNotFoundException =>
      println(s"missing driver class: $e")
  }

  val con: Connection = DriverManager.getConnection(url, username, password)
  val state = con.createStatement()

  import session.implicits._

  // CurrentTime is a project helper that builds the date/hour path segments
  val paths = "/user/emg/cxb_out/" + CurrentTime.getMonthDate() + "/" + CurrentTime.getYesterday() + "/" + CurrentTime.getHourDate() + "/"

  // since the metastore cannot be reached, load the result files into the Hive table over JDBC instead
  val sql2 = "load data inpath '" + paths + "' into table result01"
  try {
    state.execute(sql2)
    println("=============================== written to Hive successfully ==========================")
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    if (null != con) {
      con.close()
    }
  }

  /*
  val sql =
    """
      |create external table zz_result(id bigint, lat float, lon float, utc bigint, tags int)
      |row format delimited fields terminated by '\t' location '/user/hive/zz'
    """.stripMargin
  state.execute(sql)  // DDL goes through execute, not executeQuery
  println("table created")
  */

  session.close()
}
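Before the connection is closed, the same Statement can also run an ordinary query to confirm the load; a minimal sketch, where the count query and the result01 table come from the example above:

// hypothetical check: count the rows that the load statement brought in
val rs = state.executeQuery("select count(*) from result01")
if (rs.next()) println("rows in result01: " + rs.getLong(1))
rs.close()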


