SparkSQL函数定义——UDF函数,窗口函数

这篇具有很好参考价值的文章主要介绍了SparkSQL函数定义——UDF函数,窗口函数。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1 定义UDF函数

 1.1  返回值是数组类型的UDF定义

1.2 返回字典类型的UDF定义

2 窗口函数


1 定义UDF函数

目前python仅支持UDF

两种定义方式:

1. sparksession.udf.register()

注册的UDF可以用于DSL和SQL

返回值用于DSL风格,传参内给的名字用于SQL风格

        方法一语法:

udf对象 =  sparksession.udf.register( 参数1, 参数2, 参数3)

参数1:UDF名称,可用于SQL风格

参数2:被注册成UDF的方法名

参数3:声明UDF的返回值类型

udf 对象:返回值对象,是一个 UDF对象,可用于DSL风格

2.pyspark.sql.functions.udf

仅能用于DSL风格

        方式二语法:

udf对象 =  F.udf( 参数1, 参数2 )

参数1:被注册成UDF的方法名

参数2:声明UDF的返回值类型

udf 对象:返回值对象,是一个 UDF对象,可用于DSL风格

其中F是:

from pyspark.sql import functions as F

其中,被注册成UDF的方法名是指具体的计算方法,如:

def add( x, y ):x+y

add就是将要被注册成UDF的方法名

# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F
if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x: [x])
    df = rdd.toDF(["num"])

    def num_ride_10(num):
        return num * 10

    # 参数1: 注册的UDF名称, 这个udf名称,仅可以用于SQL风格
    # 参数2: UDF处理逻辑,是一个单独的方法
    # 参数3: UDF的返回值类型,注意,UDF注册时候,必须声明返回值类型,并且UDF的真实返回值一定要和声明返回值一样
    # 返回值对象: 这是一个UDF对象,仅可以用于DSL语法
    # 当前这种方式定义的UDF,可以通过参数1的名称用于SQL风格,通过返回值对象用的DSL风格
    udf2 = spark.udf.register('udf1', num_ride_10, IntegerType())

    # SQL风格中使用
    # selectExpr 以SELECT的表达式执行, 表达式SQL风格的表达式(字符串)
    # select方法, 接受普通的字符串段名,或者返回值是Column 对象的计算
    df.selectExpr("udf1(num)").show()

    # DSL风格中使用
    # 返回值UDF对象, 如果作为方法使用,传入的参数一定是 Column对象
    df.select(udf2(df['num'])). show()

    # 方式2注册, 仅能用于DSL风格
    udf_3 = F.udf(num_ride_10, IntegerType())
    df.select(udf_3(df['num'])).show()

# 输出
+---------+
|udf1(num)|
+---------+
|       10|
|       20|
|       30|
|       40|
|       50|
|       60|
|       70|
+---------+

+---------+
|udf1(num)|
+---------+
|       10|
|       20|
|       30|
|       40|
|       50|
|       60|
|       70|
+---------+

+----------------+
|num_ride_10(num)|
+----------------+
|              10|
|              20|
|              30|
|              40|
|              50|
|              60|
|              70|
+----------------+

 1.1  返回值是数组类型的UDF定义

# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
from pyspark.sql import functions as F
if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])
    df = rdd.toDF(["line"])

    # 注册 UDF
    def split_line(data):
        return data.split(" ")  # 返回值是一个数组对象

    # 方式1 构建UDF
    udf1 = spark.udf.register('udf1', split_line, ArrayType(StringType()))
    # DSL风格
    df.select(udf1(df["line"])).show()

    # SQL风格
    df.createTempView("line")
    spark.sql("SELECT udf1(line) FROM line").show(truncate=False)

    # 方式2构建
    udf3 = F.udf(split_line, ArrayType(StringType()))
    df.select(udf3(df["line"])).show()

