flink写入到kafka 大坑解析。

这篇具有很好参考价值的文章主要介绍了flink写入到kafka 大坑解析。。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.kafka能不能发送null消息?

   能!

flink写入到kafka 大坑解析。,flink,kafka,大数据

2 flink能不能发送null消息到kafka?

不能!

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment
            .getExecutionEnvironment();
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"9.135.68.201:9092" );
    FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>( "cc_test",new SimpleStringSchema(),properties);
    env.fromCollection(Lists.newArrayList("111", "222", "333"))
        .map(s->{return s.equals("222")?null:s;})
            .addSink(flinkKafkaProducer);
    env.execute("ContractLabelJob");
}

flink写入到kafka 大坑解析。,flink,kafka,大数据 

 flink写入到kafka 大坑解析。,flink,kafka,大数据

这里就报了java的最常见错误 空指针,原因就是flink要把kafka的消息getbytes。所以flink不能发送null到kafka。

这种问题会造成什么后果?

flink直接挂掉。

如果我们采取了失败重试机制会怎样?

env.setRestartStrategy(  RestartStrategies.fixedDelayRestart(3, Time.seconds(5))  );

数据重复或者丢失。

flink写入到kafka 大坑解析。,flink,kafka,大数据

还有此时kafka的offset由flink在管理, 消费的offset 一直没有被commit,所以一直重复消费。

来个demo

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
        env.enableCheckpointing(6000);
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"9.135.68.201:9092" );
        FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>( "cc_test2",new SimpleStringSchema(),properties);
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setTopics("cc_test")
                .setGroupId("cc_test1234")
                .setBootstrapServers("9.135.68.201:9092")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();
        DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka");
//        stringDataStreamSource.print("kafka msg");
        stringDataStreamSource
                .addSink(sink);
        env.execute("test");

 从topic cc_test消费 然后写到cc_test2里面去

cc_test里的数据

flink写入到kafka 大坑解析。,flink,kafka,大数据

 cc_test_2里写入的数据

flink写入到kafka 大坑解析。,flink,kafka,大数据

 可以看到一个null 报错了,然后它分区的333就会一直被提交。

总之大家小心这个问题。

不加检查点 flink报错后就会直接停掉。。

加了检查点env.enableCheckpointing(6000); flink失败后会一直重试

加了重试机制 env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5000, TimeUnit.SECONDS),Time.of(5000,TimeUnit.SECONDS))); 失败的任务只会重试几次。

还是得熟悉源码呀。文章来源地址https://www.toymoban.com/news/detail-613593.html

到了这里,关于flink写入到kafka 大坑解析。的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 【Flink-Kafka-To-Mongo】使用 Flink 实现 Kafka 数据写入 Mongo(根据对应操作类型进行增、删、改操作,写入时对时间类型字段进行单独处理)

    需求描述: 1、数据从 Kafka 写入 Mongo。 2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。 3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。 4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。 5、读取时使用自定义 Source,写

    2024年02月22日
    浏览(16)
  • flink日志实时采集写入Kafka/ElasticSearch

    flink日志实时采集写入Kafka/ElasticSearch

    由于公司想要基于flink的日志做实时预警功能,故需要实时接入,并刷入es进行分析。 日志接入必须异步,不能影响服务性能 kafka集群宕机,依旧能够提交flink任务且运行任务 kafka集群挂起恢复,可以依旧续写实时运行日志 在类上加上@Plugin注解,标记为自定义appender 在类加上

    2024年02月08日
    浏览(10)
  • 记一次Flink通过Kafka写入MySQL的过程

    记一次Flink通过Kafka写入MySQL的过程

    一、前言 总体思路:source --transform --sink ,即从source获取相应的数据来源,然后进行数据转换,将数据从比较乱的格式,转换成我们需要的格式,转换处理后,然后进行sink功能,也就是将数据写入的相应的数据库DB中或者写入Hive的HDFS文件存储。 思路: pom部分放到最后面。 二

    2024年01月24日
    浏览(15)
  • Flink SQL和Table API实现消费kafka写入mysql

    Flink SQL和Table API实现消费kafka写入mysql

    1、构建 table环境 2、构建source kafka 方式一:API 方式二:Flink SQL 3、构建sink mysql  4、写入将source表写入sink表 方式一:API 方式二:Flink SQL 5、手动执行 6、测试 (1)连接kafka生产者 (2)造数据 (3)mysql查看入库情况

    2024年01月16日
    浏览(15)
  • Flink写入数据到ClickHouse

    Flink写入数据到ClickHouse

    1.ClickHouse建表 ClickHouse中建表 2.ClickHouse依赖 Flink开发相关依赖 3.Bean实体类 User.java 4.ClickHouse业务写入逻辑 ClickHouseSinkFunction.java open():在SinkFunction实例化后调用,用于初始化连接或资源。这在处理每个并行任务的子任务之前只被调用一次。 invoke():定义了在每个元素到达Sink操

    2024年02月12日
    浏览(8)
  • Flink将数据写入MySQL(JDBC)

    Flink将数据写入MySQL(JDBC)

    在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。 2.1 版本说明 2.2 导入相关依赖 2.3 连接数据库,创建表 2.4 创建POJO类 2.5 自定义map函数 2.5 Flink2MySQL 2.6 启动necat、Flink,观察数据库写

    2024年02月07日
    浏览(8)
  • 【Flink】【ClickHouse】写入流式数据到ClickHouse

    【Flink】【ClickHouse】写入流式数据到ClickHouse

    Flink 安装的教程就不在这里赘叙了,可以看一下以前的文章,这篇文章主要是把流式数据写入的OLAP(ClickHouse)中作查询分析 Flink 1.13.2, ClickHouse 22.1.3.7 这里直接使用docker安装,没有安装的同学可以使用homebreak来安装,执行下面的命令即可( 已经安装了docker的可以忽略 ) 四指

    2024年02月03日
    浏览(12)
  • Flink将数据写入CSV文件后文件中没有数据

    Flink将数据写入CSV文件后文件中没有数据

    Flink中有一个过时的 sink 方法: writeAsCsv ,这个方法是将数据写入 CSV 文件中,有时候我们会发现程序启动后,打开文件查看没有任何数据,日志信息中也没有任何报错,这里我们结合源码分析一下这个原因. 这里先看一下数据处理的代码 代码中我是使用的自定义数据源生产数据的方式

    2024年02月16日
    浏览(8)
  • Flink之FileSink将数据写入parquet文件

    Flink之FileSink将数据写入parquet文件

    在使用FileSink将数据写入列式存储文件中时必须使用 forBulkFormat ,列式存储文件如 ORCFile 、 ParquetFile ,这里就以 ParquetFile 为例结合代码进行说明. 在Flink 1.15.3 中是通过构造 ParquetWriterFactory 然后调用 forBulkFormat 方法将构造好的 ParquetWriterFactory 传入,这里先讲一下构造 ParquetWriterF

    2024年02月03日
    浏览(11)
  • 【数据湖Hudi-10-Hudi集成Flink-读取方式&限流&写入方式&写入模式&Bucket索引】

    【数据湖Hudi-10-Hudi集成Flink-读取方式&限流&写入方式&写入模式&Bucket索引】

    当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。 1.with参数 名称 Required 默认值 说明 read.streaming.enabled false false 设置 true 开启流读模式

    2024年02月14日
    浏览(15)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包