Hive基于UDF進(jìn)行文本分詞

Hive系列文章

  1. Hive表的基本操作
  2. Hive中的集合數(shù)據(jù)類(lèi)型
  3. Hive動(dòng)態(tài)分區(qū)詳解
  4. hive中orc格式表的數(shù)據(jù)導(dǎo)入
  5. Java通過(guò)jdbc連接hive
  6. 通過(guò)HiveServer2訪問(wèn)Hive
  7. SpringBoot連接Hive實(shí)現(xiàn)自助取數(shù)
  8. hive關(guān)聯(lián)hbase表
  9. Hive udf 使用方法
  10. Hive基于UDF進(jìn)行文本分詞
  11. Hive窗口函數(shù)row number的用法
  12. 數(shù)據(jù)倉(cāng)庫(kù)之拉鏈表

本文大綱

UDF 簡(jiǎn)介

Hive作為一個(gè)sql查詢(xún)引擎,自帶了一些基本的函數(shù),比如count(計(jì)數(shù))sum(求和),有時(shí)候這些基本函數(shù)滿足不了我們的需求,這時(shí)候就要寫(xiě)hive hdf(user defined funation),又叫用戶自定義函數(shù)。編寫(xiě)Hive UDF的步驟:

  • 添加相關(guān)依賴(lài),創(chuàng)建項(xiàng)目,這里我用的管理工具是maven,所以我創(chuàng)建的也是一個(gè)maven 項(xiàng)目(這個(gè)時(shí)候你需要選擇合適的依賴(lài)版本,主要是Hadoop 和 Hive,可以使用hadoop versionhive --version 來(lái)分別查看版本)
  • 繼承org.apache.hadoop.hive.ql.exec.UDF類(lèi),實(shí)現(xiàn)evaluate方法,然后打包;
  • 使用 add方法添加jar 包到分布式緩存,如果jar包是上傳到$HIVE_HOME/lib/目錄以下,就不需要執(zhí)行add命令了
  • 通過(guò)create temporary function創(chuàng)建臨時(shí)函數(shù),不加temporary就創(chuàng)建了一個(gè)永久函數(shù);
  • 在SQL 中使用你創(chuàng)建的UDF;

UDF分詞

這個(gè)是一個(gè)比較常見(jiàn)的場(chǎng)景,例如公司的產(chǎn)品有每天都會(huì)產(chǎn)生大量的彈幕或者評(píng)論,這個(gè)時(shí)候我們可能會(huì)想去分析一下大家最關(guān)心的熱點(diǎn)話題是什么,或者是我們會(huì)分析最近一段時(shí)間的網(wǎng)絡(luò)趨勢(shì)是什么,但是這里有一個(gè)問(wèn)題就是你的詞庫(kù)建設(shè)的問(wèn)題,因?yàn)槟闶褂猛ㄓ玫脑~庫(kù)可能不能達(dá)到很好的分詞效果,尤其有很多網(wǎng)絡(luò)流行用語(yǔ)它是不在詞庫(kù)里的,還有一個(gè)就是停用詞的問(wèn)題了,因?yàn)楹芏鄷r(shí)候停用詞是沒(méi)有意義的,所以這里我們需要將其過(guò)濾,而過(guò)濾的方式就是通過(guò)停用詞詞表進(jìn)行過(guò)濾。

這個(gè)時(shí)候我們的解決方案主要有兩種,一種是使用第三方提供的一些詞庫(kù),還有一種是自建詞庫(kù),然后有專(zhuān)人去維護(hù),這個(gè)也是比較常見(jiàn)的一種情況。

最后一個(gè)就是我們使用的分詞工具,因?yàn)槟壳爸髁鞯姆衷~器很多,選擇不同的分詞工具可能對(duì)我們的分詞結(jié)果有很多影響。

分詞工具

1:Elasticsearch的開(kāi)源中文分詞器 IK Analysis(Star:2471)

IK中文分詞器在Elasticsearch上的使用。原生IK中文分詞是從文件系統(tǒng)中讀取詞典,es-ik本身可擴(kuò)展成從不同的源讀取詞典。目前提供從sqlite3數(shù)據(jù)庫(kù)中讀取。es-ik-plugin-sqlite3使用方法: 1. 在elasticsearch.yml中設(shè)置你的sqlite3詞典的位置: ik_analysis_db_path: /opt/ik/dictionary.db

2:開(kāi)源的java中文分詞庫(kù) IKAnalyzer(Star:343)

