说说FLINK细粒度滑动窗口如何处理

这篇具有很好参考价值的文章主要介绍了说说FLINK细粒度滑动窗口如何处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

分析&回答

Flink的窗口机制是其底层核心之一,也是高效流处理的关键。Flink窗口分配的基类是WindowAssigner抽象类,下面的类图示出了Flink能够提供的所有窗口类型。

说说FLINK细粒度滑动窗口如何处理,大数据,flink,java,大数据

Flink窗口分为滚动(tumbling)、滑动(sliding)和会话(session)窗口三大类,本文要说的是滑动窗口。

下图示出一个典型的统计用户访问的滑动窗口,来自官方文档。

说说FLINK细粒度滑动窗口如何处理,大数据,flink,java,大数据

假设每两条虚线之间代表1分钟时间差,那么窗口大小(size)就是2分钟,滑动步长(slide)是1分钟。若时间特征为事件时间,代码如下。

dataStream .keyBy("userId") .window(SlidingEventTimeWindows.of(Time.minutes(2), Time.minutes(1))); 由图可知,当前滑动窗口与上一个滑动窗口会有重叠。在窗口大小size是步长slide的2倍的情况下,(几乎)每个DataStream元素都会处于2个窗口内。

我们简单参考一下相关的Flink源码,以加深理解。以下是窗口算子WindowOperator的processElement()方法的部分源码。

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);
        boolean isSkippedElement = true;
        final K key = this.<K>getKeyedStateBackend().getCurrentKey();
 
        if (windowAssigner instanceof MergingWindowAssigner) {
            // 会话窗口的处理逻辑,略去
        } else {
            for (W window : elementWindows) {
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;
                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());
 
                triggerContext.key = key;
                triggerContext.window = window;
                TriggerResult triggerResult = triggerContext.onElement(element);
 
                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }
        // 最后是侧输出迟到数据的逻辑,略去
    }
复制代码

该方法先调用WindowAssigner.assignWindows()方法,根据输入元素的时间戳判断它应该属于哪些窗口。接着遍历所有窗口,将该元素加入对应的窗口状态(即缓存)中,并根据触发器返回的TriggerResult决定是输出(fire)还是清除(purge)窗口的内容,emitWindowContents()方法会调用用户函数。最后,还要调用registerCleanupTimer()方法注册计时器用来在窗口彻底过期时清除窗口状态。

以下是SlidingEventTimeWindows.assignWindows()方法的源码。

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
            for (long start = lastStart;
                 start > timestamp - size;
                 start -= slide) {
                windows.add(new TimeWindow(start, start + size));
            }
            return windows;
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }
 
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
复制代码

这段代码就不难理解了,先调用getWindowStartWithOffset()方法根据元素的时间戳计算出其窗口的起点时间戳,再逐次循环向后滑动,产生size / slide个窗口。我们可以将size / slide叫做“粒度”,亦即上述代码中返回的Collection集合的大小。粒度越大(“细”),滑动窗口之间的重合也越大。

代码读完了,有一个貌似稀松平常的需求:

以3分钟的频率实时计算App内各个子模块近24小时的PV和UV。

直觉上我们需要用粒度为1440 / 3 = 480的滑动窗口来实现它,但是细粒度的滑动窗口会带来性能问题,有两点:

状态 由代码可知,WindowOperator内维护了窗口本身的内部状态windowState(类型为InternalAppendingState)。对于一个元素,会将其写入对应的(key, window)二元组所圈定的状态中。可见,如果粒度为480,那么每个元素到来,更新windowState时都要遍历480个窗口并写入,开销是非常大的。在采用HDFS/RocksDB作为状态后端时,checkpoint的瓶颈也尤其明显。

定时器 在Flink中,定时器的实际实现是TimerHeapInternalTimer类,并且是用Flink自己实现的优先队列维护在堆内存中的。而在WindowOperator中,每一个(key, window)二元组都需要注册两个定时器:一是触发器注册的定时器,用于决定窗口数据何时输出;二是registerCleanupTimer()方法注册的清理定时器,用于在窗口彻底过期(如allowedLateness过期)之后及时清理掉窗口的内部状态。细粒度滑动窗口会造成维护的定时器增多,内存负担加重。

在官方文档Windows最后一节的最后,也有如下的提醒:

Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several of each element, as explained in the Window Assigners section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.

可能有看官会问:预聚合不能解决细粒度窗口的问题吗?答案是不能。预聚合只是让AggregateFunction/ReduceFunction之后的数据量降低,但是进入WindowOperator的窗口状态的数据还是没变的。换句话说,就算触发器实现为FIRE_AND_PURGE,遍历大量窗口并写入状态的开销也是无法消除的。

扯了这么多,有解决方案吗?

当然是有的,办法总比困难多。我们一般使用 滚动窗口+在线存储+读时聚合 的思路作为workaround。简单来讲就是:

弃用滑动窗口,用长度等于原滑动窗口步长的滚动窗口代替; 每个滚动窗口将其周期内的数据做聚合,打入外部在线存储(内存数据库如Redis,LSM-based NoSQL存储如HBase); 扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示。 针对上面的PV/UV问题,如果采用Redis作为在线存储,我们可以将时间戳放在key内,并设定24小时过期时间。用数字字符串存储3分钟周期内的PV量,用HyperLogLog存储3分钟周期内的UV量。近24小时的PV和UV就分别可以通过简单加减和HyperLogLog的pfmerge/pfcount命令得出了。当然,实际操作起来还是要根据需求和服务器资源而定。

喵呜面试助手:一站式解决面试问题,你可以搜索微信小程序 [喵呜面试助手] 或关注 [喵呜刷题] -> 面试助手 免费刷题。如有好的面试知识或技巧期待您的共享!文章来源地址https://www.toymoban.com/news/detail-686696.html

到了这里,关于说说FLINK细粒度滑动窗口如何处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Apache Flink连载(二十八):Flink细粒度资源管理(1)-适用场景和原理

    Apache Flink连载(二十八):Flink细粒度资源管理(1)-适用场景和原理

    🏡 个人主页:IT贫道-CSDN博客  🚩 私聊博主:私聊博主加WX好友,获取更多资料哦~  🔔 博主个人B栈地址:豹哥教你学编程的个人空间-豹哥教你学编程个人主页-哔哩哔哩视频 目录

    2024年02月19日
    浏览(10)
  • 【Flink实战】Flink hint更灵活、更细粒度的设置Flink sql行为与简化hive连接器参数设置

    SQL 提示(SQL Hints)是和 SQL 语句一起使用来改变执行计划的。本章介绍如何使用 SQL 提示来实现各种干预。 SQL 提示一般可以用于以下: 增强 planner:没有完美的 planner, SQL 提示让用户更好地控制执行; 增加元数据(或者统计信息):如\\\"已扫描的表索引\\\"和\\\"一些混洗键(shu

    2024年04月25日
    浏览(12)
  • 记录Flink 线上碰到java.lang.OutOfMemoryError: GC overhead limit exceeded如何处理?

    记录Flink 线上碰到java.lang.OutOfMemoryError: GC overhead limit exceeded如何处理?

    这个问题是Flink TM内存中我们常见的,看到这个问题我们就要想到下面这句话: 程序在垃圾回收上花了很多时间,却收集一点点内存,伴随着会出现CPU的升高。 是不是大家出现这个问题都会出现上面这种情况呢。那我的问题出现如下: 发现JVM Heap堆内存过高。那么堆内存包含

    2024年02月03日
    浏览(10)
  • Flink流数据窗口与时间

    随着大数据时代的到来,流处理技术变得越来越重要。流处理系统可以实时地处理大量数据,为实时应用提供有价值的信息。Apache Flink是一个流处理框架,它可以处理大规模的流数据,并提供丰富的功能,如窗口操作、时间操作等。在本文中,我们将深入探讨Flink流数据窗口

    2024年02月20日
    浏览(10)
  • 大数据-玩转数据-Flink窗口函数

    前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种. ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对

    2024年02月11日
    浏览(12)
  • flink 最后一个窗口一直没有新数据,窗口不关闭问题

    flink 最后一个窗口一直没有新数据,窗口不关闭问题

    窗口类型:滚动窗口 代码: 代码部分逻辑说明 若设置了自动生成watermark 参数,根据打印日志,设置对应的时间(多久没新数据写入,触发窗口计算) env.getConfig().setAutoWatermarkInterval(5000); 使用自定义的watermark: watermark 周期生成()的疑问: 1、默认200ms,会连续生成4次后,

    2024年01月18日
    浏览(10)
  • 大数据-玩转数据-Flink时间滚动动窗口

    大数据-玩转数据-Flink时间滚动动窗口

    在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集

    2024年02月11日
    浏览(11)
  • (增加细粒度资源管理)深入理解flink的task slot相关概念

    (增加细粒度资源管理)深入理解flink的task slot相关概念

    之前对flink的task slot的理解太浅了,重新捋一下相关知识点 我们知道,flink中每个TaskManager都是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask(线程)。 但是TaskManager 的计算资源是有限的,并不是所有任务都可以放在同一个 TaskManager 上并行执行。并行的任务越多

    2024年03月11日
    浏览(15)
  • 说说Flink运行模式

    说说Flink运行模式

    1.开发者模式     在idea中运行Flink程序的方式就是开发模式。 2.local-cluster模式     Flink中的Local-cluster(本地集群)模式,单节点运行,主要用于测试, 学习。 3.Standalone模式         独立集群模式,由Flink自身提供计算资源。 4.Yarn模式 把Flink应用提交给Yarn的ResourceManager Flin

    2024年02月10日
    浏览(11)
  • 说说Flink双流join

    说说Flink双流join

    Flink双流JOIN主要分为两大类 一类是基于原生State的Connect算子操作 另一类是基于窗口的JOIN操作。其中基于窗口的JOIN可细分为window join和interval join两种。 基于原生State的Connect算子操作 实现原理:底层原理依赖Flink的State状态存储,通过将数据存储到State中进行关联join, 最终输出

    2024年02月10日
    浏览(10)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包