0%

ODPS Hive UDF UDAF UDTF 编写

之前一直以为udf是一个很高深很复杂的东西,但是经过实际编写后,发现它其实很简单。掌握这几个的特性,对写sql、抽样本、提特征会提升不少效率。

UDF为输入几个字段,返回一个值,比如trimlength等函数;UDAF为聚合函数,如minmax,而UDTF则是拆成多行的函数,例如explode函数。

UDF

下面是阿里云官方文档提供的udf示例

1
2
3
4
5
6
7
from odps.udf import annotate
@annotate("bigint,bigint->bigint")
class MyPlus(object):
def evaluate(self, arg0, arg1):
if None in (arg0, arg1):
return None
return arg0 + arg1

这里的evaluate函数可以传入多个参数(字段),返回一列。输入和输出参数的类型通过注解配置。

UDAF

UDAF为聚合函数,配合group by使用
下面展示的是求平均值函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@annotate('double->double')
class Average(BaseUDAF):
def new_buffer(self):
return [0, 0]
def iterate(self, buffer, number):
if number is not None:
buffer[0] += number
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def terminate(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]

BaseUDAF.new_buffer():实现此方法返回聚合函数的中间值的bufferbuffer必须是marshallableObject(例如LIST、DICT),并且buffer的大小不应该随数据量递增。在极限情况下,buffer Marshal过后的大小不应该超过2MB。

BaseUDAF.iterate(buffer[, args, ...]):实现此方法将args聚合到中间值buffer中。

BaseUDAF.merge(buffer, pbuffer):实现此方法将两个中间值buffer聚合到一起,即将pbuffer合并到buffer中。

BaseUDAF.terminate(buffer):实现此方法将中间值buffer转换为MaxCompute SQL的基本类型。

UDTF

UDTF是用来拆成多行用的,常常和LATERAL VIEW一起用。explode就是常见的UDTF

1
SELECT pageid, adid FROM pageAds LATERAL VIEW explode(adid_list) adTable AS adid;

样例代码:

1
2
3
4
5
6
7
8
9
10
11
12
#coding:utf-8
# explode.py
from odps.udf import annotate
from odps.udf import BaseUDTF
@annotate('string -> string')
class Explode(BaseUDTF):
"""将string按逗号分隔输出成多条记录。
"""
def process(self, arg):
props = arg.split(',')
for p in props:
self.forward(p)

其中,每一条记录都会对应调用一次process,而每调用一次self.forward()就会生成一行记录。

感悟

啊,最近写UDF真是写上瘾了,UDF大大扩展了SQL能力,写好一个UDF,可以减少很多复杂的SQL语句,同时还可以使代码变的更加易读。

参考资料

阿里云Python UDF文档