flink 的 State

这篇具有很好参考价值的文章主要介绍了flink 的 State。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、前言

二、什么是State

2.1:什么时候需要历史数据

2.2:为什么要容错,以及checkpoint如何进行容错

2.3:state basckend 又是什么

三、有哪些常见的是 State

四、 State的使用

五、State backend

5.1  MemoryStateBackend:

5.2  FsStatebackend:

5.3  RocksDBStateBackend:

六、Checkpoint

七、 Deep

7.1 Checkpoint Barries


 

一、前言

首先State是flink中的一个非常基本且重要的概念,本文将介绍什么是State ,如何使用State,

State的存储和原理。以及State衍生的一些概念和应用。

二、什么是State

一种为了满足算子计算时需要历史数据需求的,使用checkpoint机制进行容错,存储在state backend 的数据结构。

首先state 其实就是一种数据结构。然后上面定义中隐含了三个基本知识点:

2.1:什么时候需要历史数据

    去重:在流处理系统中,上游的系统数据可能会有重复,落到下游是希望把重复的数据去掉,此时就需要记录历史的数据。

    窗口计算:在触发窗口计算函数前,需要将窗口中手机的数据保存起来,等到触发时进行计算。

    机器学习/深度学习:如训练的模型以及当前模型的参数也是一种状态,机器学习可以每次都用一个数据集,需要在数据集上进行学习,对模型进行一个反馈。    

2.2:为什么要容错,以及checkpoint如何进行容错

2.3:state basckend 又是什么

三、有哪些常见的是 State

最常见的是Keyed State 应用于keyedStreamh上,必须在KeyBy操作之后使用。它的特点是 同一个sub task 上的同一个 key 共享一个 state 。 另外还有 operator state ,顾名思义每一个operator state 都只有一个operation 的实列绑定。常见的 operation state 是 source state ,列如记录当前source 的 offset 。它的特点是  同一个 sub task 共享一个  state 。另外还有一种特殊的 operation state 称为 broadcast state , 它的特点是 同一个算子的多个 sub task 共享一个 state   。

四、 State的使用

这里以常用的 Keyed State 进行举例。前面说了 State 本质上就是一种用来存储数据的数据结构,那么作为 Keyed State,都支持哪些数据结构呢?以下列举了常见的几种数据结构:

ValueState 存储单个值,比如 Wordcount,用 Word 当 Key,State 就是它的 Count。这里面的单个值可能是数值或者字符串,作为单个值,访问接口可能有两种,get 和 set。在 State 上体现的是 update(T) / T value()。

MapState 的状态数据类型是 Map,在 State 上有 put、remove等。需要注意的是在 MapState 中的 key 和 Keyed state 中的 key 不是同一个。

ListState 状态数据类型是 List,访问接口如 add、update 等

flink官网的State  :

Working with State | Apache FlinkWorking with State # In this section you will learn about the APIs that Flink provides for writing stateful programs. Please take a look at Stateful Stream Processing to learn about the concepts behind stateful stream processing.Keyed DataStream # If you want to use keyed state, you first need to specify a key on a DataStream that should be used to partition the state (and also the records in the stream themselves).https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _
  
  /** 
  也可以使用 lazy 的方式对 state 进行初始化
  lazy private val sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    ) 
  **/

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}


object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleKeyedState")
}

以上代码的功能是对输入的流数据进行平均数计算,当输入的数据大于等于 2 个时,触发计算。这里有几点需要注意:

  • 因为 state 的初始化需要用到运行时上下文,所以定义的类需要继承 RichXXFunction
  • state 有两种初始化方式,一种是在成员变量初定义并在 open 函数中初始化。另一种是直接在成员变量处通过 lazy 的方式进行定义和初始化。
  • 这里的例子中使用的是 ValueState,他的 get 和 put 方法分别是 .value().update()
  • state 除了需要我们自己维护状态更新,状态的删除也需要在合适的时间点通过调用 clear
    方法实现。

使用 state 除了继承 RichXXFunction 外还可以直接使用系统提供的函数。如 keyBy 之后直接使用 flatMapWithState

五、State backend

前面介绍了 State 的类型和常见的数据结构,那么这些 state 存储的介质有哪些呢? Flink 提供了三种存储 State 的介质:

5.1  MemoryStateBackend:

  • 构造方法: MemoryStateBackend( int maxStateSize, boolean asynchronousSnapshots )
  • 存储方式:
    • State: TaskManager 内存
    • Checkpoint: Jobmanager 内存
  • 使用场景:本地测试用,不推荐生产场景使用

5.2  FsStatebackend:

  • 构造方法: FaStateBackend( URI checkpointDataUri, boolean asynchronousSnapshots )

  • 存储方式:

    • State:Taskmanager 内存
    • Checkpoint: 外部文件系统( 本地或 HDFS )
  • 使用场景:常规使用 State 的作业,可以在生产中使用

5.3  RocksDBStateBackend:

  • 构造方法:RocksDBStateBackend( URI checkpointDataUri, boolean enableIncrementalCheckpointing )
  • 存储方式:
    • State: TaskManager 上的 KV 数据库(实际使用内存 + 磁盘)

    • Checkpoint: 外部文件系统(本地或 HDFS )

  • 使用场景:超大状态作业,对性能要求不高的生产场景

六、Checkpoint

