热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

hive的实践部分

一.hive的事务(1)什么是事务要知道hive的事务,首先要知道什么是transaction(事务)?事务就是一组单元化操作,这些操作要么都执行,要么都不执行,是一个不可分割的工
 

一.hive的事务

(1)什么是事务

要知道hive的事务,首先要知道什么是transaction(事务)?事务就是一组单元化操作,这些操作要么都执行,要么都不执行,是一个不可分割的工作单位。

事务有四大特性:A、C、I、D (原子性、一致性、隔离性、持久性)

 Atomicity: 不可再分割的工作单位,事务中的所有操作要么都发,要么都不发。

Consistency: 事务开始之前和事务结束以后,数据库的完整性约束没有被破坏。这是说数据库事务不能破坏关系数据的完整性以及业务逻辑上的 一致性。 

Isolation: 多个事务并发访问,事务之间是隔离的

Durability: 意味着在事务完成以后,该事务锁对数据库所作的更改便持久的保存在数据库之中,并不会被回滚。 

(2)hive事务的特点与局限性

从hive的0.14版本开始支持低等级的事务

支持事务的增删改查,从hive的2.2版本开始,开始支持merge

不支持事务的begin、commit以及rollback(事务的回滚)

不支持使用update更新分桶列和分区列

想使用事务的话,文件格式必须是ORC

需要压缩工作,需要时间,资源和空间

支持S(共享锁)和X(排它锁)

不允许从一个非ACID连接写入/读取ACID表

(3)hive的事务开启

hive的事务开启有三种方式:a.通过Ambari UI-Hive Config 

               b.通过hive-xml 的配置文件添加如下内容

  
hive.support.concurrency  
true  
  
  
hive.txn.manager 
org.apache.hadoop.hive.ql.lockmgr.DbTxnManager 

  

                                     c.通过命令行,在beeline这种交互式环境下:

set hive.support.cOncurrency= true; 
set hive.enforce.bucketing = true; 
set hive.exec.dynamic.partition.mode = nonstrict; 
set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; 
set hive.compactor.initiator.on = true; 
set hive.compactor.worker.threads = 1;

(4)hive的merge

merge的语法:

MERGE INTO AS T USING AS S ON

WHEN MATCHED [AND ] THEN UPDATE SET

WHEN MATCHED [AND ] THEN DELETE

WHEN NOT MATCHED [AND ] THEN INSERT VALUES

merge的局限性:

最多三条when语句,只支持update/delete/insert。when not matched 必须在when语句的最后面。

如果出现update和delete的时候 ,两个条件是分开的,而且必须在条件前面加上AND.像 [AND ]

(5)例子

a.创建两个事务表

CREATE TABLE IF NOT EXISTS employee ( 
emp_id int,  
emp_name string,  
dept_name string, 
work_loc string )  
PARTITIONED BY (start_date string) 
CLUSTERED BY (emp_id) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true'); 


create table employee_state(
emp_id int,
emp_name string,
dept_name string,
work_loc string,
start_date string,
state string
)
STORED AS ORC;

b.开启事务(见上面的开启事务的c,一般有些默认的设置是开的,我这里就只开了自动分区和分桶)

c.插入数据

 

INSERT INTO table employee PARTITION (start_date) VALUES (1,'Will','IT','Toronto','20100701'), 
(2,'Wyne','IT','Toronto','20100701'), 
(3,'Judy','HR','Beijing','20100701'), 
(4,'Lili','HR','Beijing','20101201'), 
(5,'Mike','Sales','Beijing','20101201'), (6,'Bang','Sales','Toronto','20101201'), (7,'Wendy','Finance','Beijing','20101201');

insert into table employee_state values
(2,’Wyne’,’IT’,’Beijing’,’20100701’,’update’),
(4,’Lili’,’HR’,’Beijing’,’20101201’,’quit’),
(8,’James’,’IT’,’Toronto’,’20170101’,’new’)

d.检验数据是否被插入

 

e.这里通过merge操作,完成更新、删除、插入操作。

employe字段解释:id为2的员工之前的工作地在Toronto,现在在Beijing,state的状态为update。所以需要更新表employee中员工2的信息

id为4的员工的state状态为quit,说明目前员工已经离职,所以需要在employee表中删除关于id为4的员工的信息。

id为8的员工的state状态为new,说明是新员工,所以需要插入empoyee中。

