
Implementing UDTF Functionality in Impala

Background

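We need to expand a JSON array stored in a single row of a table into multiple rows. In Hive this can be done with a UDTF, but Impala does not implement UDTFs. So what can we do?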

Approach

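A UDTF turns one row into multiple rows. The most obvious tool for that is a join, since a join can turn one record into several. The key is therefore how to construct the ON clause, that is, the join condition.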

How it works

Take splitting a string as an example. The following query comes to mind readily:

select
t2.name
,split_part(t2.name, t1.delimiter, t1.num)
from
(
    select ',' as delimiter, 1 as num
    union all
    select ',' as delimiter, 2 as num
    union all
    select ',' as delimiter, 3 as num
    union all
    select ',' as delimiter, 4 as num
    union all
    select ',' as delimiter, 5 as num
)t1
inner join
(
    select 'a,b,c' as name
)t2
on split_part(t2.name, t1.delimiter, t1.num) != ''

The result is:

+-------+-------------------------------------------+
| name  | split_part(t2.name, t1.delimiter, t1.num) |
+-------+-------------------------------------------+
| a,b,c | a                                         |
| a,b,c | b                                         |
| a,b,c | c                                         |
+-------+-------------------------------------------+
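To make the mechanics of the query above concrete, here is a minimal Java sketch that mimics it in plain code: the loop over `num` plays the role of the number table t1, and the non-empty check plays the role of the join condition. `splitPart` and `explode` are hypothetical helpers written for this illustration, not Impala APIs, and the sketch assumes that an out-of-range index yields an empty string, which is the behaviour the join condition relies on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class JoinExplodeSketch {
    // Hypothetical stand-in for split_part: 1-based index,
    // returns "" when the index is out of range (an assumption of this sketch).
    static String splitPart(String s, String delimiter, int index) {
        // Quote the delimiter so it is treated literally; -1 keeps trailing empty parts
        String[] parts = s.split(Pattern.quote(delimiter), -1);
        return (index >= 1 && index <= parts.length) ? parts[index - 1] : "";
    }

    // Pairs one input row with every num in 1..maxNum and keeps only the
    // pairs that satisfy the "join condition" (a non-empty part).
    static List<String> explode(String name, String delimiter, int maxNum) {
        List<String> rows = new ArrayList<>();
        for (int num = 1; num <= maxNum; num++) {      // each num is one row of t1
            String part = splitPart(name, delimiter, num);
            if (!part.isEmpty()) {                      // mirrors: on split_part(...) != ''
                rows.add(part);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(explode("a,b,c", ",", 5)); // prints [a, b, c]
    }
}
```

Like the join condition it mirrors, this sketch also drops genuinely empty elements, for example the middle one in 'a,,c', so the approach suits data where empty parts carry no meaning.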

This gives exactly the effect we want. Since Impala has no functions for handling JSON, we next develop a UDF that fetches an element of a JSON array by its index.

Developing the UDF

Java code

package com.luckypeng.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;

/**
 * @author chenzhipeng
 * create temporary function json_at as 'com.luckypeng.udf.JsonAtUDF'
 */
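public class JsonAtUDF extends UDF {
    // ObjectMapper is thread-safe once configured, so one shared instance avoids a new allocation per row
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Returns the element at zero-based index num of jsonArray as a JSON string, or null if there is none. */
    public String evaluate(String jsonArray, Integer num) {
        // A null num would otherwise throw a NullPointerException when unboxed by json.get(num)
        if (jsonArray == null || jsonArray.isEmpty() || num == null) {
            return null;
        }
        JsonNode json;
        try {
            json = MAPPER.readValue(jsonArray, JsonNode.class);
        } catch (IOException e) {
            // Keep the original exception as the cause to ease debugging
            throw new IllegalArgumentException("not valid json array: " + jsonArray, e);
        }
        JsonNode element = (json == null) ? null : json.get(num);
        return (element == null) ? null : element.toString();
    }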

    // Quick local check of the zero-based indexing
    public static void main(String[] args) {
        JsonAtUDF udf = new JsonAtUDF();
        String jsonArray = "[1, 2, 3.1]";
        System.out.println(udf.evaluate(jsonArray, 1)); // 2

        jsonArray = "[{\"id\": 1, \"name\": \"zhangsan\"},{\"id\": 2, \"name\": \"lisi\"},{\"id\": 3, \"name\": \"wangwu\"}]";
        System.out.println(udf.evaluate(jsonArray, 2)); // {"id":3,"name":"wangwu"}
        System.out.println(udf.evaluate(jsonArray, 3)); // null (index out of range)
    }
}

Upload the jar

hadoop fs -put jsonAt.jar /user/hive/udf/

Impala steps

Create the Impala UDF

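-- Created in the udf database so that it matches the udf.json_at calls in the verification query
create function udf.json_at(string, int)
returns string location '/user/hive/udf/jsonAt.jar' symbol='com.luckypeng.udf.JsonAtUDF';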

Verify with a query

select
t2.json_array
,udf.json_at(t2.json_array, t1.num) as json
,udf.json_get_object(udf.json_at(t2.json_array, t1.num), '$.id') as id
,udf.json_get_object(udf.json_at(t2.json_array, t1.num), '$.name') as name
from
(
    select 0 as num
    union all
    select 1 as num
    union all
    select 2 as num
    union all
    select 3 as num
    union all
    select 4 as num
)t1
inner join
(
    select '[{"id": 1, "name": "zhangsan"},{"id": 2, "name": "lisi"},{"id": 3, "name": "wangwu"}]' as json_array
    union all
    select '[{"id": 4, "name": "zhangsan"},{"id": 5, "name": "lisi"},{"id": 6, "name": "wangwu"}]' as json_array
)t2
on udf.json_at(t2.json_array, t1.num) != ''

Note: json_get_object here comes from https://github.com/nazgul33/impala-get-json-object-udf

+---------------------------------------------------------------------------------------+----------------------------+----+----------+
| json_array                                                                            | json                       | id | name     |
+---------------------------------------------------------------------------------------+----------------------------+----+----------+
| [{"id": 1, "name": "zhangsan"},{"id": 2, "name": "lisi"},{"id": 3, "name": "wangwu"}] | {"id":1,"name":"zhangsan"} | 1  | zhangsan |
| [{"id": 1, "name": "zhangsan"},{"id": 2, "name": "lisi"},{"id": 3, "name": "wangwu"}] | {"id":2,"name":"lisi"}     | 2  | lisi     |
| [{"id": 1, "name": "zhangsan"},{"id": 2, "name": "lisi"},{"id": 3, "name": "wangwu"}] | {"id":3,"name":"wangwu"}   | 3  | wangwu   |
| [{"id": 4, "name": "zhangsan"},{"id": 5, "name": "lisi"},{"id": 6, "name": "wangwu"}] | {"id":4,"name":"zhangsan"} | 4  | zhangsan |
| [{"id": 4, "name": "zhangsan"},{"id": 5, "name": "lisi"},{"id": 6, "name": "wangwu"}] | {"id":5,"name":"lisi"}     | 5  | lisi     |
| [{"id": 4, "name": "zhangsan"},{"id": 5, "name": "lisi"},{"id": 6, "name": "wangwu"}] | {"id":6,"name":"wangwu"}   | 6  | wangwu   |
+---------------------------------------------------------------------------------------+----------------------------+----+----------+

Production use

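If needed, t1 can simply be built as a dimension table. In general, the values 0 to 100 should be enough. Array elements whose index exceeds the largest num in the table are silently dropped, so size the table to the longest array you expect.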
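As one possible way to populate such a number table, the following Java sketch generates the SQL statements rather than writing them by hand. The table name `dim_num` and the helper `buildInsert` are hypothetical names chosen for this illustration. The generated statements use ordinary `create table` and `insert ... values` syntax and would still need to be run through impala-shell or a similar client.

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NumDimSqlSketch {
    // Builds one multi-row INSERT covering the inclusive range from..to.
    // The table name is supplied by the caller; dim_num below is a hypothetical name.
    static String buildInsert(String table, int from, int to) {
        String values = IntStream.rangeClosed(from, to)
                .mapToObj(i -> "(" + i + ")")
                .collect(Collectors.joining(", "));
        return "insert into " + table + " values " + values + ";";
    }

    public static void main(String[] args) {
        // Single int column named num, matching the column used by t1 in the queries
        System.out.println("create table if not exists dim_num (num int);");
        System.out.println(buildInsert("dim_num", 0, 100));
    }
}
```

With such a table in place, the `union all` subquery t1 could be replaced by `select num from dim_num`, leaving the rest of the join unchanged.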

Summary

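A UDTF feature that looks impossible at first can in fact be achieved by a detour. What makes this possible is a deep understanding of the operations involved: once we understand how things really work, a much wider range of approaches opens up.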