PrestoUDF开发

Presto函数

在 Presto 中,函数大体分为三种:scalar,aggregation 和 window 类型。分别如下:

1)scalar标量函数,简单来说就是 Java 中的一个静态方法,本身没有任何状态。

2)aggregation累积状态的函数,或聚集函数,如count,avg。如果只是单节点,单机状态可以直接用一个变量存储即可,但是presto是分布式计算引擎,状态数据会在多个节点之间传输,因此状态数据需要被序列化成 Presto 的内部格式才可以被传输。

3)window 窗口函数,如同sparkSQL中的窗口函数类似

官网地址:https://prestodb.github.io/docs/current/develop/functions.html

自定义Scalar函数的实现

定义一个java类

  1. 用 @ScalarFunction 的 Annotation 标记实现业务逻辑的静态方法。

  2. 用 @Description 描述函数的作用,这里的内容会在 SHOW FUNCTIONS 中显示。

  3. 用@SqlType 标记函数的返回值类型,如返回字符串,因此是 StandardTypes.VARCHAR。

  4. Java 方法的返回值必须使用 Presto 内部的序列化方式,因此字符串类型必须返回 Slice, 使用 Slices.utf8Slice 方法可以方便的将 String 类型转换成 Slice 类型

package com.tunna.spark.presto.udf;

import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

public class PrefixUDF {

@Description("输入的数据加上前缀") //描述
@ScalarFunction("tunan_prefix") //方法名称
@SqlType(StandardTypes.VARCHAR) //返回类型
public static Slice prefix(@SqlType(StandardTypes.VARCHAR)Slice input){
// Slices.utf8Slice 方法可以方便的将 String 类型转换成 Slice 类型
return Slices.utf8Slice("tunan_prefix_"+input.toStringUtf8());
}
}

Presto插件机制

presto不能像hive那样配置自定义的udf,要采用这种插件机制实现。Presto 的插件(Plugin)机制,是 Presto 能够整合多种数据源的核心。通过实现不同的 Plugin,Presto 允许用户在不同类型的数据源之间进行 JOIN 等计算。Presto 内部的所有数据源都是通过插件机制实现, 例如 MySQL、Hive、HBase等。Presto 插件机制不仅通过加载 Connector 来实现不同数据源的访问,还通过加载 FunctionFactory 来实现 UDF 的加载。 Presto 的 Plugin 遵循 Java 中的 ServiceLoader 规范, 实现非常简单。

实现一个plugin接口如:

package com.tunna.spark.presto.udf;


import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableSet;

import java.util.Set;

public class ProstoUDFPlugin implements Plugin {
@Override
public Set<Class<?>> getFunctions() {
return ImmutableSet.<Class<?>>builder()
.add(PrefixUDF.class)
.build();
}
}

注册函数

在resources下创建META-INF/services目录,创建文件com.facebook.presto.spi.Plugin,拷贝Plugin的全限定名

com.tunna.spark.presto.udf.ProstoUDFPlugin

最后在presto的plugin目录下创建我们自己的目录,并且打包上传到我们自己的目录下,需要重启presto才能将jar中的自定义函数加载进去,如果有多个依赖,都需要放在我们创建的目录下

自定义Aggregation函数的实现

实现原理步骤

Presto 把 Aggregation 函数分解成三个步骤执行:

  1. input(state, data): 针对每条数据,执行 input 函数。这个过程是并行执行的,因此在每个有数据的节点都会执行,最终得到多个累积的状态数据。

  2. combine(state1, state2):将所有节点的状态数据聚合起来,多次执行,直至所有状态数据被聚合成一个最终状态,也就是 Aggregation 函数的输出结果。

  3. output(final_state, out):最终输出结果到一个 BlockBuilder。

具体代码实现过程

  1. 一个继承AccumulatorState的State接口,自定义get和set方法
  2. 定义一个 Java 类,使用 @AggregationFunction 标记为 Aggregation 函数
  3. 使用 @InputFunction、 @CombineFunction、@OutputFunction 分别标记计算函数、合并结果函数和最终输出函数在 Plugin 处注册 Aggregation 函数
package com.tunna.spark.presto.aggudf;


import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.function.*;
import com.facebook.presto.spi.type.StandardTypes;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

import static com.facebook.presto.spi.type.VarcharType.VARCHAR;

@AggregationFunction("tunan_concat") // Agg方法名
public class TunanAggregationUDF {

@InputFunction //输入函数
public static void input(StringValueState state,@SqlType(StandardTypes.VARCHAR) Slice value){

state.setStringValue(Slices.utf8Slice(isNull(state.getStringValue())+"|"+value.toStringUtf8()));
}

@CombineFunction //合并函数
public static void combine(StringValueState state1,StringValueState state2){
state1.setStringValue(Slices.utf8Slice(isNull(state1.getStringValue())+"|"+isNull(state2.getStringValue())));
}

@OutputFunction(StandardTypes.VARCHAR) //输出函数
public static void output(StringValueState state, BlockBuilder builder){
VARCHAR.writeSlice(builder,state.getStringValue());
}


// 判断null值
public static String isNull(Slice slice){
return slice ==null?"":slice.toStringUtf8();
}
}

注册函数

public class ProstoUDFPlugin implements Plugin {
@Override
public Set<Class<?>> getFunctions() {
return ImmutableSet.<Class<?>>builder()
.add(PrefixUDF.class)
.add(TunanAggregationUDF.class) // 新加的
.build();
}
}

Author: Tunan
Link: http://yerias.github.io/2020/05/04/presto/3/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.