MERGE INTO employee AS T
USING employee_state AS S
ON T.emp_id = S.emp_id and T.start_date = S.start_date
WHEN MATCHED AND S.state = 'update' THEN UPDATE SET dept_name = S.dept_name,work_loc = S.work_loc
WHEN MATCHED AND S.state = 'quit' THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES(S.emp_id,S.emp_name,S.dept_name,S.work_loc,S.start_date);
--这里目标表为employee,源表为employee_state
--这里新员工是属于第三中情况,未在目标表中匹配到,所以直接插入到目标表中。

  

二.hive的udf

(1)什么是hive的udf?

User-defined function (UDF): 这提供了一种使用外部函数(在Java中)扩展功能的方法,可以在HQL中进行评估

(2)hive的udf分类

hive的udf一般分为三种:

  a.UDF:用户定义的简单函数,按行操作并为一行输出一个结果,例如大多数内置数学和字符串函数

  b.UDAF: 用户定义的聚合函数,按行或按组操作,并为每个组输出一行或一行,例如MAX和COUNT内置函数。

  c.UDTF:用户定义的表生成函数也按行运行,但结果会生成多行/表,例如EXPLODE函数。 UDTF可以在SELECT之后或在LATERAL VIEW语句之后使用。

 (3)hive的udf使用举例

  a.对于hive的udf,这里我写了一个把字符串的大写全部换成小写和一个判断字符串是否在一个array数组里面的函数

--将字符串的所有大写改成小写
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
 
public final class StringLower extends UDF {
  public Text evaluate(final Text s) {
    if (s == null) { return null; }
    return new Text(s.toString().toLowerCase());
  }
}
--判断当前字符串是否在数组里面
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.BooleanWritable;

@Description(name = "Arraycontains",
        value="_FUNC_(array, value) - Returns TRUE if the array contains value.",
        extended="Example:\n"
                + "  > SELECT _FUNC_(array(1, 2, 3), 2) FROM src LIMIT 1;\n"
                + "  true")


public class ArrayContains extends GenericUDF {
    private static final int ARRAY_IDX = 0;
    private static final int VALUE_IDX = 1;
    private static final int ARG_COUNT = 2;//这个udf函数需要参数的个数
    private static final String FUNC_NAME = "ARRAYCONTAINS";//外部名字

    private transient  ObjectInspector valueOI;
    private transient ListObjectInspector arrayOI;
    private transient ObjectInspector arrayElementOI;
    private BooleanWritable result;
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        //检查是否传入了两个参数
        if (arguments.length != ARG_COUNT) {
            throw new UDFArgumentException("the function" + FUNC_NAME + "accepts"
                    + ARG_COUNT + "arguments");
        }
        //检查参数是否是属于LIST类型
        if (!arguments[ARRAY_IDX].getCategory().equals(ObjectInspector.Category.LIST)) {
            throw new UDFArgumentTypeException(ARRAY_IDX, "\"" + serdeConstants.LIST_TYPE_NAME + "\""
                    + "expected at function ARRAY_CONTAINS,but"
                    + "\"" + arguments[ARRAY_IDX].getTypeName() + "\""
                    + "is found");

        }

        arrayOI = (ListObjectInspector) arguments[ARRAY_IDX];
        arrayElementOI = arrayOI.getListElementObjectInspector();

        valueOI = arguments[VALUE_IDX];

        //检查list的元素和传入的值是否属于同一个类型
        if (!ObjectInspectorUtils.compareTypes(arrayElementOI, valueOI)) {
            throw new UDFArgumentTypeException(VALUE_IDX, "\"" + arrayElementOI.getTypeName() + "\""
                    + "expectd at function ARRAY_CONTAINS,but"
                    + "\"" + valueOI.getTypeName() + "\""
                    + "is found");
        }