IK Analyzer 是一個(gè)開(kāi)源的,基于java語(yǔ)言開(kāi)發(fā)的輕量級(jí)的中文分詞工具包。從2006年12月推出1.0版開(kāi)始, IKAnalyzer已經(jīng)推出了4個(gè)大版本。最初,它是以開(kāi)源項(xiàng)目Luence為應(yīng)用主體的,結(jié)合詞典分詞和文法分析算法的中文分詞組件。從3.0版本開(kāi)始,IK發(fā)展為面向Java的公用分詞組件,獨(dú)立于Lucene項(xiàng)目

3:java開(kāi)源中文分詞 Ansj(Star:3019)

Ansj中文分詞 這是一個(gè)ictclas的java實(shí)現(xiàn).基本上重寫(xiě)了所有的數(shù)據(jù)結(jié)構(gòu)和算法.詞典是用的開(kāi)源版的ictclas所提供的.并且進(jìn)行了部分的人工優(yōu)化 分詞速度達(dá)到每秒鐘大約200萬(wàn)字左右,準(zhǔn)確率能達(dá)到96%以上。

目前實(shí)現(xiàn)了.中文分詞. 中文姓名識(shí)別 . 詞性標(biāo)注、用戶自定義詞典,關(guān)鍵字提取,自動(dòng)摘要,關(guān)鍵字標(biāo)記等功能。

可以應(yīng)用到自然語(yǔ)言處理等方面,適用于對(duì)分詞效果要求高的各種項(xiàng)目.

4:結(jié)巴分詞 ElasticSearch 插件(Star:188)

elasticsearch官方只提供smartcn這個(gè)中文分詞插件,效果不是很好,好在國(guó)內(nèi)有medcl大神(國(guó)內(nèi)最早研究es的人之一)寫(xiě)的兩個(gè)中文分詞插件,一個(gè)是ik的,一個(gè)是mmseg的

5:Java分布式中文分詞組件 - word分詞(Star:672)

word分詞是一個(gè)Java實(shí)現(xiàn)的分布式的中文分詞組件,提供了多種基于詞典的分詞算法,并利用ngram模型來(lái)消除歧義。能準(zhǔn)確識(shí)別英文、數(shù)字,以及日期、時(shí)間等數(shù)量詞,能識(shí)別人名、地名、組織機(jī)構(gòu)名等未登錄詞

6:Java開(kāi)源中文分詞器jcseg(Star:400)

Jcseg是什么? Jcseg是基于mmseg算法的一個(gè)輕量級(jí)開(kāi)源中文分詞器,同時(shí)集成了關(guān)鍵字提取,關(guān)鍵短語(yǔ)提取,關(guān)鍵句子提取和文章自動(dòng)摘要等功能,并且提供了最新版本的lucene, solr, elasticsearch的分詞接口, Jcseg自帶了一個(gè) jcseg.properties文件...

7:中文分詞庫(kù)Paoding

庖丁中文分詞庫(kù)是一個(gè)使用Java開(kāi)發(fā)的,可結(jié)合到Lucene應(yīng)用中的,為互聯(lián)網(wǎng)、企業(yè)內(nèi)部網(wǎng)使用的中文搜索引擎分詞組件。Paoding填補(bǔ)了國(guó)內(nèi)中文分詞方面開(kāi)源組件的空白,致力于此并希翼成為互聯(lián)網(wǎng)網(wǎng)站首選的中文分詞開(kāi)源組件。 Paoding中文分詞追求分詞的高效率和用戶良好體驗(yàn)。

8:中文分詞器mmseg4j

