在pom中添加依赖
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
emg
emg.spark
1.0-SNAPSHOT
org.apache.spark
spark-core_2.11
2.1.1
org.apache.spark
spark-sql_2.11
2.1.1
org.apache.spark
spark-hive_2.11
2.1.1
org.scala-lang
scala-library
2.11.8
org.scala-lang
scala-compiler
2.11.8
org.scala-lang
scala-reflect
2.11.8
org.apache.hadoop
hadoop-common
2.7.2
org.apache.hadoop
hadoop-hdfs
2.7.2
log4j
log4j
1.2.17
mysql
mysql-connector-java
5.1.38
org.apache.hive
hive-jdbc
1.1.0
net.alchim31.maven
scala-maven-plugin
3.2.2
org.apache.maven.plugins
maven-compiler-plugin
3.5.1
net.alchim31.maven
scala-maven-plugin
scala-compile-first
process-resources
add-source
compile
scala-test-compile
process-test-resources
testCompile
org.apache.maven.plugins
maven-compiler-plugin
compile
compile
org.apache.maven.plugins
maven-shade-plugin
2.4.3
package
shade
*:*
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
emg.branchs.EmgFilterDemo
方式1.使用sparkSQL直接连接hive
经自己测试，hive的metastore启动不了、只启动hiveServer2时，这种方式一直报错，找不到hive的元数据库
/** Loads a parquet extract and publishes it into one (dt, hour) partition of a
  * Hive table, using SparkSQL connected directly to the Hive metastore.
  *
  * Expected args: inpath (parquet input path), dt (partition date), hour (partition hour).
  */
def main(args: Array[String]): Unit = {
  // Fail fast with a clear message instead of an opaque MatchError on bad input.
  require(args.length == 3, s"usage: <inpath> <dt> <hour>; got ${args.length} args")
  val Array(inpath, dt, hour) = args

  val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    //.setMaster("local[*]")
    .setMaster("spark://192.168.40.52:7077")

  val session = SparkSession.builder()
    .config(conf)
    // Hive metastore port, 9083 by default -- see hive-site.xml.
    .config("hive.metastore.uris", "thrift://192.168.40.51:9083")
    // Hive warehouse directory on HDFS.
    .config("spark.sql.warehouse.dir", "hdfs://192.168.40.51:9000/user/hive/warehouse")
    // Connect to Hive directly through the metastore.
    .enableHiveSupport()
    .getOrCreate()

  try {
    val df1 = session.read.parquet(inpath)
    //df1.write.saveAsTable(s"tmp.tmp_app_log_1")
    df1.createOrReplaceTempView("tmp_app_log_test")

    // Transformation SQL elided in the original; passes the view through as-is.
    val sql1 =
      s"""
         |select *
         |from tmp_app_log_test
       """.stripMargin

    val hive_table = "dwb2.fact_mbk_offline_log_mbk_app_action_event_v2_i_h"
    // Ensure the target partition exists before the INSERT OVERWRITE below.
    session.sql(s"alter table $hive_table add if not exists partition ( dt='$dt',hour='$hour')")

    val tmp_table = s"tmp.app_log_${dt}_${hour}"
    session.sql(s"drop table IF EXISTS $tmp_table")

    // Stage the result in a temp table first, ...
    session.sql(sql1).write.saveAsTable(tmp_table)
    // ... then move it from the temp table into the partitioned table.
    session.sql(
      s"""INSERT OVERWRITE TABLE $hive_table
         |PARTITION( dt='$dt',hour='$hour')
         | select * from $tmp_table """.stripMargin)

    // Clean up the staging table.
    session.sql(s"drop table IF EXISTS $tmp_table")
  } finally {
    // Always release the SparkSession; the original leaked it.
    session.stop()
  }
}
方式2 使用jdbc连接hive
经自己测试，hive的metastore启动不了、只启动hiveServer2时，jdbc连接方式可以正常使用
/** Publishes an HDFS result directory into a Hive table over a HiveServer2
  * JDBC connection. Works even when the Hive metastore service is down,
  * which is why plain JDBC is used instead of SparkSQL's Hive support.
  */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    //.setMaster("local[*]")
    .setMaster("spark://192.168.40.**:7077")
  val session = SparkSession.builder()
    .config(conf)
    .getOrCreate()

  // HiveServer2 JDBC endpoint -- note the "hive2" scheme (Hive >= 1.3 URL form).
  val url = "jdbc:hive2://192.168.40.**:10000/emg"
  val username = "root"
  val password = "123456"
  val driverName = "org.apache.hive.jdbc.HiveDriver"
  try {
    Class.forName(driverName)
  } catch {
    case e: ClassNotFoundException =>
      // Original printed a tuple ("Missing Class", e); log a real message.
      // Continue: DriverManager.getConnection fails below if the driver is truly absent.
      println(s"Missing Class: $e")
  }

  val con: Connection = DriverManager.getConnection(url, username, password)
  val state = con.createStatement()

  // Result directory produced upstream, partitioned month/day/hour.
  val paths = "/user/emg/cxb_out/" + CurrentTime.getMonthDate() + "/" +
    CurrentTime.getYesterday() + "/" + CurrentTime.getHourDate() + "/"

  // Metastore is unreachable, so LOAD the files into the table via JDBC instead.
  val sql2 = "load data inpath '" + paths + "' into table result01"
  try {
    // Execute exactly once. The original ran this statement TWICE: the first
    // LOAD moves the input files, so the second call re-loads nothing or fails.
    state.execute(sql2)
    println("===============================存入hvie成功==========================")
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // Close the Statement as well as the Connection (the original leaked it).
    if (null != state) {
      state.close()
    }
    if (null != con) {
      con.close()
    }
  }

  /* Example: creating the external table over the same JDBC connection.
  val sql =
    """
      |create external table zz_result(id bigint,lat float,lon float,utc bigint,tags int)
      |row format delimited fields terminated by '\t' location '/user/hive/zz'
    """.stripMargin
  state.execute(sql)
  println("建表成功")
  */
  session.close()
}