热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

借鉴sqoop实现hdfs文件内容导入mysql

这次需要将hadoopmr的计算结果导入到mysql中,虽然是mr的结果导入db中,为了保险起见,还是存在hdfs上,之后读取hdfs上的结果导入db中,读取失败可重新执行单个读取导入过程。一般先动

这次需要将hadoop mr的计算结果导入到mysql中,虽然是mr的结果导入db中,为了保险起见,还是存在hdfs上,之后读取hdfs上的结果导入db中,读取失败可重新执行单个读取导入过程。


一般先动手前,有个思路,再百度看看是否有更好的实现,大略搜了一下,发现sqoop貌似实现了hdfs和各种dc之间的读取写入。这里,因为业务简单,都是insert语句不涉及事务,只是连接一个db,不涉及mr等操作,so我只是借鉴sqoop的思想,没有使用sqoop。


一般实现的思路就是,读取hdfs文件,生成对应的insert语句,导入mysql就好了。

其中需要详细考虑的几个问题如下:

1、批量导入insert,一般的数据量设置多大好些?

2、执行一般失败后重新导入数据,对于已经导入的数据如何处理?



这里的话,当然批量导入会好些,但是也要考虑hdfs的reader是一个个读取数据,如果批量导入的size太大,需要存储数据的变量占用的内存大,会导致oom。

一般批量insert的量不超过1000条就好了,我这边的话,每天的量也就2000条,so我设置成500条了。


执行一般失败后重新导入数据,感觉要先删除已有导入的数据,再次导入。因为我这边每天导入一次,根据日期做delete就行了。对于不同场景,需要自己操作了。

如果是mysql数据库的话,不用自己删除了。

因为有个语句   INSERT INTO .. ON DUPLICATE KEY UPDATE  ,如果执行时不存在重复记录,则执行新增操作,否则执行更新操作。详细的内容可以读http://www.jb51.net/article/39255.htm

如果db是mysql的话,就不用自己先删除数据了。



另 INSERT INTO TABLE (a,c) VALUES (1,3),(1,7) ON DUPLICATE KEY UPDATE c=VALUES(c);

上语句比单独执行2条语句快。



本人才疏学浅,有错误之处请不吝指教。

关于insert的数量,很有必要深入研究下。





至此说完。。。



------------------------------

贴些代码(部分是sqoop中的源码改编)



    /**
* 如果记录重复,则执行更新操作,否则执行新增数据操作
* @param numRows
* @return
*/
protected String getUpdateStatement(int numRows) {
boolean first;
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO ");
sb.append(tableName);
sb.append("(");
first = true;
for (String column : columnNames) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append(column);
}

sb.append(") VALUES(");
for (int i = 0; i if (i > 0) {
sb.append("),(");
}
for (int j = 0; j if (j > 0) {
sb.append(", ");
}
sb.append("?");
}
}

sb.append(") ON DUPLICATE KEY UPDATE ");

first = true;
for (String column : columnNames) {
if (first) {
first = false;
} else {
sb.append(", ");
}

sb.append(column).append("=VALUES(").append(column).append(")");
}

String query = sb.toString();
LOG.debug("Using upsert query: " + query);
return query;
}


    protected PreparedStatement getPreparedStatement(List userRecords) throws SQLException {

PreparedStatement stmt = null;

// Synchronize on connection to ensure this does not conflict
// with the operations in the update thread.
Connection cOnn= ConnectionRelate.getConn();
stmt = conn.prepareStatement(getUpdateStatement(userRecords.size()));
// Inject the record parameters into the UPDATE and WHERE clauses. This
// assumes that the update key column is the last column serialized in
// by the underlying record. Our code auto-gen process for exports was
// responsible for taking care of this constraint.
int i = 0;
for (Writable record : userRecords) {
setParam(stmt, i, record);
i += columnNames.length;
}
stmt.addBatch();

return stmt;
}



 try {
reader = new SequenceFile.Reader(FileSystem.get(new Configuration()), path, new Configuration());
StringTriWritable key = new StringTriWritable();
Writable value = (MediaScoreVector) reader.getValueClass().newInstance();
while (reader.next(key, value)) {
list.add(value);
index++;
if(index == batchSize){
index = 0;
//exp the data to the mysql
stmt = mediaSQL.getPreparedStatement(list);
stmt.executeBatch();

list.clear();
}
}

//deal with size less batchsize
if(list.size()>0) {
stmt = mediaSQL.getPreparedStatement(list);
stmt.executeBatch();

list.clear();
list = null;
}


} catch (Exception e) {
。。。。
} finally {
if(stmt!=null){

。。。。
}
。。。。
}



