hive udf 使用方法

Hive系列文章

  1. Hive表的基本操作
  2. Hive中的集合數(shù)據(jù)類型
  3. Hive動態(tài)分區(qū)詳解
  4. hive中orc格式表的數(shù)據(jù)導入
  5. Java通過jdbc連接hive
  6. 通過HiveServer2訪問Hive
  7. SpringBoot連接Hive實現(xiàn)自助取數(shù)
  8. hive關聯(lián)hbase表
  9. Hive udf 使用方法
  10. Hive基于UDF進行文本分詞
  11. Hive窗口函數(shù)row number的用法
  12. 數(shù)據(jù)倉庫之拉鏈表

hive作為一個sql查詢引擎,自帶了一些基本的函數(shù),比如count(計數(shù)),sum(求和),有時候這些基本函數(shù)滿足不了我們的需求,這時候就要寫hive hdf(user defined funation),又叫用戶自定義函數(shù),應用與select 語句中。

哪些情況滿足不了我們的需求呢,比如:

  • 需要將字段與數(shù)據(jù)庫中查詢一下,做個比對;
  • 需要對數(shù)據(jù)進行復雜處理;
  • 等等

hive udf 用法

下面是一個判斷hive表字段是否包含'100'的簡單udf:

package com.js.dataclean.hive.udf.hm2

import org.apache.hadoop.hive.ql.exec.UDF;

public class IsContains100 extends UDF{

    public String evaluate(String s){

        if(s == null || s.length() == 0){
            return "0";
        }

        return s.contains("100")?"1":"0";
    }
}
Java

使用maven將其打包,進入hive cli,輸入命令:

add jar /home/hadoop/codejar/flash_format.jar;
create temporary function isContains100 as 'com.js.dataclean.hive.udf.hm2.IsContains100';
SQL

創(chuàng)建完臨時函數(shù),即可使用這個函數(shù)了:

select isContains100('abc100def') from table limit 1;
1
SQL

hive udf 創(chuàng)建與使用步驟

  • 繼承org.apache.hadoop.hive.ql.exec.UDF類,實現(xiàn)evaluate方法;
  • 打包上傳到集群,通過create temporary function創(chuàng)建臨時函數(shù),不加temporary就創(chuàng)建了一個永久函數(shù);
  • 通過select 語句使用;

下面是一個例子,通過讀取mysql數(shù)據(jù)庫中的規(guī)則,為hive中的workflow返回對應的,類型:

type workflow
a   1
a   2
b   11
b   22
b   33

我們希望,將hive的某一個字段取值為,1,2的變?yōu)?code>a,取值為11,22,33的全部變?yōu)?code>b,就是歸類的意思。
這個udf可以這么實現(xiàn):

package com.js.dataclean.hive.udf.hm2.workflow;

import org.apache.hadoop.hive.ql.exec.UDF;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @ Author: keguang
 * @ Date: 2018/12/13 16:24
 * @ version: v1.0.0
 * @ description:
 */
public class GetWorkflow extends UDF{

    private static final String host = "0.0.0.0";
    private static final String port = "3306";
    private static final String database = "root";
    private static final String userName = "root";
    private static final String password = "123456";
    private static String url = "";
    private static final String driver = "com.mysql.jdbc.Driver";
    private static Connection conn = null;
    private static Map<String, List<String>> workflowType = null;

