【flink-sql实战】flink 主键声明与upsert功能实战

这篇具有很好参考价值的文章主要介绍了【flink-sql实战】flink 主键声明与upsert功能实战。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一. flink 主键声明语法

主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的某个(些)列是唯一的并且不包含 Null 值。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。

主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,主键都不可以重复定义,否则 Flink 会报错。

 
有效性检查

SQL 标准主键限制可以有两种模式:ENFORCED 或者 NOT ENFORCED。 它申明了是否输入/出数据会做合法性检查(是否唯一)。
 
Flink 不存储数据因此只支持 NOT ENFORCED 模式,即不做检查,用户需要自己保证唯一性。

注意: 在 CREATE TABLE 语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。

 
sql声明语法:

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
 ...

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
...

联合主键声明

 create table t_sink_01 ( 
f1 varchar, 
f2 varchar,
f3 int,
f4 timestamp(3),
f5 varchar, 
primary key(f1,f2)  NOT ENFORCED  -- 主键声明,字段之间逗号分隔
)
with( 
..
) ;

 
 

二. 物理表创建联合主键表

CREATE TABLE test003(
     id INT(10),
     name VARCHAR(25),
     age int(10),
     PRIMARY KEY(id,name));


desc test003

Field|Type       |Null|Key|Default|Extra|
-----+-----------+----+---+-------+-----+
id   |int        |NO  |PRI|       |     |
name |varchar(25)|NO  |PRI|       |     |
age  |int        |YES |   |       |     |

 文章来源地址https://www.toymoban.com/news/detail-768571.html

三. flink sql使用

CREATE TABLE source
(   `id` int,
 	`username` varchar,
 	`age` int
) WITH (
  'connector' = 'binlog-x'
      ,'username' = 'root'
      ,'password' = '11111111'
      ,'cat' = 'insert,delete,update'
      ,'url' = 'jdbc:mysql://10.17.31.234:3306/360test'
      ,'host' = '10.17.31.234'
      ,'port' = '3306'
      -- 什么都不加:最新位置消费
      -- 加文件名,从此文件开头消费
       ,'journal-name' = 'binlog.000194'
      --  ,'timestamp'='169944781200'
      ,'table' = '360test.dimension_table'
      ,'timestamp-format.standard' = 'SQL'
      );
CREATE TABLE sink
(   `id` int,
 	`name` varchar,
 	`age` int,
 	PRIMARY KEY (id,name) NOT ENFORCED
) WITH (
        'connector' = 'mysql-x',
           'url' = 'jdbc:mysql://localhost:3306/360test',
           'table-name' = 'test003',
           'username' = 'root',
           'password' = '11111111',
           'sink.buffer-flush.max-rows' = '1024', -- 批量写数据条数,默认:1024
           'sink.buffer-flush.interval' = '10000', -- 批量写时间间隔,默认:10000毫秒
           -- insert时的选项,覆盖或者忽略。
           -- 声明了主键时,设置all-replace为true,全部更新覆盖,
           -- 或者是忽略,即来的新数据不插入?
           'sink.all-replace' = 'true', -- 解释如下(其他rdb数据库类似):默认:false。定义了PRIMARY KEY才有效,否则是追加语句
                                       -- sink.all-replace = 'true' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=VALUES(`mid`), `mbb`=VALUES(`mbb`), `sid`=VALUES(`sid`), `sbb`=VALUES(`sbb`) 。会将所有的数据都替换。
                                       -- sink.all-replace = 'false' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=IFNULL(VALUES(`mid`),`mid`), `mbb`=IFNULL(VALUES(`mbb`),`mbb`), `sid`=IFNULL(VALUES(`sid`),`sid`), `sbb`=IFNULL(VALUES(`sbb`),`sbb`) 。如果新值为null,数据库中的旧值不为null,则不会覆盖。
           -- 新增写入选项:默认会判断,当声明了key则是update
           'sink.parallelism' = '1'    -- 写入结果的并行度,默认:null
      );
insert into sink select id,username as name,age as age  from source;