至此说完。。。


本人才疏学浅,有错误之处请不吝指教。

关于insert的数量,很有必要深入研究下。


推荐阅读
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • 本文介绍了将mysql从5.6.15升级到5.7.15的详细步骤,包括关闭访问、备份旧库、备份权限、配置文件备份、关闭旧数据库、安装二进制、替换配置文件以及启动新数据库等操作。 ... [详细]
  • 本文介绍了在CentOS上安装Python2.7.2的详细步骤,包括下载、解压、编译和安装等操作。同时提供了一些注意事项,以及测试安装是否成功的方法。 ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • 本文介绍了使用postman进行接口测试的方法,以测试用户管理模块为例。首先需要下载并安装postman,然后创建基本的请求并填写用户名密码进行登录测试。接下来可以进行用户查询和新增的测试。在新增时,可以进行异常测试,包括用户名超长和输入特殊字符的情况。通过测试发现后台没有对参数长度和特殊字符进行检查和过滤。 ... [详细]
  • 使用Ubuntu中的Python获取浏览器历史记录原文: ... [详细]
  • ALTERTABLE通过更改、添加、除去列和约束,或者通过启用或禁用约束和触发器来更改表的定义。语法ALTERTABLEtable{[ALTERCOLUMNcolu ... [详细]
  • 在使用dedecms过程中,添加自定义字段变量很有用,但删除并不容易。本文介绍了两种常用的删除方法:执行SQL语句和手动SQL删除。 ... [详细]
  • web.py开发web 第八章 Formalchemy 服务端验证方法
    本文介绍了在web.py开发中使用Formalchemy进行服务端表单数据验证的方法。以User表单为例,详细说明了对各字段的验证要求,包括必填、长度限制、唯一性等。同时介绍了如何自定义验证方法来实现验证唯一性和两个密码是否相等的功能。该文提供了相关代码示例。 ... [详细]
  • 树莓派语音控制的配置方法和步骤
    本文介绍了在树莓派上实现语音控制的配置方法和步骤。首先感谢博主Eoman的帮助,文章参考了他的内容。树莓派的配置需要通过sudo raspi-config进行,然后使用Eoman的控制方法,即安装wiringPi库并编写控制引脚的脚本。具体的安装步骤和脚本编写方法在文章中详细介绍。 ... [详细]
  • 本文记录了在vue cli 3.x中移除console的一些采坑经验,通过使用uglifyjs-webpack-plugin插件,在vue.config.js中进行相关配置,包括设置minimizer、UglifyJsPlugin和compress等参数,最终成功移除了console。同时,还包括了一些可能出现的报错情况和解决方法。 ... [详细]
  • GreenDAO快速入门
    前言之前在自己做项目的时候,用到了GreenDAO数据库,其实对于数据库辅助工具库从OrmLite,到litePal再到GreenDAO,总是在不停的切换,但是没有真正去了解他们的 ... [详细]
  • 分享css中提升优先级属性!important的用法总结
    web前端|css教程css!importantweb前端-css教程本文分享css中提升优先级属性!important的用法总结微信门店展示源码,vscode如何管理站点,ubu ... [详细]
author-avatar
mmm3310245179
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有