Presto函数
在 Presto 中,函数大体分为三种:scalar,aggregation 和 window 类型。分别如下:
1)scalar标量函数,简单来说就是 Java 中的一个静态方法,本身没有任何状态。
2)aggregation累积状态的函数,或聚集函数,如count,avg。如果只是单节点,单机状态可以直接用一个变量存储即可,但是presto是分布式计算引擎,状态数据会在多个节点之间传输,因此状态数据需要被序列化成 Presto 的内部格式才可以被传输。
3)window 窗口函数,如同sparkSQL中的窗口函数类似
官网地址:https://prestodb.github.io/docs/current/develop/functions.html
自定义Scalar函数的实现
定义一个java类
用 @ScalarFunction 的 Annotation 标记实现业务逻辑的静态方法。
用 @Description 描述函数的作用,这里的内容会在 SHOW FUNCTIONS 中显示。
用@SqlType 标记函数的返回值类型,如返回字符串,因此是 StandardTypes.VARCHAR。
Java 方法的返回值必须使用 Presto 内部的序列化方式,因此字符串类型必须返回 Slice, 使用 Slices.utf8Slice 方法可以方便的将 String 类型转换成 Slice 类型
package com.tunna.spark.presto.udf;
import com.facebook.presto.spi.function.Description; import com.facebook.presto.spi.function.ScalarFunction; import com.facebook.presto.spi.function.SqlType; import com.facebook.presto.spi.type.StandardTypes; import io.airlift.slice.Slice; import io.airlift.slice.Slices;
public class PrefixUDF {
@Description("输入的数据加上前缀") @ScalarFunction("tunan_prefix") @SqlType(StandardTypes.VARCHAR) public static Slice prefix(@SqlType(StandardTypes.VARCHAR)Slice input){ return Slices.utf8Slice("tunan_prefix_"+input.toStringUtf8()); } }
|
Presto插件机制
presto不能像hive那样配置自定义的udf,要采用这种插件机制实现。Presto 的插件(Plugin)机制,是 Presto 能够整合多种数据源的核心。通过实现不同的 Plugin,Presto 允许用户在不同类型的数据源之间进行 JOIN 等计算。Presto 内部的所有数据源都是通过插件机制实现, 例如 MySQL、Hive、HBase等。Presto 插件机制不仅通过加载 Connector 来实现不同数据源的访问,还通过加载 FunctionFactory 来实现 UDF 的加载。 Presto 的 Plugin 遵循 Java 中的 ServiceLoader 规范, 实现非常简单。
实现一个plugin接口如:
package com.tunna.spark.presto.udf;
import com.facebook.presto.spi.Plugin; import com.google.common.collect.ImmutableSet;
import java.util.Set;
public class ProstoUDFPlugin implements Plugin { @Override public Set<Class<?>> getFunctions() { return ImmutableSet.<Class<?>>builder() .add(PrefixUDF.class) .build(); } }
|
注册函数
在resources下创建META-INF/services目录,创建文件com.facebook.presto.spi.Plugin,拷贝Plugin的全限定名
com.tunna.spark.presto.udf.ProstoUDFPlugin
|
最后在presto的plugin目录下创建我们自己的目录,并且打包上传到我们自己的目录下,需要重启presto才能将jar中的自定义函数加载进去,如果有多个依赖,都需要放在我们创建的目录下
自定义Aggregation函数的实现
实现原理步骤
Presto 把 Aggregation 函数分解成三个步骤执行:
input(state, data): 针对每条数据,执行 input 函数。这个过程是并行执行的,因此在每个有数据的节点都会执行,最终得到多个累积的状态数据。
combine(state1, state2):将所有节点的状态数据聚合起来,多次执行,直至所有状态数据被聚合成一个最终状态,也就是 Aggregation 函数的输出结果。
output(final_state, out):最终输出结果到一个 BlockBuilder。
具体代码实现过程
- 一个继承AccumulatorState的State接口,自定义get和set方法
- 定义一个 Java 类,使用 @AggregationFunction 标记为 Aggregation 函数
- 使用 @InputFunction、 @CombineFunction、@OutputFunction 分别标记计算函数、合并结果函数和最终输出函数在 Plugin 处注册 Aggregation 函数
package com.tunna.spark.presto.aggudf;
import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.function.*; import com.facebook.presto.spi.type.StandardTypes; import io.airlift.slice.Slice; import io.airlift.slice.Slices;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
@AggregationFunction("tunan_concat") public class TunanAggregationUDF {
@InputFunction public static void input(StringValueState state,@SqlType(StandardTypes.VARCHAR) Slice value){
state.setStringValue(Slices.utf8Slice(isNull(state.getStringValue())+"|"+value.toStringUtf8())); }
@CombineFunction public static void combine(StringValueState state1,StringValueState state2){ state1.setStringValue(Slices.utf8Slice(isNull(state1.getStringValue())+"|"+isNull(state2.getStringValue()))); }
@OutputFunction(StandardTypes.VARCHAR) public static void output(StringValueState state, BlockBuilder builder){ VARCHAR.writeSlice(builder,state.getStringValue()); }
public static String isNull(Slice slice){ return slice ==null?"":slice.toStringUtf8(); } }
|
注册函数
public class ProstoUDFPlugin implements Plugin { @Override public Set<Class<?>> getFunctions() { return ImmutableSet.<Class<?>>builder() .add(PrefixUDF.class) .add(TunanAggregationUDF.class) // 新加的 .build(); } }
|