Flink DataStream之从Kafka读数据

这篇具有很好参考价值的文章主要介绍了Flink DataStream之从Kafka读数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

  • 搭建Kafka

参考:centos7下kafka2.12-2.1.0的安装及使用_kafka2.12-2.1.0 steam_QYHuiiQ的博客-CSDN博客

  •  启动zookeeper
[root@localhost kafka_2.12-2.8.1]# pwd
/usr/local/wyh/kafka/kafka_2.12-2.8.1
[root@localhost kafka_2.12-2.8.1]# ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  • 启动kafka
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-server-start.sh config/server.properties &
  • 查看进程

Flink DataStream之从Kafka读数据,大数据之Flink,kafka,flink,kafka,大数据

  •  创建topic
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-flink-topic
  • 查看topic列表
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181                                             test-flink-topic
  • 导入pom依赖
<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
</dependency>
  • 新建类
package test01;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestReadKafka {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //从kafka读
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()//这里的泛型需要指定从kafka读数据的数据类型
                .setBootstrapServers("192.168.126.128:9092")//设置kafka server
                .setGroupId("test-consumer-group")//设置consumer groupid
                .setTopics("test-flink-topic")//设置要读取数据的topic
                .setValueOnlyDeserializer(new SimpleStringSchema())//从Kafka读数据时需要进行反序列化,由于kafka的数据一般是存在value中的,不是key中,所以这里我们使用的序列化器是只对Value进行反序列化。这里的参数是用的String类型的反序列化器,因为在前面build时我们设置了要读取的数据类型是String类型。
                .setStartingOffsets(OffsetsInitializer.latest())//设置flink读取kafka数据的读取策略,这里设置的是从最新数据消费
                .build();

        streamExecutionEnvironment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource")
                        .print();
        streamExecutionEnvironment.execute();
    }
}
  • 启动程序
  • 在终端向kafka生产数据,同时观察程序控制台flink的读取情况
[root@localhost kafka_2.12-2.8.1]# ./bin/kafka-console-producer.sh --broker-list 192.168.126.128:9092 --topic test-flink-topic

Flink DataStream之从Kafka读数据,大数据之Flink,kafka,flink,kafka,大数据

 如图说明flink从kafka成功读取数据。文章来源地址https://www.toymoban.com/news/detail-536761.html

到了这里,关于Flink DataStream之从Kafka读数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

原文地址:https://blog.csdn.net/QYHuiiQ/article/details/131542898

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包