自定义函数

为了满足客户个性化的需求,Hive 被设计成一个很开放的系统,很多内容都支持用户定制,包括:

  • 文件格式:Text File,Sequence File
  • 内存中的数据格式: Java Integer/String, Hadoop IntWritable/Text
  • 用户提供的 map/reduce 脚本:不管什么语言,利用 stdin/stdout 传输数据
  • 用户自定义函数

User Defined Function

用户可以使用‘show functions’ 查看 function list,可以使用’describe function function-name’查看函数说明。

hive> show functions;
OK
!
!=
......
Time taken: 0.275 seconds
hive> desc function substr;
OK
substr(str, pos[, len]) - returns the substring of str that starts at pos and is of length len orsubstr(bin, pos[, len]) - returns the slice of byte array that starts at pos and is of length len
Time taken: 0.095 seconds

hive 提供的 build-in 函数包括以下几类:

  1. 关系操作符:包括 =、<>、<=、>=等
  2. 算数操作符:包括 +、-、*、/等
  3. 逻辑操作符:包括 AND、&&、OR、|| 等
  4. 复杂类型构造函数:包括 map、struct、create_union 等
  5. 复杂类型操作符:包括 A[n]、Map[key]、S.x
  6. 数学操作符:包括 ln(double a)、sqrt(double a)等
  7. 集合操作符:包括 size(Array)、sort_array(Array)等
  8. 类型转换函数: binary(string|binary)、cast(expr as )
  9. 日期函数:包括 from_unixtime(bigint unixtime[, string format])、unix_timestamp()等 10.条件函数:包括 if(boolean testCondition, T valueTrue, T valueFalseOrNull)等
  10. 字符串函数:包括 acat(string|binary A, string|binary B…)等
  11. 其他:xpath、get_json_objectscii(string str)、con

Definition

编写 Hive UDF 有两种方式:

  1. extends UDF,重写 evaluate 方法
  2. extends GenericUDF,重写 initialize、getDisplayString、evaluate 方法

Extends UDF

如下的用法是将大写转化为小写

    package test.udf;

    import org.apache.hadoop.hive.ql.exec.UDF;
    import org.apache.hadoop.io.Text;

    public class ToLowerCase extends UDF {
        public Text evaluate(final Text s) {
            if (s == null) { return null; }
            return new Text(s.toString().toLowerCase());
        }
    }

Extends GenericUDF

计算 array 中去重后元素个数

    package test.udf;
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.io.IntWritable;

    /**
     * UDF:
     * Get nubmer of objects with duplicate elements eliminated

     */
    @Description(name = "array_uniq_element_number", value = "_FUNC_(array) - Returns nubmer of objects with duplicate elements eliminated.", extended = "Example:\n"
                    + "  > SELECT _FUNC_(array(1, 2, 2, 3, 3)) FROM src LIMIT 1;\n" + "  3")
    public class UDFArrayUniqElementNumber extends GenericUDF {

            private static final int ARRAY_IDX = 0;
            private static final int ARG_COUNT = 1; // Number of arguments to this UDF
            private static final String FUNC_NAME = "ARRAY_UNIQ_ELEMENT_NUMBER"; // External Name

            private ListObjectInspector arrayOI;
            private ObjectInspector arrayElementOI;
            private final IntWritable result = new IntWritable(-1);

            public ObjectInspector initialize(ObjectInspector[] arguments)
                            throws UDFArgumentException {

                    // Check if two arguments were passed
                    if (arguments.length != ARG_COUNT) {
                            throw new UDFArgumentException("The function " + FUNC_NAME
                                            + " accepts " + ARG_COUNT + " arguments.");
                    }

                    // Check if ARRAY_IDX argument is of category LIST
                    if (!arguments[ARRAY_IDX].getCategory().equals(Category.LIST)) {
                            throw new UDFArgumentTypeException(ARRAY_IDX, "\""
                                            + org.apache.hadoop.hive.serde.Constants.LIST_TYPE_NAME
                                            + "\" " + "expected at function ARRAY_CONTAINS, but "
                                            + "\"" + arguments[ARRAY_IDX].getTypeName() + "\" "
                                            + "is found");
                    }

                    arrayOI = (ListObjectInspector) arguments[ARRAY_IDX];
                    arrayElementOI = arrayOI.getListElementObjectInspector();

                    return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
            }

            public IntWritable evaluate(DeferredObject[] arguments)
                            throws HiveException {

                    result.set(0);

                    Object array = arguments[ARRAY_IDX].get();
                    int arrayLength = arrayOI.getListLength(array);
                    if (arrayLength <= 1) {
                            result.set(arrayLength);
                            return result;
                    }

                    //element compare; Algorithm complexity: O(N^2)
                    int num = 1;
                    int i, j;
                    for(i = 1; i < arrayLength; i++)
                    {
                            Object listElement = arrayOI.getListElement(array, i);
                            for(j = i - 1; j >= 0; j--)
                            {
                                    if (listElement != null) {
                                            Object tmp = arrayOI.getListElement(array, j);
                                            if (ObjectInspectorUtils.compare(tmp, arrayElementOI, listElement,
                                                            arrayElementOI) == 0) {
                                                    break;
                                            }
                                    }
                            }
                            if(-1 == j)
                            {
                                    num++;
                            }
                    }

                    result.set(num);
                    return result;
            }

            public String getDisplayString(String[] children) {
                    assert (children.length == ARG_COUNT);
                    return "array_uniq_element_number(" + children[ARRAY_IDX]+ ")";
            }
    }

Usage

临时添加 UDF