    static {
        url = "jdbc:mysql://" + host + ":" + port + "/" + database;
        try {
            Class.forName(driver);
            conn = DriverManager.getConnection(url, userName, password);
            workflowType = getWorkflowType(conn);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    private static Map<String, List<String>> getWorkflowType(Connection conn){
        Map<String, List<String>> workflowType = new HashMap<>();
        String sql = "select * from flash_player_workflow";
        PreparedStatement ps = null;
        try {
            ps = conn.prepareStatement(sql);
            ResultSet rs = ps.executeQuery();
            while (rs.next()){
                String workflow = rs.getString("workflow");
                String type = rs.getString("flag");

                List<String> workflows = workflowType.get(type);
                if(workflows == null){
                    workflows = new ArrayList<>();
                }
                workflows.add(workflow);
                workflowType.put(type, workflows);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {

            // 關閉鏈接
            if(conn != null){
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        return workflowType;

    }

    public String evaluate(String s){
        assert workflowType != null;

        for(String type:workflowType.keySet()){
            List<String> workflows = workflowType.get(type);
            if(workflows.contains(s)){
                return type;
            }
        }

        return s;
    }

}
Java

查看hive function的用法:

查month 相關的函數(shù)

show functions like '*month*';
SQL

查看 add_months 函數(shù)的用法

desc function add_months;
SQL

查看 add_months 函數(shù)的詳細說明并舉例

desc function extended add_months;
SQL

hive 中的 UDAF

可以看出,udf就是一個輸入一個輸出,輸入一個性別,返回'男'或者'女',如果我們想實現(xiàn)select date,count(1) from table,統(tǒng)計每天的流量呢?這就是一個分組統(tǒng)計,顯然是多個輸入,一個輸出,這時候udf已經(jīng)不能滿足我們的需要,就需要寫udaf,user defined aggregare function(用戶自定義聚合函數(shù))。

這里寫一個字符串連接函數(shù),相當于concat的功能,將多行輸入,合并為一個字符串:

package com.js.dataclean.hive.udaf.hm2;

import com.js.dataclean.utils.StringUtil;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

/**
 * 實現(xiàn)字符串連接聚合的UDAF
 * @version v1.0.0
 * @Author:keguang
 * @Date:2018/10/22 14:36
 */
public class MutiStringConcat extends UDAF{
    public static class SumState{
        private String sumStr;
    }

    public static class SumEvaluator implements UDAFEvaluator{
        SumState sumState;

        public SumEvaluator(){
            super();
            sumState = new SumState();
            init();
        }

        @Override
        public void init() {
            sumState.sumStr = "";
        }

        /**
         * 來了一行數(shù)據(jù)
         * @param s
         * @return
         */
        public boolean iterate(String s){
            if(!StringUtil.isNull(s)){
                sumState.sumStr += s;
            }
            return true;
        }

        /**
         * 狀態(tài)傳遞
         * @return
         */
        public SumState terminatePartial() {
            return sumState;
        }

        /**
         * 子任務合并
         * @param state
         * @return
         */
        public boolean merge(SumState state){
            if(state != null){
                sumState.sumStr += state.sumStr;
            }
            return true;
        }

        /**
         * 返回最終結果
         * @return
         */
        public String terminate(){
            return sumState.sumStr;
        }
    }
}
Java

用法,與udf一樣,還是需要打包并且到hive cli中注冊使用。

關于UDAF開發(fā)注意點:

  • 需要import org.apache.hadoop.hive.ql.exec.UDAF以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator,這兩個包都是必須的

  • 函數(shù)類需要繼承UDAF類,內部類Evaluator實現(xiàn)UDAFEvaluator接口

  • Evaluator需要實現(xiàn) init、iterate、terminatePartial、merge、terminate這幾個函數(shù)

    • init函數(shù)類似于構造函數(shù),用于UDAF的初始化

    • iterate接收傳入的參數(shù),并進行內部的輪轉。其返回類型為boolean

    • terminatePartial無參數(shù),其為iterate函數(shù)輪轉結束后,返回亂轉數(shù)據(jù),iterate和terminatePartial類似于hadoop的Combiner

    • merge接收terminatePartial的返回結果,進行數(shù)據(jù)merge操作,其返回類型為boolean

    • terminate返回最終的聚集函數(shù)結果










作者:柯廣的網(wǎng)絡日志

微信公眾號:Java大數(shù)據(jù)與數(shù)據(jù)倉庫