mmseg4j 用 Chih-Hao Tsai 的 MMSeg 算法(http://technology.chtsai.org/mmseg/ )實(shí)現(xiàn)的中文分詞器,并實(shí)現(xiàn) lucene 的 analyzer 和 solr 的TokenizerFactory 以方便在Lucene和Solr中使...

9:中文分詞Ansj(Star:3015)

Ansj中文分詞 這是一個(gè)ictclas的java實(shí)現(xiàn).基本上重寫(xiě)了所有的數(shù)據(jù)結(jié)構(gòu)和算法.詞典是用的開(kāi)源版的ictclas所提供的.并且進(jìn)行了部分的人工優(yōu)化 內(nèi)存中中文分詞每秒鐘大約100萬(wàn)字(速度上已經(jīng)超越ictclas) 文件讀取分詞每秒鐘大約30萬(wàn)字 準(zhǔn)確率能達(dá)到96%以上 目前實(shí)現(xiàn)了....

10:Lucene中文分詞庫(kù)ICTCLAS4J

ictclas4j中文分詞系統(tǒng)是sinboy在中科院張華平和劉群老師的研制的FreeICTCLAS的基礎(chǔ)上完成的一個(gè)java開(kāi)源分詞項(xiàng)目,簡(jiǎn)化了原分詞程序的復(fù)雜度,旨在為廣大的中文分詞愛(ài)好者一個(gè)更好的學(xué)習(xí)機(jī)會(huì)。

代碼實(shí)現(xiàn)

第一步:引入依賴(lài)

這里我們引入了兩個(gè)依賴(lài),其實(shí)是兩個(gè)不同分詞工具

<dependency>
  <groupId>org.ansj</groupId>
  <artifactId>ansj_seg</artifactId>
  <version>5.1.6</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>com.janeluo</groupId>
  <artifactId>ikanalyzer</artifactId>
  <version>2012_u6</version>
</dependency>
Java

在開(kāi)始之前我們先寫(xiě)一個(gè)demo 玩玩,讓大家有個(gè)基本的認(rèn)識(shí)

@Test
public  void testAnsjSeg() {
    String str = "我叫李太白,我是一個(gè)詩(shī)人,我生活在唐朝" ;
    // 選擇使用哪種分詞器 BaseAnalysis ToAnalysis NlpAnalysis  IndexAnalysis
    Result result = ToAnalysis.parse(str);
    System.out.println(result);
    KeyWordComputer kwc = new KeyWordComputer(5);
    Collection<Keyword> keywords = kwc.computeArticleTfidf(str);
    System.out.println(keywords);
}
Java

輸出結(jié)果

我/r,叫/v,李太白/nr,,/w,我/r,是/v,一個(gè)/m,詩(shī)人/n,,/w,我/r,生活/vn,在/p,唐朝/t
[李太白/24.72276098504223, 詩(shī)人/3.0502185968368885, 唐朝/0.8965677022546215, 生活/0.6892230219652541]

第二步:引入停用詞詞庫(kù)

因?yàn)槭峭S迷~詞庫(kù),本身也不是很大,所以我直接放在項(xiàng)目里了,當(dāng)然你也可以放在其他地方,例如HDFS 上

第三步:編寫(xiě)UDF

代碼很簡(jiǎn)單我就不不做詳細(xì)解釋了,需要注意的是GenericUDF 里面的一些方法的使用規(guī)則,至于代碼設(shè)計(jì)的好壞以及還有什么改進(jìn)的方案我們后面再說(shuō),下面兩套實(shí)現(xiàn)的思路幾乎是一致的,不一樣的是在使用的分詞工具上的不一樣

ansj的實(shí)現(xiàn)

/**
 * Chinese words segmentation with user-dict in com.kingcall.dic
 * use Ansj(a java open source analyzer)
 */

// 這個(gè)信息就是你每次使用desc 進(jìn)行獲取函數(shù)信息的時(shí)候返回的
@Description(name = "ansj_seg", value = "_FUNC_(str) - chinese words segment using ansj. Return list of words.",
        extended = "Example: select _FUNC_('我是測(cè)試字符串') from src limit 1;\n"
                + "[\"我\", \"是\", \"測(cè)試\", \"字符串\"]")

public class AnsjSeg extends GenericUDF {
    private transient ObjectInspectorConverters.Converter[] converters;
    private static final String userDic = "/app/stopwords/com.kingcall.dic";

    //load userDic in hdfs
    static {
        try {
            FileSystem fs = FileSystem.get(new Configuration());
            FSDataInputStream in = fs.open(new Path(userDic));
            BufferedReader br = new BufferedReader(new InputStreamReader(in));

            String line = null;
            String[] strs = null;
            while ((line = br.readLine()) != null) {
                line = line.trim();
                if (line.length() > 0) {
                    strs = line.split("\t");
                    strs[0] = strs[0].toLowerCase();
                    DicLibrary.insert(DicLibrary.DEFAULT, strs[0]); //ignore nature and freq
                }
            }
            MyStaticValue.isNameRecognition = Boolean.FALSE;
            MyStaticValue.isQuantifierRecognition = Boolean.TRUE;
        } catch (Exception e) {
            System.out.println("Error when load userDic" + e.getMessage());
        }
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length < 1 || arguments.length > 2) {
            throw new UDFArgumentLengthException(
                    "The function AnsjSeg(str) takes 1 or 2 arguments.");
        }