hive> select * from test;
OK
Hello
wORLD
ZXM
ljz
Time taken: 13.76 seconds
hive> add jar /home/work/udf.jar;
Added /home/work/udf.jar to class path
Added resource: /home/work/udf.jar
hive> create temporary function mytest as 'test.udf.ToLowerCase';
OK
Time taken: 0.103 seconds
hive> show functions;
......
mytest
......
hive> select mytest(test.name) from test;
......
OK
hello
world
zxm
ljz
Time taken: 38.218 seconds

这种方式在会话结束后,函数自动销毁,因此每次打开新的会话,都需要重新 add jar 并且 create temporary function。

进入会话前自动创建

使用 hive -i 参数在进入 hive 时自动初始化

    $ cat hive_init
    add jar /home/work/udf.jar;
    create temporary function mytest as 'test.udf.ToLowerCase';
    $ hive -i hive_init
    Logging initialized using configuration in file:/home/work/hive/hive-0.8.1/conf/hive-log4j.properties
    Hive history file=/tmp/work/hive_job_log_work_201209200147_1951517527.txt
    hive> show functions;
    ......
    mytest
    ......
    hive> select mytest(test.name) from test;
    ......
    OK
    hello
    world
    zxm
    ljz

方法 2 和方法 1 本质上是相同的,区别在于方法 2 在会话初始化时自动完成

注册为内部函数

和前两者相比,第三种方式直接将用户的自定义函数作为注册为内置函数,未来使用起来非常简单,但这种方式也非常危险,一旦出错,将是灾难性的,因此,建议如果不是特别通用,并且固化下来的函数,还是使用前两种方式比较靠谱。

UDAF

Hive 查询数据时,有些聚类函数在 HQL 没有自带,需要用户自定义实现 •用户自定义聚合函数: Sum, Average…… n – 1 •UDAF(User- Defined Aggregation Funcation) 用法 •一下两个包是必须的 import org.apache.hadoop.hive.ql.exec.UDAF 和 org.apache.hadoop.hive.ql.exec.UDAFEvaluator 开发步骤 •函数类需要继承 UDAF 类,内部类 Evaluator 实 UDAFEvaluator 接口 •Evaluator 需要实现 init、iterate、terminatePartial、merge、terminate 这几个函数 a)init 函数实现接口 UDAFEvaluator 的 init 函数。 b)iterate 接收传入的参数,并进行内部的轮转。其返回类型为 boolean。 c)terminatePartial 无参数,其为 iterate 函数轮转结束后,返回轮转数据,terminatePartial 类似于 hadoop 的 Combiner。 d)merge 接收 terminatePartial 的返回结果,进行数据 merge 操作,其返回类型为 boolean。 e)terminate 返回最终的聚集函数结果。 执行步骤 •执行求平均数函数的步骤 a)将 java 文件编译成 Avg_test.jar。 b)进入 hive 客户端添加 jar 包: hive>add jar /run/jar/Avg_test.jar。 c)创建临时函数: hive>create temporary function avg_test ‘hive.udaf.Avg’; d)查询语句: hive>select avg_test(scores.math) from scores; e)销毁临时函数: hive>drop temporary function avg_test;

UDAF 代码示例 public class MyAvg extends UDAF {

public static class AvgEvaluator implements UDAFEvaluator { } public void init() {} public boolean iterate(Double o) {} public AvgState terminatePartial() {} public boolean terminatePartial(Double o) { } public Double terminate() {}

}

UDTF

UDTF(User-Defined Table-Generating Functions) 用来解决 输入一行输出多行(On-to-many maping) 的需求。 开发步骤 •UDTF 步骤: •必须继承 org.apache.Hadoop.hive.ql.udf.generic.GenericUDTF •实现 initialize, process, close 三个方法 •UDTF 首先会 •调用 initialize 方法,此方法返回 UDTF 的返回行的信息(返回个数,类型) 初始化完成后,会调用 process 方法,对传入的参数进行处理,可以通过 forword()方法把结果返回 •最后 close()方法调用,对需要清理的方法进行清理 使用方法 •UDTF 有两种使用方法,一种直接放到 select 后面,一种和 lateral view 一起使用 •直接 select 中使用:select explodemap(properties) as (col1,col2) from src; •不可以添加其他字段使用:select a, explode_map(properties) as (col1,col2) from src •不可以嵌套调用:select explode_map(explode_map(properties)) from src •不可以和 group by/cluster by/distribute by/sort by 一起使用:select explode_map(properties) as (col1,col2) from src group by col1, col2 •和 lateral view 一起使用:select src.id, mytable.col1, mytable.col2 from src lateral view explode_map(properties) mytable as col1, col2; 此方法更为方便日常使用。执行过程相当于单独执行了两次抽取,然后 union 到一个表里。 lateral view • Lateral View 语法 •lateralView: LATERAL VIEW udtf(expression) tableAlias AS columnAlias (’,’ columnAlias) fromClause: FROM baseTable (lateralView)_

•Lateral View 用于 UDTF(user-defined table generating functions)中将行转成列,例如 explode(). •目前 Lateral View 不支持有上而下的优化。如果使用 Where 子句,查询可能将不被编译。解决方法见: 此时,在查询之前执行 set hive.optimize.ppd=false; • 例子 •pageAds。它有两个列 string pageid Array adid_list " front_page" [1, 2, 3] “contact_page " [ 3, 4, 5] •SELECT pageid, adid FROM pageAds LATERAL VIEW explode(adid_list) adTable AS adid; •将输出如下结果 string pageid int adid “front_page” 1 ……. “contact_page” 3

代码示例 public class MyUDTF extends GenericUDTF{ public StructObjectInspector initialize(ObjectInspector[] args) {} public void process(Object[] args) throws HiveException { } }

上一页