Flink的简单使用

这篇具有很好参考价值的文章主要介绍了Flink的简单使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

概要

Flink的适用场景以及如何使用

什么是FLink

一句话总结,Flink就是一个分布式,高可用,高性能的流处理框架。

主要构造

  • checkpoint:基于chandy-lamport算法实现分布式计算任务的一致性语义;

  • state:flink中的状态机制,flink天生支持state,state可以认为程序的中间计算结果或者是历史计算结果;

  • time:flink中支持基于事件时间和处理时间进行计算,spark streaming只能按照process time进行处理;基于事件时间的计算我们可以解决数据迟到和乱序等问题。

  • window:flink提供了更多丰富的window,基于时间,基于数量,session window,同样支持滚动和滑动窗口的计算。

    这个看起来没那么浅显易懂,用个很容易理解的栗子来说,Flink类似于一个多线程环境,每个的处理都是独立的,在每个处理里面他包含有自己的一系列机制,可以存储一些执行状态,定时任务,窗口计算等功能。
    理论结合实际来打个比方,你现在有一个水池,连接了很多根水管,当水从水池流向水管的时候,水管里面的水就是相对独立的一个执行环境了,至于是这个水是被使用,还是被浪费,对于其他的水管没有任何影响。每个水管都有自己独立的开关,你可以控制他什么时候开什么时候关。但是需要注意的一点是,每根管子的构造是一模一样的,所以他们的流程是一样的,只是会因为状态的不一样而处理方式不同

何时应用,如何应用

当你的数据量很大,比如有些数据推送,每几秒都会给你推送,然后还分了很多个渠道给你推送,你对这些消息中的某些值比较关心,有些还需要做延迟处理的,使用flink那真是赶上巧了。flink的高性能处理,可以很好的满足你数据量大的问题,其中的state,timer等也会对你的每一条消息负责。

springboot使用flink还算比较简单
引入依赖

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>1.13.2</version>
        </dependency>

添加配置

@Component
public class SignalInit {

    private static String topicNameSource;

    private static String topicGroupSource;

    private static String mqAddressSource;

    @Value("${custom-rocketmq-source.attribute-id-topic-name}")
    public void setHeatTopicSource(String heatTopicName) {
        billetTopicNameSource = heatTopicName;
    }

    @Value("${rocketmq.name-server}")
    public void setMqAddressSource(String mqAddressSourceAndPort) {
        mqAddressSource = mqAddressSourceAndPort;
    }

    @Value("${custom-rocketmq-source.attribute-ids-topic-group}")
    public void setHeatTopicGroupSource(String heatTopicGroup) {
        heatTopicGroupSource = heatTopicGroup;
    }

