热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spaksql把hive导入mysql,SparkSql实现Mysql到hive的数据流动

今天去面试了一波,因为调度系统采用了SparkSql实现数据从Mysql到hive,在这一点上面试官很明显很不满我对于Spark的理解,1

今天去面试了一波,因为调度系统采用了SparkSql实现数据从Mysql到hive,在这一点上面试官很明显很不满我对于Spark的理解,19年的第一个面试就这么挂了。 有问题不怕,怕的是知道了问题还得过且过。现在就来梳理下我的项目是怎么使用Spark导数的 第一步:把mysql中的表放入内存

properties.put("user", dbUser);

properties.put("password", dbPassword);

properties.put("driver", dbDriver);

Dataset bizdateDS = sparkSession.read().jdbc(

dbUrl,

dbTableName,

properties

);

其中:org.apache.spark.sql.Dataset(这里面试官问我怎么把mysql的数据转化到Spark,我没答上来)

第二步:创建数据库与表 2.1 创建库

String createDBSQL = "CREATE DATABASE IF NOT EXISTS " + hiveDBName + " LOCATION '" + dbPath + "'";

sparkSession.sql(createDBSQL);

```

2.2创建表

分成两步,第一步读取Mysql元数据字段,第二步把这些字段创建出来

2.2.1 读取mysql字段

StructType structType = bizdateDS.schema();

StructField[] structFields = structType.fields();

/*

structField是StructType中的字段。

param:name此字段的名称。

param:dataType此字段的数据类型。

param:nullable指示此字段的值是否为空值。

param:metadata此字段的元数据。 如果未修改列的内容(例如,在选择中),则应在转换期间保留元数据。

*/

2.2.2 创建字段

String sourceType; //Name of the type used in JSON serialization.

String columnName;

String targetType;

StructField structField;

SparkDataTypeEnum sparkDataType;

StringBuilder createBuilder = new StringBuilder(capacity);

createBuilder.append("CREATE TABLE IF NOT EXISTS ").append(realHiveTableName).append(" (");

List dbTableColumns = Lists.newArrayList();

Map dbTableColumnTypeMap = Maps.newHashMap();

//把Mysql中的每个字段都提取出来

for (int i = 0, len = structFields.length; i

structField = structFields[i];

sourceType = structField.dataType().typeName();

columnName = structField.name();

if (sourceType.contains("(")) { //处理类似varchar(20)

sourceType = sourceType.substring(0, sourceType.indexOf("("));

}

sparkDataType = SparkDataTypeEnum.getItemByType(sourceType);

if (null != sparkDataType) {

targetType = sparkDataType.getHiveDataType().getType();

//时间戳字段强转成string字段

if(targetType.equals("timestamps")) targetType.equals("string");

} else {

targetType = HiveDataTypeEnum.STRING.getType();

}

dbTableColumns.add(columnName);

dbTableColumnTypeMap.put(columnName, targetType);

if (i != 0) {

createBuilder.append(",");

}

createBuilder.append(columnName).append(" ").append(targetType);

}

createBuilder.append(") PARTITIONED by (").append(partitionColumn)

.append(" STRING) ");

sparkSession.sql(createTableSQL);

2.3 对比字段

我们在2.2中,如果hive有字段了,那么就不会创建表。

问题在于,如果hive中的字段比mysql中的少怎么办?

2.3.1 获取hive中的表字段

HiveUtil connectionToHive = new HiveUtil("org.apache.hive.jdbc.HiveDriver", hiveUrl, hiveUser, hivePassword);

public List getTableColumns(String dbName,String tableName) throws SQLException {

ResultSet rs = null;

try {

if (!this.validateTableExist(tableName)) {

return null;

}

DatabaseMetaData metaData = connection.getMetaData();

rs = metaData.getColumns(null, dbName, tableName.toUpperCase(), "%");

List columns = new ArrayList();

while (rs.next()) {

columns.add(rs.getString("COLUMN_NAME").toLowerCase());

}

return columns;

} catch (SQLException e) {

throw e;

} finally {

if (null != rs) {

rs.close();

}

}

}

2.3.2 对比字段并且添加:

for (String dbTableColumn : dbTableColumns) {

if (StringUtil.hasCapital(dbTableColumn)) {

DingDingAlert.sendMsg(dbTableName + "的" + dbTableColumn + "是大写字段,替换成小写");

logger.warn(dbTableName + "的" + dbTableColumn + "是大写的,把他替换成小写");

sb.append("\n " + GetTime.getTimeStamp("yyyy-MM-dd HH:mm:ss") + "| WARN |" + "表" + hiveTableName + "在hive中不存在,程序关闭");

dbTableColumn = StringUtil.convertStringToLowerCase(dbTableColumn, false);

}

if (!hiveTableColumns.contains(dbTableColumn)) {

alterColumns.add(dbTableColumn);

}

}

2.4 将内存中的表存入hive

bizdateDS.createOrReplaceTempView(tmpTableName); //注意这里不是直接从mysql抽到hive,而是先从Mysql抽到内存中

insert hive_table select hive中的已经有的表的字段 from tmpTableName

##很明显的,如果不是需要和hive已经有的表交互根本用不到jdbc



推荐阅读
  • GreenDAO快速入门
    前言之前在自己做项目的时候,用到了GreenDAO数据库,其实对于数据库辅助工具库从OrmLite,到litePal再到GreenDAO,总是在不停的切换,但是没有真正去了解他们的 ... [详细]
  • 本文整理了Java中org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc.getTypeInfo()方法的一些代码示例,展 ... [详细]
  • 本文整理了Java中org.apache.pig.backend.executionengine.ExecException.<init>()方法的一些代码 ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • 开发笔记:(002)spring容器中bean初始化销毁时执行的方法及其3种实现方式
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了(002)spring容器中bean初始化销毁时执行的方法及其3种实现方式相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 本文详细介绍了GetModuleFileName函数的用法,该函数可以用于获取当前模块所在的路径,方便进行文件操作和读取配置信息。文章通过示例代码和详细的解释,帮助读者理解和使用该函数。同时,还提供了相关的API函数声明和说明。 ... [详细]
  • 本文讨论了Alink回归预测的不完善问题,指出目前主要针对Python做案例,对其他语言支持不足。同时介绍了pom.xml文件的基本结构和使用方法,以及Maven的相关知识。最后,对Alink回归预测的未来发展提出了期待。 ... [详细]
  • 本文讨论了如何使用Web.Config进行自定义配置节的配置转换。作者提到,他将msbuild设置为详细模式,但转换却忽略了带有替换转换的自定义部分的存在。 ... [详细]
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
  • 目录浏览漏洞与目录遍历漏洞的危害及修复方法
    本文讨论了目录浏览漏洞与目录遍历漏洞的危害,包括网站结构暴露、隐秘文件访问等。同时介绍了检测方法,如使用漏洞扫描器和搜索关键词。最后提供了针对常见中间件的修复方式,包括关闭目录浏览功能。对于保护网站安全具有一定的参考价值。 ... [详细]
  • 本文介绍了在Ubuntu系统中清理残余配置文件和无用内容的方法,包括清理残余配置文件、清理下载缓存包、清理不再需要的包、清理无用的语言文件和清理无用的翻译内容。通过这些清理操作可以节省硬盘空间,提高系统的运行效率。 ... [详细]
  • 项目运行环境配置及可行性分析
    本文介绍了项目运行环境配置的要求,包括Jdk1.8、Tomcat7.0、Mysql、HBuilderX等工具的使用。同时对项目的技术可行性、操作可行性、经济可行性、时间可行性和法律可行性进行了分析。通过对数据库的设计和功能模块的设计,确保系统的完整性和安全性。在系统登录、系统功能模块、管理员功能模块等方面进行了详细的介绍和展示。最后提供了JAVA毕设帮助、指导、源码分享和调试部署的服务。 ... [详细]
  • 感谢大家对IT十八掌大数据的支持,今天的作业如下:1.实践PreparedStament的CRUD操作。2.对比Statement和PreparedStatement的大批量操作耗时?(1 ... [详细]
  • MySQL锁--(深入浅出读书笔记)
    MySQL锁的概述1.针对不同的引擎,采用不同的锁机制;(表锁,页面锁,行锁)myisam和memory存储引擎:表级锁;BOB存储引擎:页面锁,表级 ... [详细]
  • 我有一个带有H2数据库的springboot应用程序。该应用程序会在启动时引导数据库,为此,我在 ... [详细]
author-avatar
互联网控军
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有