        //检查此类型是否支持比较
        if (!ObjectInspectorUtils.compareSupported(valueOI)){
            throw new UDFArgumentException("this function" + FUNC_NAME
                    +"does not support comparison for"
                    +"\"" + valueOI.getTypeName() + "\""
                    + "types");

        }
        result = new BooleanWritable(false);
        return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector;
    }
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
       result.set(false);
       Object array = arguments[ARRAY_IDX].get();
       Object value = arguments[VALUE_IDX].get();

       int arrayLength = arrayOI.getListLength(array);

       //检查数组是否null还是空value是否为null
        if (value == null || arrayLength <= 0)//判断value是否为空,若真则不判断右边,不然为假继续判断右边
        {
            return result;//满足条件直接返回result初始状态值
        }

        //将值与数组的每个元素进行比较,直到找到匹配项
        for (int i=0;i 

  b.然后通过编译器打包到hdfs文件系统上,通过执行hive命令构造函数

DROP FUNCTION IF EXISTS str_lower; 
DROP FUNCTION IF EXISTS Array_contains; 
CREATE FUNCTION str_lower AS 'com.data.hiveudf.udf.StringLower'  
USING JAR 'hdfs:////apps/hive/functions/df-hiveudf-1.0-SNAPSHOT.jar'; 
CREATE FUNCTION Array_contains AS 'com.data.hiveudf.gudf.ArrayContains'  
USING JAR 'hdfs:////apps/hive/functions/df-hiveudf-1.0-SNAPSHOT.jar';

  c.使用自定义函数

这里使用了另一库里的一张employee表,里面使用了string类型、array类型...。表描述与内容如下:

然后使用str_lower的函数:

使用Array_contains函数

 


推荐阅读
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • NotSupportedException无法将类型“System.DateTime”强制转换为类型“System.Object”
    本文介绍了在使用LINQ to Entities时出现的NotSupportedException异常,该异常是由于无法将类型“System.DateTime”强制转换为类型“System.Object”所导致的。同时还介绍了相关的错误信息和解决方法。 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • 基于PgpoolII的PostgreSQL集群安装与配置教程
    本文介绍了基于PgpoolII的PostgreSQL集群的安装与配置教程。Pgpool-II是一个位于PostgreSQL服务器和PostgreSQL数据库客户端之间的中间件,提供了连接池、复制、负载均衡、缓存、看门狗、限制链接等功能,可以用于搭建高可用的PostgreSQL集群。文章详细介绍了通过yum安装Pgpool-II的步骤,并提供了相关的官方参考地址。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 关于我们EMQ是一家全球领先的开源物联网基础设施软件供应商,服务新产业周期的IoT&5G、边缘计算与云计算市场,交付全球领先的开源物联网消息服务器和流处理数据 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 本文讨论了在openwrt-17.01版本中,mt7628设备上初始化启动时eth0的mac地址总是随机生成的问题。每次随机生成的eth0的mac地址都会写到/sys/class/net/eth0/address目录下,而openwrt-17.01原版的SDK会根据随机生成的eth0的mac地址再生成eth0.1、eth0.2等,生成后的mac地址会保存在/etc/config/network下。 ... [详细]
  • r2dbc配置多数据源
    R2dbc配置多数据源问题根据官网配置r2dbc连接mysql多数据源所遇到的问题pom配置可以参考官网,不过我这样配置会报错我并没有这样配置将以下内容添加到pom.xml文件d ... [详细]
  • web.py开发web 第八章 Formalchemy 服务端验证方法
    本文介绍了在web.py开发中使用Formalchemy进行服务端表单数据验证的方法。以User表单为例,详细说明了对各字段的验证要求,包括必填、长度限制、唯一性等。同时介绍了如何自定义验证方法来实现验证唯一性和两个密码是否相等的功能。该文提供了相关代码示例。 ... [详细]
  • 本文介绍了在使用FIS配置过程中遇到的问题以及解决方法。作者发现在配置roadmap时使用命令行参数出现了诡异现象,uglify了js文件后,html中对js的引用没有被修改。经过多次尝试和验证,联系了FIS开发人员后才得知,使用fis.config.merge会导致一些问题。通过将fis.config.merge改为fis.config.get('roadmap.path').unshift()来添加配置,问题得以解决。文章指出FIS官方文档解释不够详细,提供了解决问题的方法。 ... [详细]
  • 本文介绍了在Python张量流中使用make_merged_spec()方法合并设备规格对象的方法和语法,以及参数和返回值的说明,并提供了一个示例代码。 ... [详细]
  • 本文介绍了C#中数据集DataSet对象的使用及相关方法详解,包括DataSet对象的概述、与数据关系对象的互联、Rows集合和Columns集合的组成,以及DataSet对象常用的方法之一——Merge方法的使用。通过本文的阅读,读者可以了解到DataSet对象在C#中的重要性和使用方法。 ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • 本文介绍了多因子选股模型在实际中的构建步骤,包括风险源分析、因子筛选和体系构建,并进行了模拟实证回测。在风险源分析中,从宏观、行业、公司和特殊因素四个角度分析了影响资产价格的因素。具体包括宏观经济运行和宏经济政策对证券市场的影响,以及行业类型、行业生命周期和行业政策对股票价格的影响。 ... [详细]
author-avatar
天崖人B
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有