前面对 State 的使用中没有考虑容错的问题,当集群出现故障时进行恢复时,State 的值肯定不会从头开始计算,这就需要进行容错。State 使用 Checkpoint 机制进行容错。简单来说就是定时制作分布式快照,当出现故障需要进行恢复时,将所有 Task 恢复到最近一次成功的 Checkpoint 状态中,然后从那个点开始继续处理。Checkpoint 通过 Barries 对齐机制保证了恰好一次的一致性语义,关于 Barries 的原理后面将进行详细说明。

七、 Deep

7.1 Checkpoint Barries

checkpoint 是 jobmanager 从 source 触发到下游所有节点完成的一次全局操作。checkpoint barriers 和 watermark 类似,都是一种特殊的事件。对某一 subtask 而言,checkpoint 表示所有 subtask 恰好处理完(不能多处理,也不能少处理。为了状态恢复时保持一致性)某个相同数据。watermark 表示这之前的数据已经接收完毕。
watermark在多 subtask 上游向下游传递时,是广播 + 取上游最小 watermark 作为当前 task 的watermark,不取最小 watermark 会丢数据。
checkpoint barriers 在多 subtask 上游向下游传递时,是广播 + checkpoint barriers 对齐(alignment)。所谓对齐就是下游 subtask 会等待他上游所有分区的 subtask 的 checkpoint barriers 都到达才进行 checkpoint。上游已经到达 checkpoint barriers 的 substask 后续数据会缓存,没有到达 checkpoint barriers 的 subtask 数据会继续处理直到 checkpoint barriers 到达。

flink 的 State

 flink 的 State

flink 的 State

以 even 流的 Sum 算子为例,从图中可以看到先接收到 Source1 的 barrier 后接收到 Source2 的 barrier( 分别对应蓝色和黄色的三角 )。所以在接收到 Source1 的 barrier 后,对后面值为 4 的蓝流数据进行了缓存没有进行下一步计算,因为这个数据属于下一个 checkpoint。而在接收到 Source2 的 barrier 之前,对值为 4 的黄流数据照常进行计算直到接收到 Source2 的 barrier 为止。这就是所谓的 barriers alignment。

flink 的 State

flink 的 State

 flink 的 State文章来源地址https://www.toymoban.com/news/detail-416952.html

到了这里,关于flink 的 State的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(1) - Keyed State

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月17日
    浏览(12)
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator state

    【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator state

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月22日
    浏览(13)
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例 - 完整版

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(14)
  • Flink 学习七 Flink 状态(flink state)

    Flink 学习七 Flink 状态(flink state)

    流式计算逻辑中,比如sum,max; 需要记录和后面计算使用到一些历史的累计数据, 状态就是 :用户在程序逻辑中用于记录信息的变量 在Flink 中 ,状态state 不仅仅是要记录状态;在程序运行中如果失败,是需要重新恢复,所以这个状态也是需要持久化;一遍后续程序继续运行 1.1 row state 我

    2024年02月09日
    浏览(15)
  • Flink State 状态管理

    状态在Flink中叫做State,用来保存中间计算结果或者缓存数据。要做到比较好的状态管理,需要考虑以下几点内容: 状态数据的存储和访问 在Task内部,如何高效地保存状态数据和使用状态数据。 状态数据的备份和恢复 作业失败是无法避免的,那么就要考虑如何高效地将状态

    2024年01月17日
    浏览(13)
  • flink 的 State

    flink 的 State

    目录 一、前言 二、什么是State 2.1:什么时候需要历史数据 2.2:为什么要容错,以及checkpoint如何进行容错 2.3:state basckend 又是什么 三、有哪些常见的是 State 四、 State的使用 五、State backend 5.1  MemoryStateBackend: 5.2  FsStatebackend: 5.3  RocksDBStateBackend: 六、Checkpoint 七、 Deep

    2023年04月18日
    浏览(7)
  • flink学习之state

    state作用 保留当前key的历史状态。 state用法 ListStateInteger vipList = getRuntimeContext().getListState(new ListStateDescriptorInteger(\\\"vipList\\\", TypeInformation.of(Integer.class))); 有valueState listState mapstate 。冒失没有setstate state案例 比如起点的小说不能被下载。别人只能通过截屏,提取文字的方式盗版小

    2024年02月09日
    浏览(7)
  • Flink State 状态原理解析

    State 用于记录 Flink 应用在运行过程中,算子的中间计算结果或者元数据信息。运行中的 Flink 应用如果需要上次计算结果进行处理的,则需要使用状态存储中间计算结果。如 Join、窗口聚合场景。 Flink 应用运行中会保存状态信息到 State 对象实例中,State 对象实例通过 StateBac

    2024年02月05日
    浏览(15)
  • Flink_state 的优化与 remote_state 的探索

    Flink_state 的优化与 remote_state 的探索

    摘要:本文整理自 bilibili 资深开发工程师张杨,在 Flink Forward Asia 2022 核心技术专场的分享。本篇内容主要分为四个部分: 相关背景 state 压缩优化 Remote state 探索 未来规划 点击查看原文视频 演讲PPT 1.1 业务概况 从业务规模来讲,B 站目前大约是 4000+的 Flink 任务,其中 95%是

    2024年02月11日
    浏览(9)
  • Flink源码之State创建流程

    Flink源码之State创建流程

    StreamOperatorStateHandler 在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量,调用AbstractStreamOperator::initializeState方法中会初始化StreamOperatorStateH

    2024年02月12日
    浏览(10)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包