Flink算子简单测试样例

这篇具有很好参考价值的文章主要介绍了Flink算子简单测试样例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink算子简单测试样例文章来源地址https://www.toymoban.com/news/detail-793230.html

1. 创建执行环境
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

2. 创建数据流
        // 创建数据流
        DataStream<String> source = env.addSource(new DataGeneratorSource<>(new DataGenerator<String>() {
            final int CNT = 10000; // 模拟一万条数
            int i = 0;

            @Override
            public void open(String s, FunctionInitializationContext functionInitializationContext, RuntimeContext runtimeContext) throws Exception {}

            @Override
            public boolean hasNext() {
                return i < CNT;
            }

            @Override
            public String next() {
                i++;
                try {
                    Thread.sleep(new Random().nextInt(2000)); // 随机发生时间
                } catch (InterruptedException e) {
                }
                return "" + i;
            }
        })).returns(String.class).uid("source").name("source");

3. 数据补充
        // 数据补充-添加时间戳,增加金额
        SingleOutputStreamOperator<Map<String, String>> mapOperator = source.map((MapFunction<String, Map<String, String>>) s -> {
            HashMap<String, String> hashMap = new HashMap<>();
            hashMap.put("userid", s);
            hashMap.put("amt", new Random().nextInt(100) + "");
            hashMap.put("time", System.currentTimeMillis() + "");
            return hashMap;
        }).returns(TypeInformation.of(new TypeHint<Map<String, String>>() {
        })).uid("mapOperator").name("mapOperator");

4. 数据过滤
        // 数据过滤-只取时间戳为偶数的数据
        SingleOutputStreamOperator<Map<String, String>> filterOperator = mapOperator.filter((FilterFunction<Map<String, String>>) data -> {
//                System.out.println("从mapOperator接到数据:" + data);
            long time = Long.parseLong(data.get("time"));
            return time % 2 == 0;
        }).returns(TypeInformation.of(new TypeHint<Map<String, String>>() {
        })).uid("filterOperator").name("filterOperator");

5. 数据放大
        // 数据放大-时间戳是4的倍数,双倍奖励,8的倍数,三倍奖励
        SingleOutputStreamOperator<Map<String, String>> flatMapOperator = filterOperator.flatMap((FlatMapFunction<Map<String, String>, Map<String, String>>) (data, collector) -> {
            collector.collect(data);
            if (Long.parseLong(data.get("time")) % 4 == 0) {
                collector.collect(data);
            }
            if (Long.parseLong(data.get("time")) % 8 == 0) {
                collector.collect(data);
            }
        }).returns(TypeInformation.of(new TypeHint<Map<String, String>>() {
        })).uid("flatMapOperator").name("flatMapOperator");

6. 数据输出
        // 数据输出
        flatMapOperator.print();

        // 执行程序
        env.execute("FlinkTest");

7. 执行结果
{amt=45, time=1705048891056, userid=4}
{amt=45, time=1705048891056, userid=4}
{amt=45, time=1705048891056, userid=4}
{amt=56, time=1705048894374, userid=6}
{amt=96, time=1705048899462, userid=10}
{amt=65, time=1705048901638, userid=12}
{amt=33, time=1705048902544, userid=13}
{amt=33, time=1705048902544, userid=13}
{amt=33, time=1705048902544, userid=13}
{amt=10, time=1705048903748, userid=14}
{amt=10, time=1705048903748, userid=14}
...

Process finished with exit code 0

到了这里,关于Flink算子简单测试样例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 【flink番外篇】2、flink的23种算子window join 和interval join 数据倾斜、分区介绍及详细示例(3)- 数据倾斜处理、分区示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(17)
  • 二次开发Flink-coGroup算子支持迟到数据通过测输出流提取

    二次开发Flink-coGroup算子支持迟到数据通过测输出流提取

    目录 1.背景 2.coGroup算子源码分析 2.1完整的coGroup算子调用流程 2.2coGroup方法入口 2.3 CoGroupedStreams对象分析 2.4WithWindow内部类分析 2.5CoGroupWindowFunction函数分析 3.修改源码支持获取迟到数据测输出流 3.1复制CoGroupedStreams 3.2新增WithWindow.sideOutputLateData方法 3.3新增WithWindow构造方法 3

    2024年04月11日
    浏览(23)
  • 【flink番外篇】2、flink的23种算子window join 和interval join 数据倾斜、分区介绍及详细示例(1)- window join

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(11)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(2)转换算子(Transformation)【基本转换算子、聚合算子】

    【Flink-1.17-教程】-【四】Flink DataStream API(2)转换算子(Transformation)【基本转换算子、聚合算子】

    数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream。 map 是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个 “一 一映射”,消费一个元素就产出一个元素 。 我们只

    2024年01月23日
    浏览(12)
  • Flink源算子、转换算子和输出算子(DataSet)

    Flink源算子、转换算子和输出算子(DataSet)

    Flink是一种一站式处理的框架,既可以进行批处理(DataSet),也可以进行流处理(DataStream) 将Flink的算子分为两大类:DataSet 和 DataStream 1.1 fromCollection 从本地集合读取数据 1.2 readTextFile 从文件中读取 1.3 readTextFile 遍历目录 对一个文件目录内的所有文件,包括所有子目录中的

    2024年04月23日
    浏览(12)
  • [flink 实时流基础]源算子和转换算子

    [flink 实时流基础]源算子和转换算子

    Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。 在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方

    2024年04月11日
    浏览(11)
  • Flink|《Flink 官方文档 - DataStream API - 算子 - 窗口》学习笔记

    Flink|《Flink 官方文档 - DataStream API - 算子 - 窗口》学习笔记

    学习文档:《Flink 官方文档 - DataStream API - 算子 - 窗口》 学习笔记如下: 窗口(Window):窗口是处理无界流的关键所在。窗口可以将数据流装入大小有限的 “桶” 中,再对每个 “桶” 加以处理。 Keyed Windows 在 Keyed Windows 上使用窗口时,要调用 keyBy(...) 而后再调用 window(..

    2024年01月18日
    浏览(31)
  • Flink - sink算子

    Flink - sink算子

    水善利万物而不争,处众人之所恶,故几于道💦   1. Kafka_Sink   2. Kafka_Sink - 自定义序列化器   3. Redis_Sink_String   4. Redis_Sink_list   5. Redis_Sink_set   6. Redis_Sink_hash   7. 有界流数据写入到ES   8. 无界流数据写入到ES   9. 自定义sink - mysql_Sink   10. Jdbc_Sink 官方

    2024年02月14日
    浏览(12)
  • Flink之窗口聚合算子

    Flink之窗口聚合算子

    1.窗口聚合算子 在Flink中窗口聚合算子主要分类两类 滚动聚合算子(增量聚合) 全窗口聚合算子(全量聚合) 1.1 滚动聚合算子 滚动聚合算子一次只处理一条数据,通过算子中的累加器对聚合结果进行更新,当窗口触发时再从累加器中取结果数据,一般使用算子如下: aggregate max maxBy

    2024年02月07日
    浏览(14)
  • Flink - souce算子

    Flink - souce算子

    水善利万物而不争,处众人之所恶,故几于道💦   1. 从Java的集合中读取数据   2. 从本地文件中读取数据   3. 从HDFS中读取数据   4. 从Socket中读取数据   5. 从Kafka中读取数据   6. 自定义Source 官方文档 - Flink1.13 fromCollection(waterSensors) 运行结果: readTextFile(“inpu

    2024年02月14日
    浏览(6)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包