Spark中的UDF

我们做模型想要上线,很多时候都会借助spark的udf来实现,最近在摸索这个东西,有了一点心得记录一下。

Hive/Spark SQL中的UDF

这种上线方式是经常会用到的,UDF分为三种:

  • UDF:一进对应一出
  • UDTF:一进对应多出,经常遇到的就是比如一行数组数据分为多行,类似explode lateral view
  • UDAF:多进对应一出,就是聚合函数,类似sum(),count()

UDF写完后直接在SQL中使用即可,一般写法又分为java和python两种:

  • Java:这种方式比较常见,引用hadoop/spark接口即可,最终形成一个jar包,可以包含多个模块,容易管理,借助maven工具也容易进行依赖管理。
  • Python:transform(..) using(..)的方式py的好处是线下代码直接迁移,不用大改(一般模型也是用python开发巨多),但坏处也有:
    • 依赖py文件的问题:py文件无法打包,模块之间不能复用,不太容易管理。依赖的py文件需要手动add file上去。
    • 依赖lib的问题:如果依赖第三方包,需要自定义python环境,然后打包上传,运行时选择自定义python环境。http://heloowird.com/2018/01/29/hive_python_udf/

Spark中的UDF

对于复杂的模型,例如需要特征工程之类的,我们可能就需要直接写spark代码来实现。如果核心算法有现成的spark包,例如xgboost这种还好说,如果是自己手写实现的算法,则需要将其包装成udf来实现分布式处理。

UDF也是支持是三种(这个我是基于pyspark的pandas udf来的):

  • 一对一的scalar

  • 多对多的Grouped Map

  • 多对一的Grouped Aggregate

同样的,这种方式也有java(确切来讲是scala)和python两种方式:

  • Scala(Spark):spark.udf.register(*),基本原理是实现一个function,然后注册成udf。function里的计算就是普通的计算。
  • Python(Pyspark):原理也是类似,有udf功能。更进一步的,spark 2.4.0之后提出了pandas udf,基于apache arrow,借助pandas向量计算的能力大幅提升计算性能,同时省去了py4j的序列化流程。https://www.jianshu.com/p/17117574a86b。python的坏处也和sql udf里一样,依赖问题比较蛋疼。
注意事项
  • UDF里的变量都是普通变量,不能是rdd,因为rdd是不能嵌套的,每个executor都在做udf计算,再来一个rdd,executor又得分发,这是不对的。
  • 如果想要从hdfs里获取某个文件分发给executor做udf计算,则需要在driver里先获取该文件,然后将其变为普通变量,用sc.broadcast将其分发到每个executor上。

Spark中的UDF
http://yoursite.com/2020/08/13/大数据/Spark/Spark中的UDF/
作者
Wei Lyu
发布于
2020年8月13日
许可协议