Flink DataStream之从Kafka读数据

这篇具有很好参考价值的文章主要介绍了Flink DataStream之从Kafka读数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

  • 搭建Kafka

参考:centos7下kafka2.12-2.1.0的安装及使用_kafka2.12-2.1.0 steam_QYHuiiQ的博客-CSDN博客

  •  启动zookeeper
[root@localhost kafka_2.12-2.8.1]# pwd
/usr/local/wyh/kafka/kafka_2.12-2.8.1
[root@localhost kafka_2.12-2.8.1]# ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  • 启动kafka
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-server-start.sh config/server.properties &
  • 查看进程

Flink DataStream之从Kafka读数据,大数据之Flink,kafka,flink,kafka,大数据

  •  创建topic
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-flink-topic
  • 查看topic列表
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181                                             test-flink-topic
  • 导入pom依赖
<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
</dependency>
  • 新建类
package test01;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestReadKafka {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //从kafka读
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()//这里的泛型需要指定从kafka读数据的数据类型
                .setBootstrapServers("192.168.126.128:9092")//设置kafka server
                .setGroupId("test-consumer-group")//设置consumer groupid
                .setTopics("test-flink-topic")//设置要读取数据的topic
                .setValueOnlyDeserializer(new SimpleStringSchema())//从Kafka读数据时需要进行反序列化,由于kafka的数据一般是存在value中的,不是key中,所以这里我们使用的序列化器是只对Value进行反序列化。这里的参数是用的String类型的反序列化器,因为在前面build时我们设置了要读取的数据类型是String类型。
                .setStartingOffsets(OffsetsInitializer.latest())//设置flink读取kafka数据的读取策略,这里设置的是从最新数据消费
                .build();

        streamExecutionEnvironment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource")
                        .print();
        streamExecutionEnvironment.execute();
    }
}
  • 启动程序
  • 在终端向kafka生产数据,同时观察程序控制台flink的读取情况
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-console-producer.sh --broker-list 192.168.126.128:9092 --topic test-flink-topic

Flink DataStream之从Kafka读数据,大数据之Flink,kafka,flink,kafka,大数据

 如图说明flink从kafka成功读取数据。文章来源地址https://www.toymoban.com/news/detail-536761.html

到了这里,关于Flink DataStream之从Kafka读数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 轻松通关Flink第24讲:Flink 消费 Kafka 数据业务开发

    在上一课时中我们提过在实时计算的场景下,绝大多数的数据源都是消息系统,而 Kafka 从众多的消息中间件中脱颖而出,主要是因为 高吞吐 、 低延迟 的特点;同时也讲了 Flink 作为生产者像 Kafka 写入数据的方式和代码实现。这一课时我们将从以下几个方面介绍 Flink 消费

    2024年02月08日
    浏览(1)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(2)
  • 使用Flink处理Kafka中的数据

    目录         使用Flink处理Kafka中的数据 前提:  一, 使用Flink消费Kafka中ProduceRecord主题的数据 具体代码为(scala) 执行结果 二, 使用Flink消费Kafka中ChangeRecord主题的数据           具体代码(scala)                 具体执行代码①                 重要逻

    2024年01月23日
    浏览(1)
  • 大数据-玩转数据-FLINK-从kafka消费数据

    大数据-玩转数据-Kafka安装 运行本段代码,等待kafka产生数据进行消费。

    2024年02月14日
    浏览(2)
  • Idea本地跑flink任务时,总是重复消费kafka的数据(kafka->mysql)

    1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 Idea中执行任务时,没法看到JobManager的错误,以至于我以为是什么特殊的原因导致任务总是反复消费。在close方法中,增加日志,发现jdbc连接被关闭了。 重新消费,jdbc连接又启动了。 注意,在Flink的函数中,open和close方法

    2024年02月07日
    浏览(1)
  • 使用Flink实现Kafka到MySQL的数据流转换:一个基于Flink的实践指南

    在现代数据处理架构中,Kafka和MySQL是两种非常流行的技术。Kafka作为一个高吞吐量的分布式消息系统,常用于构建实时数据流管道。而MySQL则是广泛使用的关系型数据库,适用于存储和查询数据。在某些场景下,我们需要将Kafka中的数据实时地写入到MySQL数据库中,本文将介绍

    2024年04月15日
    浏览(1)
  • 流批一体计算引擎-4-[Flink]消费kafka实时数据

    Python3.6.9 Flink 1.15.2消费Kafaka Topic PyFlink基础应用之kafka 通过PyFlink作业处理Kafka数据 PyFlink需要特定的Python版本,Python 3.6, 3.7, 3.8 or 3.9。 1.3.1 python3和pip3的配置 一、系统中安装了多个版本的python3 。 二、环境变量path作用顺序 三、安装Pyflink 1.3.2 配置Flink Kafka连接 (1)在https://mvnr

    2024年02月06日
    浏览(1)
  • flink如何初始化kafka数据源的消费偏移

    我们知道在日常非flink场景中消费kafka主题时,我们只要指定了消费者组,下次程序重新消费时是可以从上次消费停止时的消费偏移开始继续消费的,这得益于kafka的_offset_主题保存的关于消费者组和topic偏移位置的具体偏移信息,那么flink应用中重启flink应用时,flink是从topic的什

    2024年02月16日
    浏览(1)
  • flink执行环境和读取kafka以及自定义数据源操作

    目录 创建执行环境 1. getExecutionEnvironment 2. createLocalEnvironment 3. createRemoteEnvironment  执行模式(Execution Mode) 1. BATCH 模式的配置方法 2. 什么时候选择 BATCH 模式 触发程序执行 数据源操作 读取kafka数据源操作  自定义Source           编 写 Flink 程 序 的 第 一 步 , 就 是 创 建 执

    2023年04月10日
    浏览(1)
  • 构建高效实时数据流水线:Flink、Kafka 和 CnosDB 的完美组合

    当今的数据技术生态系统中,实时数据处理已经成为许多企业不可或缺的一部分。为了满足这种需求,Apache Flink、Apache Kafka和CnosDB等开源工具的结合应运而生,使得实时数据流的收集、处理和存储变得更加高效和可靠。本篇文章将介绍如何使用 Flink、Kafka 和 CnosDB 来构建一个

    2024年02月10日
    浏览(1)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包