    public static void init() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties consumerProps = getConsumerProps();
        //获取配置信息
        SimpleTupleDeserializationSchema schema = new SimpleTupleDeserializationSchema();
        DataStream<Tuple2<String, String>> source =
                env.addSource(new RocketMQSource<>(schema, consumerProps));
        //获取mq发过来的数据转成IotEvent对象
        DataStream<IotEvent> attributeEventStreamSource =
                source.flatMap(new FilterCommonTuple2ToIotEventFlatMap());
        // 分组处理
        attributeEventStreamSource
                .keyBy(IotEvent::getPushId)
                .process(new DealSignalKeyedProcess());
        try {
            env.execute("SignalInit");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static Properties getConsumerProps() {
        Properties consumerProps = new Properties();
        consumerProps.setProperty(
                RocketMQConstant.NAME_SERVER_ADDR,
                mqAddressSource);
        consumerProps.setProperty(RocketMQConstant.CONSUMER_GROUP, heatTopicGroupSource);
        consumerProps.setProperty(RocketMQConstant.CONSUMER_TOPIC, billetTopicNameSource);
        return consumerProps;
    }

上面代码是Flink配合RokectMq对mq发送过来的数据做的一个处理,通过获取mq发送的消息转换成一个Event对象(这个Event对象自己根据mq发送过来的消息实体新建),最后生成一个Flink的流处理(new DealSignalKeyedProcess())开始执行,在DealSignalKeyedProcess会对你发过来的信号进行具体的处理。
如下是DealSignalKeyedProcess方法中大概需要实现的东西

@Slf4j
public class DealSignalKeyedProcess extends KeyedProcessFunction<String, IotEvent, String> {
	ApplicationContext context;
	//这里就使用到了Flink的状态机制,里面存放是数据可以是基本数据类型,也可以是一个对象,其大致的使用方式为 f赋值:	  timeTsState.update(1.0),获取值timeTsState。value()
	ValueState<Long> timeTsState;
	ValueState<IotEvent> iotEventStatus;
	public DealSignalKeyedProcess() {
    }

/**
 * open中将会对前面定义的属性进行一个简单的初始化
 */
	@Override
    public void open(Configuration parameters) throws Exception {
        context = SpringUtil.getApplicationContext();
        timeTsState = getRuntimeContext().getState(new ValueStateDescriptor<>("times-ts", Long.class));
        iotEventStatus = getRuntimeContext().getState(new ValueStateDescriptor<>("iotEvent", IotEvent.clas
    }

/**
 * processElement为业务的主要执行方法,在这个方法中对接收到的Event来进行处理
 * 其中可以声明一个定时器,在多少时间后执行,然后销毁定时器,下方将有一个简单的举例
 */
    @Override
    public void processElement(IotEvent value, Context ctx, Collector<String> out) {
		iotEventStatus.update(value);
		//....
		if (timeTsState.value() == null) {
            Long timestamp = sourceMsgTime + (timesUnitCode == 1 ? durationTime : durationTime * 60) * 1000L;
            //创建持续时长后的定时器
            ctx.timerService().registerProcessingTimeTimer(timestamp);
            timeTsState.update(timestamp);
        }
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, IotEvent, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
        //这里为达到该定时时间时的具体业务实现...
    }
}

小结

很多细节的东西网上都有讲,比如flink的架构体系,流程图等,很多东西其实就在于一个了解,知道在什么时候用什么方法能够提升效率,有一个大概的技术框架感觉就能很轻松的干活了。纯个人想法,不喜勿喷。文章来源地址https://www.toymoban.com/news/detail-804720.html

到了这里,关于Flink的简单使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 远程方法调用中间件Dubbo在spring项目中的使用

    作者: 逍遥Sean 简介:一个主修Java的Web网站游戏服务器后端开发者 主页:https://blog.csdn.net/Ureliable 觉得博主文章不错的话,可以三连支持一下~ 如有需要我的支持,请私信或评论留言! Dubbo是一个高性能分布式服务的Java RPC框架,它可以可以帮助实现不同应用之间的远程调用

    2024年02月10日
    浏览(14)
  • Spring Cloud Alibaba 最新版本(基于Spring Boot 3.1.0)整合完整使用及与各中间件集成

    Spring Cloud Alibaba 最新版本(基于Spring Boot 3.1.0)整合完整使用及与各中间件集成

    目录 前言 源码地址 官方中文文档 使用版本 spring Spring Boot 3.1.0 中间件 使用到的组件与功能 环境安装 虚拟机 nexus nacos 集成过程 工程搭建 父工程搭建 子工程 服务集成 nacos集成 配置文件 服务注册与发现-discovery 服务注册 启动 服务发现 测试 配置管理-config 新增配置  测试

    2024年02月07日
    浏览(17)
  • 基于 Docker 的 Spring Boot 项目部署演示,其中使用了 Redis、MySQL 和 RabbitMQ 中间件

    这是一个基于 Docker 的 Spring Boot 项目部署演示,其中使用了 Redis、MySQL 和 RabbitMQ 中间件。 拉取 MySQL 镜像: 创建 MySQL 容器: 将 密码 、 数据库名 、 用户名 和 密码 替换为您自己的值。 拉取 Redis 镜像: 创建 Redis 容器: 拉取 RabbitMQ 镜像: 创建 RabbitMQ 容器: 构建和运行

    2024年02月06日
    浏览(13)
  • redux中间件的简单讲解

    中间件的作用: 就是在 源数据 到 目标数据 中间做各种处理,有利于程序的可拓展性,通常情况下,一个中间件就是一个函数,且一个中间件最好只做一件事情 数据源 -------- 中间件 -------- 中间件 -------- 中间件 -------- 目标数据 applyMiddleware applymiddleware将一堆函数封装成一个

    2024年02月07日
    浏览(7)
  • 网络安全基础知识&中间件简单介绍

    网络安全基础知识&中间件简单介绍

    apache-httpd tomcat iis lighttp nginx:不是用来web服务器,而是用来做反向代理(tps10w,优化tqs2020w) fastdf:FastDFS 是一个开源的高性能分布式文件系统(DFS)。 它的主要功能包括:文件存储,文件同步和文件访问,以及高容量和负载平衡。主要解决了海量数据存储问题,特别适合以

    2023年04月16日
    浏览(34)
  • 数据库访问中间件--springdata-jpa的基本使用

    数据库访问中间件--springdata-jpa的基本使用

    回顾 示例 JPQL 片段 And findByLastnameAndFirstname … where x.lastname = ?1 and x.firstname = ?2 Or findByLastnameOrFirstname … where x.lastname = ?1 or x.firstname = ?2 Is,Equals findByFirstnameIs,findByFirstnameEquals … where x.firstname = ?1 Between findByStartDateBetween … where x.startDate between ?1 and ?2 LessThan findByAgeLessT

    2024年02月14日
    浏览(12)
  • 【Spring Cloud】服务容错中间件Sentinel入门

    【Spring Cloud】服务容错中间件Sentinel入门

    欢迎来到阿Q社区 https://bbs.csdn.net/topics/617897123 Sentinel(分布式系统的流量防卫兵)是阿里开源的一套用于 服务容错 的综合性解决方案。它以流量为切入点,从 流量控制、熔断降级、系统负载保护 等多个维度来保护服务的稳定性。 Sentinel 具有以下特征: 丰富的应用场景 :

    2024年04月15日
    浏览(17)
  • Spring Cloud Alibaba 最新版本(基于Spring Boot 3.1.0)整合完整使用及与各中间件集成
Sleuth+Zipkin集成分布式链路追踪

    Spring Cloud Alibaba 最新版本(基于Spring Boot 3.1.0)整合完整使用及与各中间件集成 Sleuth+Zipkin集成分布式链路追踪

    目录 前言 源码地址 官方中文文档 使用版本 spring Spring Boot 3.1.0 中间件 使用到的组件与功能 环境安装 虚拟机 nexus nacos 集成过程 工程搭建 父工程搭建 子工程 服务集成 nacos集成 配置文件 服务注册与发现-discovery 服务注册 启动 服务发现 测试 配置管理-config 新增配置  测试

    2024年02月12日
    浏览(43)
  • 【Spring Cloud】服务容错中间件Sentinel进阶——五大规则

    【Spring Cloud】服务容错中间件Sentinel进阶——五大规则

    我们在上一篇文章中对 Sentinel 已经有了基本的了解,接下来,我们一起对它的进阶进行学习吧! 资源 所谓资源就是 Sentinel 要保护的东西,资源是 Sentinel 的关键概念。它可以是Java应用程序中的任何内容,可以是一个服务,也可以是一个方法,甚至可以是一段代码。 我们入门

    2024年04月26日
    浏览(11)
  • Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件

    Spring Cloud 项目中实现推送消息到 RabbitMQ 消息中间件

    (注:安装在虚拟机则填虚拟机地址,否则则为本机地址) 用户名和密码都为guest 看到如下页面则为RabbitMQ安装登录成功。 三、依赖注入 导入依赖坐标 四、配置yaml文件 配置yaml配置文件 (注:host为地址,如果安装在虚拟机则为虚拟机地址,安装在本机则本机地址。port为端

    2024年04月13日
    浏览(16)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包