# 输出
+--------------------+
|          udf1(line)|
+--------------------+
|[hadoop, spark, f...|
|[hadoop, flink, j...|
+--------------------+

+----------------------+
|udf1(line)            |
+----------------------+
|[hadoop, spark, flink]|
|[hadoop, flink, java] |
+----------------------+

+--------------------+
|    split_line(line)|
+--------------------+
|[hadoop, spark, f...|
|[hadoop, flink, j...|
+--------------------+

1.2 返回字典类型的UDF定义

# coding:utf8
import string

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
from pyspark.sql import functions as F
if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
    sc = spark.sparkContext

    # 假设 有三个数字 1,2,3 我们传入数字, 返回数字所在序号对应的 字母 然后和数字结合形成dict返回
    # 比如传入 1 我们返回 {“num" : 1, "letters": "a"}

    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(["num"])

    def process(data):
        return {"num":data, "letters": string.ascii_letters[data]}


    # UDF的返回值是字典的话,需要用 StructType来接受

    udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).\
                             add("letters", StringType(), nullable=True))
    df.selectExpr("udf1(num)").show(truncate=False)
    df.select(udf1(df['num'])).show(truncate=False)

# 输出结果
+---------+
|udf1(num)|
+---------+
|{1, b}   |
|{2, c}   |
|{3, d}   |
+---------+

+---------+
|udf1(num)|
+---------+
|{1, b}   |
|{2, c}   |
|{3, d}   |
+---------+

2 窗口函数

        开窗函数的引入是为了既显示聚集前的数据,又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。开窗用于为定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用GROUP BY 子句 对数据进行分组,能够在同一行中同时返回基础行的列和聚合列

        聚合函数 和 开窗函数

聚合函数是将多行变成一行,count,acg....

开窗函数是将一行变成多行

聚合函数如果要显示其他的列必须将列加入到 group by中

开窗函数可以不使用group by, 直接将所有信息显示出来

        开窗函数分类

1.聚合开窗函数

聚合函数(列)OVER(选项),这里的选项可以是PARTITION BY 子句, 但不可以是 ORDER BY 句子

2.排序开窗函数

排序函数(列)OVER(选项),这里的选项可以是ORDER BY 子句 ,也可以是OVER( PARTITION BY子句 ORDER BY 子句  ),但不可以是PARTITION BY 子句

3.分区类型NTIL的窗口函数文章来源地址https://www.toymoban.com/news/detail-463143.html

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
from pyspark.sql import functions as F
if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([
        ("张三", "class_1", 99),
        ("张1", "class_2", 19),
        ("张2", "class_2", 19),
        ("张3", "class_2", 29),
        ("张4", "class_3", 49),
        ("张5", "class_3", 12),
        ("张6", "class_3", 14),
        ("张7", "class_3", 32),
        ("张8", "class_3", 22),
        ("张22", "class_4", 24),
        ("张11", "class_4", 49)
    ])
    schema = StructType().add("name", StringType()).\
        add("class", StringType()).\
        add("score", IntegerType())

    df = rdd.toDF(schema=schema)

    df.createTempView("stu")

    # 聚合窗口函数 的展示
    spark.sql(
        """
            SELECT *, avg(score) OVER() AS avg_score FROM stu
        """
    ).show()

    # 排序相关的 窗口函数计算
    # RANK over ,DENSE_RANK over  ROW_NUMBER over
    spark.sql("""
        SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,
        DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
        RANK() OVER(order by score) AS rank
        FROM stu
    """).show()

    # NTILE
    spark.sql("""
        SELECT *, NTILE(6) OVER(ORDER BY score DESC) from stu
    """).show()
# 输出

+----+-------+-----+-----------------+
|name|  class|score|        avg_score|
+----+-------+-----+-----------------+
|张三|class_1|   99|33.45454545454545|
| 张1|class_2|   19|33.45454545454545|
| 张2|class_2|   19|33.45454545454545|
| 张3|class_2|   29|33.45454545454545|
| 张4|class_3|   49|33.45454545454545|
| 张5|class_3|   12|33.45454545454545|
| 张6|class_3|   14|33.45454545454545|
| 张7|class_3|   32|33.45454545454545|
| 张8|class_3|   22|33.45454545454545|
|张22|class_4|   24|33.45454545454545|
|张11|class_4|   49|33.45454545454545|
+----+-------+-----+-----------------+


+----+-------+-----+---------------+----------+----+
|name|  class|score|row_number_rank|dense_rank|rank|
+----+-------+-----+---------------+----------+----+
|张三|class_1|   99|              1|         1|  11|
| 张3|class_2|   29|              5|         1|   7|
| 张1|class_2|   19|              8|         2|   3|
| 张2|class_2|   19|              9|         2|   3|
| 张4|class_3|   49|              2|         1|   9|
| 张7|class_3|   32|              4|         2|   8|
| 张8|class_3|   22|              7|         3|   5|
| 张6|class_3|   14|             10|         4|   2|
| 张5|class_3|   12|             11|         5|   1|
|张11|class_4|   49|              3|         1|   9|
|张22|class_4|   24|              6|         2|   6|
+----+-------+-----+---------------+----------+----+


+----+-------+-----+-----------------------------------------------------------------------------------------------+
|name|  class|score|ntile(6) OVER (ORDER BY score DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)|
+----+-------+-----+-----------------------------------------------------------------------------------------------+
|张三|class_1|   99|                                                                                              1|
| 张4|class_3|   49|                                                                                              1|
|张11|class_4|   49|                                                                                              2|
| 张7|class_3|   32|                                                                                              2|
| 张3|class_2|   29|                                                                                              3|
|张22|class_4|   24|                                                                                              3|
| 张8|class_3|   22|                                                                                              4|
| 张1|class_2|   19|                                                                                              4|
| 张2|class_2|   19|                                                                                              5|
| 张6|class_3|   14|                                                                                              5|
| 张5|class_3|   12|                                                                                              6|
+----+-------+-----+-----------------------------------------------------------------------------------------------+

到了这里,关于SparkSQL函数定义——UDF函数,窗口函数的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 0基础学习PyFlink——用户自定义函数之UDF

    PyFlink中关于用户定义方法有: UDF:用户自定义函数。 UDTF:用户自定义表值函数。 UDAF:用户自定义聚合函数。 UDTAF:用户自定义表值聚合函数。 这些字母可以拆解如下: UD表示User Defined(用户自定义); F表示Function(方法); T表示Table(表); A表示Aggregate(聚合); Aggr

    2024年02月08日
    浏览(14)
  • SQL Server用户定义的函数(UDF)使用详解

    与编程语言中的函数一样,SQL Server 用户定义函数是接受参数、执行操作(如复杂计算)并将该操作的结果作为值返回的例程。返回值可以是单个标量值,也可以是结果集。 模块化编程。可以创建一次函数,将其存储在数据库中,并在程序中调用它任意次数。可以独立于程序

    2023年04月12日
    浏览(14)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】

    用户自定义函数( user-defined function , UDF ),即用户可以根据自身需求,重新实现算子的逻辑。 用户自定义函数分为: 函数类 、 匿名函数 、 富函数类 。 Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类,例如 MapFunction 、 FilterFunction 、 ReduceFunction 等。所

    2024年01月23日
    浏览(18)
  • 7、hive shell客户端与属性配置、内置运算符、函数(内置运算符与自定义UDF运算符)

    1、apache-hive-3.1.2简介及部署(三种部署方式-内嵌模式、本地模式和远程模式)及验证详解 2、hive相关概念详解–架构、读写文件机制、数据存储 3、hive的使用示例详解-建表、数据类型详解、内部外部表、分区表、分桶表 4、hive的使用示例详解-事务表、视图、物化视图、DDL

    2024年02月09日
    浏览(15)
  • python-自定义函数(定义调用、默认参数、返回值)

    本篇文章讲解了python中自定义函数的一些知识点,包括了函数的定义和调用,默认参数,函数返回,其中也添加了比较高级的用法,能适应任何场合 函数是什么:函数是一段可执行的代码块,用于执行特定的任务或完成特定的操作。它由函数名、参数(可选)和函数体组成。

    2024年02月09日
    浏览(17)
  • 微信小程序自定义函数返回值

    两种自定义函数返回值,你们更喜欢那个

    2024年02月12日
    浏览(18)
  • 【Python入门篇】——Python函数(函数介绍,函数的定义,函数的参数和函数的返回值)

    作者简介: 辭七七,目前大一,正在学习C/C++,Java,Python等 作者主页: 七七的个人主页 文章收录专栏: Python入门,本专栏主要内容为Python的基础语法,Python中的选择循环语句,Python函数,Python的数据容器等。 欢迎大家点赞 👍 收藏 ⭐ 加关注哦!💖💖 函数:是组织好的

    2024年02月08日
    浏览(35)
  • C语言:当函数定义时遗漏函数返回值类型以及函数遗漏return语句

    相关阅读 C语言 https://blog.csdn.net/weixin_45791458/category_12423166.html?spm=1001.2014.3001.5482          函数定义时需要明确给出返回值的类型,比如int main();表示主函数返回一个整数值,void func();表示func函数不返回值(但会函数也会返回,这是需要区分的)。         void main();这种写法

    2024年02月19日
    浏览(13)
  • 05-python之函数-函数的定义/函数的参数/函数返回值/函数说明文档/函数的嵌套使用/函数变量的作用域

    对应输出如上,没有使用len()函数,对应的子算出字符的长度,但是代码整体写的就很别扭。代码过于重复,代码中唯一不一样的地方就是被统计的字符串不同。同时对应的,代码整体也就会比较低效。可以使用函数,优化过程,先定义函数。 同样的输出,效果一样,两者

    2024年01月19日
    浏览(21)
  • VScode函数跳转,再返回,快捷键设置和自定义

    Win10: ctrl + 鼠标左键 Ubuntu:同上操作 win10: alt + leftarrow ← leftarrow ← ubuntu下: ctrl + alt + - 注意ubuntu下的, - 使用数字小键盘好像不咋好使,使用主键盘区的 - 才可以! 🤷‍♂️因此,在ubuntu下进行自定义设置返回原处按键,设置成和win10下面的操作一致,即 alt + leftarr

    2024年02月11日
    浏览(14)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包