        converters = new ObjectInspectorConverters.Converter[arguments.length];
        converters[0] = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        if (2 == arguments.length) {
            converters[1] = ObjectInspectorConverters.getConverter(arguments[1], PrimitiveObjectInspectorFactory.writableIntObjectInspector);
        }
        return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        boolean filterStop = false;
        if (arguments[0].get() == null) {
            return null;
        }
        if (2 == arguments.length) {
            IntWritable filterParam = (IntWritable) converters[1].convert(arguments[1].get());
            if (1 == filterParam.get()) filterStop = true;
        }

        Text s = (Text) converters[0].convert(arguments[0].get());
        ArrayList<Text> result = new ArrayList<>();

        if (filterStop) {
            for (Term words : DicAnalysis.parse(s.toString()).recognition(StopLibrary.get())) {
                if (words.getName().trim().length() > 0) {
                    result.add(new Text(words.getName().trim()));
                }
            }
        } else {
            for (Term words : DicAnalysis.parse(s.toString())) {
                if (words.getName().trim().length() > 0) {
                    result.add(new Text(words.getName().trim()));
                }
            }
        }
        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        return getStandardDisplayString("ansj_seg", children);
    }
}
Java






ikanalyzer的實(shí)現(xiàn)

@Description(name = "ansj_seg", value = "_FUNC_(str) - chinese words segment using Iknalyzer. Return list of words.",
        extended = "Example: select _FUNC_('我是測(cè)試字符串') from src limit 1;\n"
                + "[\"我\", \"是\", \"測(cè)試\", \"字符串\"]")
public class IknalyzerSeg extends GenericUDF {
    private transient ObjectInspectorConverters.Converter[] converters;
    //用來(lái)存放停用詞的集合
    Set<String> stopWordSet = new HashSet<String>();

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length < 1 || arguments.length > 2) {
            throw new UDFArgumentLengthException(
                    "The function AnsjSeg(str) takes 1 or 2 arguments.");
        }
        //讀入停用詞文件
        BufferedReader StopWordFileBr = null;
        try {
            StopWordFileBr = new BufferedReader(new InputStreamReader(new FileInputStream(new File("stopwords/baidu_stopwords.txt"))));
            //初如化停用詞集
            String stopWord = null;
            for(; (stopWord = StopWordFileBr.readLine()) != null;){
                stopWordSet.add(stopWord);
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        converters = new ObjectInspectorConverters.Converter[arguments.length];
        converters[0] = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        if (2 == arguments.length) {
            converters[1] = ObjectInspectorConverters.getConverter(arguments[1], PrimitiveObjectInspectorFactory.writableIntObjectInspector);
        }
        return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);

    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        boolean filterStop = false;
        if (arguments[0].get() == null) {
            return null;
        }
        if (2 == arguments.length) {
            IntWritable filterParam = (IntWritable) converters[1].convert(arguments[1].get());
            if (1 == filterParam.get()) filterStop = true;
        }
        Text s = (Text) converters[0].convert(arguments[0].get());
        StringReader reader = new StringReader(s.toString());
        IKSegmenter iks = new IKSegmenter(reader, true);
        List<Text> list = new ArrayList<>();
        if (filterStop) {
            try {
                Lexeme lexeme;
                while ((lexeme = iks.next()) != null) {
                    if (!stopWordSet.contains(lexeme.getLexemeText())) {
                        list.add(new Text(lexeme.getLexemeText()));
                    }
                }
            } catch (IOException e) {
            }
        } else {
            try {
                Lexeme lexeme;
                while ((lexeme = iks.next()) != null) {
                    list.add(new Text(lexeme.getLexemeText()));
                }
            } catch (IOException e) {
            }
        }
        return list;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "Usage: evaluate(String str)";
    }
}
Java

第四步:編寫(xiě)測(cè)試用例

GenericUDF 給我們提供了一些方法,這些方法可以用來(lái)構(gòu)建測(cè)試需要的環(huán)境和參數(shù),這樣我們就可以測(cè)試這些代碼了

