关于flink滚动窗口下数据乱序+倾斜,allowedLateness的一个坑

这篇具有很好参考价值的文章主要介绍了关于flink滚动窗口下数据乱序+倾斜,allowedLateness的一个坑。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

前言

        滚动窗口(Tumbling Windows)

        allowedLateness

场景描述

数据倾斜问题解决

输出结果偏差问题

        思考

输出结果偏差解决

扩展


前言

      滚动窗口(Tumbling Windows)

        滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。 比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算。

      allowedLateness

        在使用 event-time 窗口时,数据可能会迟到,即 Flink 用来追踪 event-time 进展的 watermark 已经 越过了窗口结束的 timestamp 后,数据才到达。默认情况下,watermark 一旦越过窗口结束的 timestamp,迟到的数据就会被直接丢弃。 但是 Flink 允许指定窗口算子最大的 allowed lateness。 Allowed lateness 定义了一个元素可以在迟到多长时间的情况下不被丢弃,这个参数默认是 0。 在 watermark 超过窗口末端、到达窗口末端加上 allowed lateness 之前的这段时间内到达的元素, 依旧会被加入窗口。取决于窗口的 trigger,一个迟到但没有被丢弃的元素可能会再次触发窗口,比如 EventTimeTrigger

 可以像下面这样指定 allowed lateness

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>);

场景描述:

上游数据有一定程度的乱序、并经过keyby之后会有数据倾斜的现象。

数据倾斜问题解决:

1、mapFuncation中的对key进行散列(窗口前)。

 Joiner.on("_").join(data[0], new Random().nextInt(3))

2、还原key,并进行二次keyby(窗口后)。 

aggregate.map(new MapFunction<Tuple3<Long, Integer, String>, Tuple3<Long, Integer, String>>() {
                    @Override
                    //in:windowTime,counter,randomKey
                    //out:windowTime,counter,id
                    public Tuple3<Long, Integer, String> map(Tuple3<Long, Integer, String> value) throws Exception {
                        String[] randomKey = value.f2.split("_");
                        String id = randomKey[0];
                        return Tuple3.of(value.f0, value.f1, id);
                    }
                }).keyBy(new KeySelector<Tuple3<Long, Integer, String>, Tuple2<Long, String>>() {
                    @Override
                    public Tuple2<Long, String> getKey(Tuple3<Long, Integer, String> value) throws Exception {
                        //tuple2<windowEndTs,id>
                        return Tuple2.of(value.f0, value.f2);
                    }
                })

3、之后使用KeyedProcessFunction进行counter。

输出结果偏差问题

由于使用了allowedLateness、造成最终在KeyedProcessFunction中的汇总结果与预期偏差

思考

下面是eventTime默认触发器源码

    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

        这是因为使用了allowedLateness之后,窗口内的状态信息不会在窗口关闭时间清除,而是在watemark到达allowedLateness之后再清除。        

        这个onElement方法是来一个事件会执行一次,其中window.mapTimeStamp就是当前数据的eventTime的最大窗口时间。可以看到当它<=当前水位线,就触发窗口计算,但没有清除窗口内状态信息。因为我们是数据倾斜+数据乱序的场景。后面我们还会有个KeyedProcessFunction进行最终的聚合计算。这就造成了数据重复计算。

TriggerResult有四个属性:

    CONTINUE  (什么也不做)
    FIRE_AND_PURGE  (触发计算并清除窗口内状态信息)
    FIRE  (触发计算)
    PURGE  (清除窗口内状态信息)

输出结果偏差解决

直接上代码

public class EventTimePurgeTrigger extends Trigger<Object, TimeWindow> {
    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE_AND_PURGE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;

        }
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

在onElement中和onEvenetTime中使用TriggerResult.FIRE_AND_PURGE。

扩展:

        在allowedLateness允许迟到范围内如果有大量数据迟到,因为每来一条迟到数据就触发一次计算,服务器负载会特别高。这就需要重写以下代码(目前博主还没有研究如何修改):文章来源地址https://www.toymoban.com/news/detail-832816.html

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE_AND_PURGE;
        } 

到了这里,关于关于flink滚动窗口下数据乱序+倾斜,allowedLateness的一个坑的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink的MaxOutOfOrderness 和 Allowedlateness 区别

    MaxOutOfOrderness 和 Allowedlateness 都是为了对乱序数据进行处理,但是经常会混淆着两个概念,需要进行一些区分。 source.map(...//省略不必要代码)       // 定义 watermark       .assignTimestampsAndWatermarks(             //  设置 watermark 比 事件时间晚 1s             

    2024年04月18日
    浏览(19)
  • 如何解决Flink任务的数据倾斜

    如何解决flink任务的数据倾斜问题 Flink 任务的数据倾斜问题可以通过以下几种方法来解决: 使用滑动窗口:滑动窗口可以将窗口划分成多个子窗口,从而使数据更加均衡地分配到不同的计算节点中。同时,滑动窗口还可以使窗口内的数据更加连续,从而减少数据倾斜的情况。

    2024年02月14日
    浏览(21)
  • Hive & Spark & Flink 数据倾斜

    绝大部分任务都很快完成,只有一个或者少数几个任务执行的很慢甚至最终执行失败, 这样的现象为数据倾斜现象。 任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 redu

    2024年02月07日
    浏览(14)
  • 【flink番外篇】2、flink的23种算子window join 和interval join 数据倾斜、分区介绍及详细示例(3)- 数据倾斜处理、分区示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(21)
  • 万字解决Flink|Spark|Hive 数据倾斜

    此篇主要总结到Hive,Flink,Spark出现数据倾斜的表现,原因和解决办法。首先会让大家认识到不同框架或者计算引擎处理倾斜的方案。最后你会发现计算框架只是“异曲”,文末总结才是“同工之妙”。点击收藏与分享,工作和涨薪用得到!!! 数据倾斜最笼统概念就是数据的

    2024年02月03日
    浏览(14)
  • 如何处理 Flink 作业中的数据倾斜问题?

    什么是数据倾斜? 由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点。 举例:一个 Flink 作业包含 200 个 Task 节点,其中有 199 个节点可以在很短的时间内完成计算。但是有一个节点执行时间远超其他结果,并且随着数据量的持续增加,导致该计算节点挂掉,从

    2024年02月10日
    浏览(18)
  • flink state原理,TTL,状态后端,数据倾斜一文全

    拿五个字做比喻:“铁锅炖大鹅”,铁锅是状态后端,大鹅是状态,Checkpoint 是炖的动作。 状态 :本质来说就是数据,在 Flink 中,其实就是 Flink 提供给用户的状态编程接口。比如 flink 中的 MapState,ValueState,ListState。 状态后端 :Flink 提供的用于管理状态的组件,状态后端决

    2024年02月22日
    浏览(17)
  • flink sql 实战实例 及延伸问题:聚合/数据倾斜/DAU/Hive流批一体 等

    ⭐ 需求:上游是一个 kafka 数据源,数据内容是用户 QQ 等级变化明细数据(time,uid,level)。需要你求出当前每个等级的用户数。 ⭐ 需求:数据源:用户心跳日志(uid,time,type)。计算分 Android,iOS 的 DAU,最晚一分钟输出一次当日零点累计到当前的结果。 经过测试 在fl

    2024年02月22日
    浏览(17)
  • 【flink番外篇】2、flink的23种算子window join 和interval join 数据倾斜、分区介绍及详细示例(1)- window join

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(22)
  • 大数据-玩转数据-Flink窗口函数

    前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种. ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对

    2024年02月11日
    浏览(12)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包