hive udf 使用方法

Hive系列文章

  1. Hive表的基本操作
  2. Hive中的集合數(shù)據(jù)類(lèi)型
  3. Hive動(dòng)態(tài)分區(qū)詳解
  4. hive中orc格式表的數(shù)據(jù)導(dǎo)入
  5. Java通過(guò)jdbc連接hive
  6. 通過(guò)HiveServer2訪問(wèn)Hive
  7. SpringBoot連接Hive實(shí)現(xiàn)自助取數(shù)
  8. hive關(guān)聯(lián)hbase表
  9. Hive udf 使用方法
  10. Hive基于UDF進(jìn)行文本分詞
  11. Hive窗口函數(shù)row number的用法
  12. 數(shù)據(jù)倉(cāng)庫(kù)之拉鏈表

hive作為一個(gè)sql查詢(xún)引擎,自帶了一些基本的函數(shù),比如count(計(jì)數(shù)),sum(求和),有時(shí)候這些基本函數(shù)滿足不了我們的需求,這時(shí)候就要寫(xiě)hive hdf(user defined funation),又叫用戶自定義函數(shù),應(yīng)用與select 語(yǔ)句中。

哪些情況滿足不了我們的需求呢,比如:

  • 需要將字段與數(shù)據(jù)庫(kù)中查詢(xún)一下,做個(gè)比對(duì);
  • 需要對(duì)數(shù)據(jù)進(jìn)行復(fù)雜處理;
  • 等等

hive udf 用法

下面是一個(gè)判斷hive表字段是否包含'100'的簡(jiǎn)單udf:

package com.js.dataclean.hive.udf.hm2

import org.apache.hadoop.hive.ql.exec.UDF;

public class IsContains100 extends UDF{

    public String evaluate(String s){

        if(s == null || s.length() == 0){
            return "0";
        }

        return s.contains("100")?"1":"0";
    }
}
Java

使用maven將其打包,進(jìn)入hive cli,輸入命令:

add jar /home/hadoop/codejar/flash_format.jar;
create temporary function isContains100 as 'com.js.dataclean.hive.udf.hm2.IsContains100';
SQL

創(chuàng)建完臨時(shí)函數(shù),即可使用這個(gè)函數(shù)了:

select isContains100('abc100def') from table limit 1;
1
SQL

hive udf 創(chuàng)建與使用步驟

  • 繼承org.apache.hadoop.hive.ql.exec.UDF類(lèi),實(shí)現(xiàn)evaluate方法;
  • 打包上傳到集群,通過(guò)create temporary function創(chuàng)建臨時(shí)函數(shù),不加temporary就創(chuàng)建了一個(gè)永久函數(shù);
  • 通過(guò)select 語(yǔ)句使用;

下面是一個(gè)例子,通過(guò)讀取mysql數(shù)據(jù)庫(kù)中的規(guī)則,為hive中的workflow返回對(duì)應(yīng)的,類(lèi)型:

type workflow
a   1
a   2
b   11
b   22
b   33

我們希望,將hive的某一個(gè)字段取值為,1,2的變?yōu)?code>a,取值為11,22,33的全部變?yōu)?code>b,就是歸類(lèi)的意思。
這個(gè)udf可以這么實(shí)現(xiàn):

package com.js.dataclean.hive.udf.hm2.workflow;

import org.apache.hadoop.hive.ql.exec.UDF;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @ Author: keguang
 * @ Date: 2018/12/13 16:24
 * @ version: v1.0.0
 * @ description:
 */
public class GetWorkflow extends UDF{

    private static final String host = "0.0.0.0";
    private static final String port = "3306";
    private static final String database = "root";
    private static final String userName = "root";
    private static final String password = "123456";
    private static String url = "";
    private static final String driver = "com.mysql.jdbc.Driver";
    private static Connection conn = null;
    private static Map<String, List<String>> workflowType = null;