到了这里,关于【flink-sql实战】flink 主键声明与upsert功能实战的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Flink-SQL——时态表(Temporal Table)

    Flink-SQL——时态表(Temporal Table)

    这里我们需要注意一下的是虽然我们介绍的是Flink 的 Temporal Table 但是这个概念最早是在数据库中提出的 在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准。Temporal Table记录了历史上任何时间点所有的数据改动,Temporal Table的工作

    2024年01月16日
    浏览(12)
  • Flink-SQL——动态表 (Dynamic Table)

    Flink-SQL——动态表 (Dynamic Table)

    SQL 和关系代数在设计时并未考虑流数据。因此,在关系代数(和 SQL)之间几乎没有概念上的差异。 本文会讨论这种差异,并介绍 Flink 如何在无界数据集上实现与数据库引擎在有界数据上的处理具有相同的语义。 下表比较了传统的关系代数和流处理与输入数据、执行和输出结果

    2024年01月17日
    浏览(12)
  • flink-sql对kafka数据进行清洗过滤

    今天这篇blog主要记录使用flink-sql对kafka中的数据进行过滤。 以前对kafka数据进行实时处理时都是使用java来进行flink开发,需要创建一个工程,并且打成jar包再提交,流程固定但对于简单任务来说还是比较繁琐的。 今天我们要对logstash采集到kafka中的数据进行过滤筛选,将筛选

    2024年02月16日
    浏览(13)
  • Flink系列之:Upsert Kafka SQL 连接器

    Scan Source: Unbounded 、 Sink: Streaming Upsert Mode Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。 作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一

    2024年01月16日
    浏览(14)
  • Flink-SQL join 优化 -- MiniBatch + local-global

    Flink-SQL join 优化 -- MiniBatch + local-global

    问题1. 近期在开发flink-sql期间,发现数据在启动后,任务总是进行重试,运行一段时间后,container heartbeat timeout,内存溢出(GC overhead limit exceede) ,作业无法进行正常工作 问题2. 未出现container心跳超时的,作业运行缓慢,超过一天 ,作业仍存在反压情况 查看日志内容发现,出

    2024年02月06日
    浏览(12)
  • Flink系列Table API和SQL之:动态表、持续查询、将流转换成动态表、更新查询、追加查询、将动态表转换为流、更新插入流(Upsert)

    Flink系列Table API和SQL之:动态表、持续查询、将流转换成动态表、更新查询、追加查询、将动态表转换为流、更新插入流(Upsert)

    Flink中使用表和SQL基本上跟其他场景是一样的。不过对于表和流的转换,却稍显复杂。当我们将一个Table转换成DataStream时,有\\\"仅插入流\\\"(Insert-Only Streams)和\\\"更新日志流\\\"(Changelog Streams)两种不同的方式,具体使用哪种方式取决于表中是否存在更新操作。 这种麻烦其实是不可避

    2024年02月03日
    浏览(9)
  • Flink CDC 基于mysql binlog 实时同步mysql表(无主键)

    Flink CDC 基于mysql binlog 实时同步mysql表(无主键)

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 具体前提设置,请看这篇,包含 binlog 设置、Maven...... Flink CDC 基于mysql binlog 实时同步mysql表_彩虹豆的博客-CSDN博客 经过不懈努力,终于从阿里help页面找到了支

    2024年02月08日
    浏览(16)
  • Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql(无主键)

    环境说明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地运行 具体环境设置和maven依赖请看上篇:Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql_彩虹豆的博客-CSDN博客 现在操作的是源表和目标表都无主键数

    2024年02月15日
    浏览(17)
  • Flink 实时数仓关键技术解读:Upsert Kafka 和 动态表(Dynamic Table)

    Flink 实时数仓关键技术解读:Upsert Kafka 和 动态表(Dynamic Table)

    博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧

    2024年02月22日
    浏览(9)
  • 【flink报错】flink cdc无主键时的操作

    “org.apache.flink.table.api.validationexception: ‘scan.incremental.snapshot.chunk.key-column’ must be set when the table doesn’t have primary keys” 报错提示当表没有主键时,必须设置 ‘scan.incremental.snapshot.chunk.key-column’。 这里表没有主键,不是flink table中设置的primary key,而是物理表中没有主键。 如

    2024年04月23日
    浏览(7)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包