@Test
public void testAnsjSegFunc() throws HiveException {
    AnsjSeg udf = new AnsjSeg();
    ObjectInspector valueOI0 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    ObjectInspector[] init_args = {valueOI0, valueOI1};
    udf.initialize(init_args);

    Text str = new Text("我是測(cè)試字符串");

    GenericUDF.DeferredObject valueObj0 = new GenericUDF.DeferredJavaObject(str);
    GenericUDF.DeferredObject valueObj1 = new GenericUDF.DeferredJavaObject(0);
    GenericUDF.DeferredObject[] args = {valueObj0, valueObj1};
    ArrayList<Object> res = (ArrayList<Object>) udf.evaluate(args);
    System.out.println(res);
}

@Test
public void testIkSegFunc() throws HiveException {
    IknalyzerSeg udf = new IknalyzerSeg();
    ObjectInspector valueOI0 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    ObjectInspector[] init_args = {valueOI0, valueOI1};
    udf.initialize(init_args);

    Text str = new Text("我是測(cè)試字符串");

    GenericUDF.DeferredObject valueObj0 = new GenericUDF.DeferredJavaObject(str);
    GenericUDF.DeferredObject valueObj1 = new GenericUDF.DeferredJavaObject(0);
    GenericUDF.DeferredObject[] args = {valueObj0, valueObj1};
    ArrayList<Object> res = (ArrayList<Object>) udf.evaluate(args);
    System.out.println(res);
}
Java

我們看到加載停用詞沒(méi)有找到,但是整體還是跑起來(lái)了,因?yàn)樽x取不到HDFS 上的文件

但是我們第二個(gè)樣例是不需要從HDFS 上加載停用詞信息,所以可以完美的測(cè)試運(yùn)行

后來(lái)為了能在外部更新文件,我將其放在了HDFS 上,和AnsjSeg 中的代碼一樣

第五步:創(chuàng)建UDF 并使用

add jar /Users/liuwenqiang/workspace/code/idea/HiveUDF/target/HiveUDF-0.0.4.jar;
create temporary function ansjSeg as 'com.kingcall.bigdata.HiveUDF.AnsjSeg';
select ansjSeg("我是字符串,你是啥");
-- 開(kāi)啟停用詞過(guò)濾
select ansjSeg("我是字符串,你是啥",1);
create temporary function ikSeg as 'com.kingcall.bigdata.HiveUDF.IknalyzerSeg';
select ikSeg("我是字符串,你是啥");
select ikSeg("我是字符串,你是啥",1);

上面方法的第二個(gè)參數(shù),就是是否開(kāi)啟停用詞過(guò)濾,我們使用ikSeg函數(shù)演示一下

下面我們嘗試獲取一下函數(shù)的描述信息

如果沒(méi)有寫(xiě)的話,就是下面的這樣的

其它應(yīng)用場(chǎng)景

通過(guò)編寫(xiě)Hive UDF可以輕松幫我們實(shí)現(xiàn)大量常見(jiàn)需求,其它應(yīng)該場(chǎng)景還有:

  • ip地址轉(zhuǎn)地區(qū):將上報(bào)的用戶日志中的ip字段轉(zhuǎn)化為國(guó)家-省-市格式,便于做地域分布統(tǒng)計(jì)分析;
  • 使用Hive SQL計(jì)算的標(biāo)簽數(shù)據(jù),不想編寫(xiě)Spark程序,可以通過(guò)UDF在靜態(tài)代碼塊中初始化連接池,利用Hive啟動(dòng)的并行MR任務(wù),并行快速導(dǎo)入大量數(shù)據(jù)到codis中,應(yīng)用于一些推薦業(yè)務(wù);
  • 還有其它sql實(shí)現(xiàn)相對(duì)復(fù)雜的任務(wù),都可以編寫(xiě)永久Hive UDF進(jìn)行轉(zhuǎn)化;

總結(jié)

  1. 這一節(jié)我們學(xué)習(xí)了一個(gè)比較常見(jiàn)的UDF,通過(guò)實(shí)現(xiàn)GenericUDF 抽象類(lèi)來(lái)實(shí)現(xiàn),這一節(jié)的重點(diǎn)在于代碼的實(shí)現(xiàn)以及對(duì)GenericUDF類(lèi)中方法的理解
  2. 上面的代碼實(shí)現(xiàn)上有一個(gè)問(wèn)題,那就是關(guān)于停用詞的加載,就是我們能不能動(dòng)態(tài)加載停用詞呢?

  1. 作者:柯廣的網(wǎng)絡(luò)日志

    微信公眾號(hào):Java大數(shù)據(jù)與數(shù)據(jù)倉(cāng)庫(kù)