    static {
        url = "jdbc:mysql://" + host + ":" + port + "/" + database;
        try {
            Class.forName(driver);
            conn = DriverManager.getConnection(url, userName, password);
            workflowType = getWorkflowType(conn);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    private static Map<String, List<String>> getWorkflowType(Connection conn){
        Map<String, List<String>> workflowType = new HashMap<>();
        String sql = "select * from flash_player_workflow";
        PreparedStatement ps = null;
        try {
            ps = conn.prepareStatement(sql);
            ResultSet rs = ps.executeQuery();
            while (rs.next()){
                String workflow = rs.getString("workflow");
                String type = rs.getString("flag");

                List<String> workflows = workflowType.get(type);
                if(workflows == null){
                    workflows = new ArrayList<>();
                }
                workflows.add(workflow);
                workflowType.put(type, workflows);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {

            // 關(guān)閉鏈接
            if(conn != null){
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        return workflowType;

    }

    public String evaluate(String s){
        assert workflowType != null;

        for(String type:workflowType.keySet()){
            List<String> workflows = workflowType.get(type);
            if(workflows.contains(s)){
                return type;
            }
        }

        return s;
    }

}
Java

查看hive function的用法:

查month 相關(guān)的函數(shù)

show functions like '*month*';
SQL

查看 add_months 函數(shù)的用法

desc function add_months;
SQL

查看 add_months 函數(shù)的詳細(xì)說(shuō)明并舉例

desc function extended add_months;
SQL

hive 中的 UDAF

可以看出,udf就是一個(gè)輸入一個(gè)輸出,輸入一個(gè)性別,返回'男'或者'女',如果我們想實(shí)現(xiàn)select date,count(1) from table,統(tǒng)計(jì)每天的流量呢?這就是一個(gè)分組統(tǒng)計(jì),顯然是多個(gè)輸入,一個(gè)輸出,這時(shí)候udf已經(jīng)不能滿足我們的需要,就需要寫(xiě)udaf,user defined aggregare function(用戶自定義聚合函數(shù))。

這里寫(xiě)一個(gè)字符串連接函數(shù),相當(dāng)于concat的功能,將多行輸入,合并為一個(gè)字符串:

package com.js.dataclean.hive.udaf.hm2;

import com.js.dataclean.utils.StringUtil;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

/**
 * 實(shí)現(xiàn)字符串連接聚合的UDAF
 * @version v1.0.0
 * @Author:keguang
 * @Date:2018/10/22 14:36
 */
public class MutiStringConcat extends UDAF{
    public static class SumState{
        private String sumStr;
    }

    public static class SumEvaluator implements UDAFEvaluator{
        SumState sumState;

        public SumEvaluator(){
            super();
            sumState = new SumState();
            init();
        }

        @Override
        public void init() {
            sumState.sumStr = "";
        }

        /**
         * 來(lái)了一行數(shù)據(jù)
         * @param s
         * @return
         */
        public boolean iterate(String s){
            if(!StringUtil.isNull(s)){
                sumState.sumStr += s;
            }
            return true;
        }

        /**
         * 狀態(tài)傳遞
         * @return
         */
        public SumState terminatePartial() {
            return sumState;
        }

        /**
         * 子任務(wù)合并
         * @param state
         * @return
         */
        public boolean merge(SumState state){
            if(state != null){
                sumState.sumStr += state.sumStr;
            }
            return true;
        }

        /**
         * 返回最終結(jié)果
         * @return
         */
        public String terminate(){
            return sumState.sumStr;
        }
    }
}
Java

用法,與udf一樣,還是需要打包并且到hive cli中注冊(cè)使用。

關(guān)于UDAF開(kāi)發(fā)注意點(diǎn):

  • 需要import org.apache.hadoop.hive.ql.exec.UDAF以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator,這兩個(gè)包都是必須的

  • 函數(shù)類(lèi)需要繼承UDAF類(lèi),內(nèi)部類(lèi)Evaluator實(shí)現(xiàn)UDAFEvaluator接口

  • Evaluator需要實(shí)現(xiàn) init、iterate、terminatePartial、merge、terminate這幾個(gè)函數(shù)

    • init函數(shù)類(lèi)似于構(gòu)造函數(shù),用于UDAF的初始化

    • iterate接收傳入的參數(shù),并進(jìn)行內(nèi)部的輪轉(zhuǎn)。其返回類(lèi)型為boolean

    • terminatePartial無(wú)參數(shù),其為iterate函數(shù)輪轉(zhuǎn)結(jié)束后,返回亂轉(zhuǎn)數(shù)據(jù),iterate和terminatePartial類(lèi)似于hadoop的Combiner

    • merge接收terminatePartial的返回結(jié)果,進(jìn)行數(shù)據(jù)merge操作,其返回類(lèi)型為boolean

    • terminate返回最終的聚集函數(shù)結(jié)果










作者:柯廣的網(wǎng)絡(luò)日志

微信公眾號(hào):Java大數(shù)據(jù)與數(shù)據(jù)倉(cāng)庫(kù)