[实时流基础 flink] 窗口函数

这篇具有很好参考价值的文章主要介绍了[实时流基础 flink] 窗口函数。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

尚硅谷学习笔记

6.5 窗口函数

[实时流基础 flink] 窗口函数,大后端,flink,java,spring

增量聚合函数(ReduceFunction / AggregateFunction)

窗口将数据收集起来,最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。
典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。

ReduceFunction可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
Flink Window API中的aggregate就突破了这个限制,可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction的实现类作为参数。
AggregateFunction可以看作是ReduceFunction的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型IN就是输入流中元素的数据类型;累加器类型ACC则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

全窗口函数(full window functions)

我们还需要有更丰富的窗口计算方式。窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。
在Flink中,全窗口函数也有两种:WindowFunction和ProcessWindowFunction。
1)窗口函数(WindowFunction)
WindowFunction字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于WindowedStream调用.apply()方法,传入一个WindowFunction的实现类。

stream
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());

这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。
不过WindowFunction能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,所以之后可能会逐渐弃用。
2)处理窗口函数(ProcessWindowFunction)
ProcessWindowFunction是Window API中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得ProcessWindowFunction更加灵活、功能更加丰富,其实就是一个增强版的WindowFunction。
事实上,ProcessWindowFunction是Flink底层API——处理函数(process function)中的一员,关于处理函数我们会在后续章节展开讲解。
代码实现如下:

public class WindowProcessDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        SingleOutputStreamOperator<String> process = sensorWS
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long count = elements.spliterator().estimateSize();
                                long windowStartTs = context.window().getStart();
                                long windowEndTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                );

        process.print();

        env.execute();
    }
}
增量聚合和全窗口函数的结合使用
public class WindowAggregateAndProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());


        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());

        // 1. 窗口分配器
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        // 2. 窗口函数:
        /**
         * 增量聚合 Aggregate + 全窗口 process
         * 1、增量聚合函数处理数据: 来一条计算一条
         * 2、窗口触发时, 增量聚合的结果(只有一条) 传递给 全窗口函数
         * 3、经过全窗口函数的处理包装后,输出
         *
         * 结合两者的优点:
         * 1、增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少
         * 2、全窗口函数: 可以通过 上下文 实现灵活的功能
         */

//        sensorWS.reduce()   //也可以传两个

        SingleOutputStreamOperator<String> result = sensorWS.aggregate(
                new MyAgg(),
                new MyProcess()
        );

        result.print();



        env.execute();
    }

    public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{

        @Override
        public Integer createAccumulator() {
            System.out.println("创建累加器");
            return 0;
        }

        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            System.out.println("调用add方法,value="+value);
            return accumulator + value.getVc();
        }

        @Override
        public String getResult(Integer accumulator) {
            System.out.println("调用getResult方法");
            return accumulator.toString();
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            System.out.println("调用merge方法");
            return null;
        }
    }

	 // 全窗口函数的输入类型 = 增量聚合函数的输出类型
    public static class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{

        @Override
        public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
            long startTs = context.window().getStart();
            long endTs = context.window().getEnd();
            String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
            String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

            long count = elements.spliterator().estimateSize();

            out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());

        }
    }
}

** 6.6 其他API**

触发器(Trigger)

触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。
基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。

stream.keyBy(...)
.window(...)
.trigger(new MyTrigger())
移除器(Evictor)

移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。文章来源地址https://www.toymoban.com/news/detail-848605.html

stream.keyBy(...)
.window(...)
.evictor(new MyEvictor())

到了这里,关于[实时流基础 flink] 窗口函数的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Flink窗口函数

    Flink窗口函数

    Flink窗口函数是指对数据流中的数据进行分组和聚合操作的函数。 FlinkSQL支持对一个特定的窗口的聚合。例如有用户想统计在过去的1分钟内有多少用户点击了某个的网页。在这种情况下,我们可以定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算

    2023年04月24日
    浏览(8)
  • 【API篇】八、Flink窗口函数

    【API篇】八、Flink窗口函数

    上一节的窗口分配器,指明了窗口类型,知道了数据属于哪个窗口并收集。而窗口函数,则是定义如何对这些数据做计算操作。 增量聚合 : 来一条数据,计算一条数据 ,窗口触发的时候输出计算结果 全窗口函数 : 数据来了不计算,存起来 ,窗口触发的时候,计算并输出

    2024年02月08日
    浏览(7)
  • 大数据-玩转数据-Flink窗口函数

    前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种. ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对

    2024年02月11日
    浏览(10)
  • Flink--7、窗口(窗口的概念、分类、API、分配器、窗口函数)、触发器、移除器

    Flink--7、窗口(窗口的概念、分类、API、分配器、窗口函数)、触发器、移除器

                           星光下的赶路人star的个人主页                        内心的平静始于不再让他人掌控你的感情 在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一

    2024年02月08日
    浏览(12)
  • Flink 窗口(1)—— 基础概念

    Flink 窗口(1)—— 基础概念

    窗口:将无限数据切割成有限的“数据块”进行处理,以便更高效地处理无界流 在处理无界数据流时,把无界流进行切分,每一段数据分别进行聚合,结果只输出一次。这就相当于将无界流的聚合转化为了有界数据集的聚合 在 Flink 中,窗口可以把流切割成有限大小的多个“

    2024年02月04日
    浏览(29)
  • Flink与Spring Boot集成实践:搭建实时数据处理平台

    在当今数据风暴的时代,实时数据处理已经成为众多企业关注的热点。Apache Flink作为一个高性能、可扩展的实时计算框架,在实时数据处理领域占据着举足轻重的地位。Spring Boot则以其快速开发、简化配置而广受欢迎,将两者结合,我们可以快速地搭建起一个实时数据处理平

    2024年04月27日
    浏览(51)
  • [flink 实时流基础]源算子和转换算子

    [flink 实时流基础]源算子和转换算子

    Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。 在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方

    2024年04月11日
    浏览(11)
  • 使用flink实现《实时数据分析》的案例 java版

    本文档介绍了使用Java和Flink实现实时数据分析的案例。该案例使用Flink的流处理功能,从Kafka主题中读取数据,进行实时处理和分析,并将结果输出到Elasticsearch中。 Java 8 Flink 1.13.2 Kafka 2.8.0 Elasticsearch 7.13.4 本案例使用Kafka作为数据源,从一个名为 user_behavior 的主题中读取数据。

    2024年02月08日
    浏览(10)
  • 使用flink实现《实时监控和日志分析》的案例 java版

    本文档介绍了使用Java和Flink实现实时监控和日志分析的案例。该案例旨在通过实时监控和日志分析来提高系统的可靠性和性能。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kjPKQuIf-1686052913444)(./architecture.png)] 如上图所示,该系统由以下组件组成

    2024年02月06日
    浏览(11)
  • [大数据 Flink,Java实现不同数据库实时数据同步过程]

    目录 🌮前言: 🌮实现Mysql同步Es的过程包括以下步骤: 🌮配置Mysql数据库连接 🌮在Flink的配置文件中,添加Mysql数据库的连接信息。可以在flink-conf.yaml文件中添加如下配置: 🌮在Flink程序中,使用JDBCInputFormat来连接Mysql数据库,并定义查询语句,获取需要同步的数据。具体代

    2024年02月10日
    浏览(13)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包