flink postgresql cdc实时同步(含pg安装配置等)

这篇具有很好参考价值的文章主要介绍了flink postgresql cdc实时同步(含pg安装配置等)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 环境信息

类型 版本/描述
docker 20.10.9
Postgresql 10.6
初始化账号密码:postgres/postgres
普通用户:test1/test123
数据库:test_db
flink 1.13.6

2. 安装

step1: 拉取 PostgreSQL 10.6 版本的镜像:

docker pull postgres:10.6

step2:创建并启动 PostgreSQL 容器,在这里,我们将把容器的端口 5432 映射到主机的端口 30028,账号密码设置为postgres,并将 pgoutput 插件加载到 PostgreSQL 实例中:

docker run -d -p 30028:5432 --name postgres-10.6 -e POSTGRES_PASSWORD=postgres postgres:10.6 -c 'shared_preload_libraries=pgoutput'

step3: 查看容器是否创建成功:

docker ps | grep postgres-10.6

3. 配置

step1:docker进去Postgresql数据的容器:

docker exec -it postgres-10.6  bash

step2:编辑postgresql.conf配置文件:

vi /var/lib/postgresql/data/postgresql.conf 

配置内容如下:

# 更改wal日志方式为logical(方式有:minimal、replica 、logical  )
wal_level = logical  

# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20

# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20     

# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
wal_sender_timeout = 180s	          

step3:重启容器:

docker restart postgres-10.6

连接数据库,如果查询一下语句,返回logical表示修改成功:

SHOW wal_level;

4. 新建用户并赋权

使用创建容器时的账号密码(postgres/postgres)登录Postgresql数据库。

先创建数据库和表:

-- 创建数据库 test_db
CREATE DATABASE test_db;

-- 连接到新创建的数据库 test_db
\c test_db

-- 创建 t_user 表
CREATE TABLE "public"."t_user" (
    "id" int8 NOT NULL,
    "name" varchar(255),
    "age" int2,
    PRIMARY KEY ("id")
);

新建用户并且给用户权限:

-- pg新建用户
CREATE USER test1 WITH PASSWORD 'test123';

-- 给用户复制流权限
ALTER ROLE test1 replication;

-- 给用户登录数据库权限
GRANT CONNECT ON DATABASE test_db to test1;

-- 把当前库public下所有表查询权限赋给用户
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;

5. 发布表

-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;

-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- 查询哪些表已经发布
select * from pg_publication_tables;

更改表的复制标识包含更新和删除的值:

-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)
ALTER TABLE t_user REPLICA IDENTITY FULL;

-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)
select relreplident from pg_class where relname='t_user';

6. flink sql

-- 源表定义
CREATE TABLE `table_source_pg` (
      id BIGINT,
      name STRING,
      age INT
      ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = '10.194.183.120',
      'port' = '30028',
      'username' = 'test1',
      'password' = 'test123',
      'database-name' = 'test_db',
      'schema-name' = 'public',
      'table-name' = 't_user',
      'decoding.plugin.name' = 'pgoutput'
)

-- 目标表表定义
CREATE TABLE `table_sink_mysql` (
      id BIGINT,
      name STRING,
      age INT,
      PRIMARY KEY (`id`) NOT ENFORCED
      ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://10.194.183.120:30306/test',
      'username' = 'root',
      'password' = 'root',
      'table-name' = 't_user_copy'
)

-- insert语句
INSERT INTO `table_sink_mysql` (`id`, `name`, `age`) (SELECT `id`, `name`, `age` FROM `table_source_pg`)

7. 命令汇总

-- pg新建用户
CREATE USER test1 WITH PASSWORD 'test123';

-- 给用户复制流权限
ALTER ROLE ODPS_ETL replication;

-- 给用户数据库权限
GRANT CONNECT ON DATABASE test_db to test1;

-- 设置发布开关
update pg_publication set puballtables=true where pubname is not null;

-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- 查询哪些表已经发布
select * from pg_publication_tables;

-- 给表查询权限
grant select on TABLE aa to ODPS_ETL;

-- 给用户读写权限
grant select,insert,update,delete ON  ALL TABLES IN SCHEMA public to bd_test;

-- 把当前库所有表查询权限赋给用户
GRANT SELECT ON ALL TABLES IN SCHEMA public TO ODPS_ETL;

-- 把当前库以后新建的表查询权限赋给用户
alter default privileges in schema public grant select on tables to ODPS_ETL;

-- 更改复制标识包含更新和删除之前值
ALTER TABLE test0425 REPLICA IDENTITY FULL;

-- 查看复制标识
select relreplident from pg_class where relname='test0425';

-- 查看solt使用情况
SELECT * FROM pg_replication_slots;

-- 删除solt
SELECT pg_drop_replication_slot('zd_org_goods_solt');

-- 查询用户当前连接数
select usename, count(*) from pg_stat_activity group by usename order by count(*) desc;

-- 设置用户最大连接数
alter role odps_etl connection limit 200;

附:文章来源地址https://www.toymoban.com/news/detail-672886.html

  • 参考文章:https://www.cnblogs.com/xiongmozhou/p/14817641.html

到了这里,关于flink postgresql cdc实时同步(含pg安装配置等)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • flink oracle cdc实时同步(超详细)

    flink oracle cdc实时同步(超详细)

    官方文档:https://github.com/ververica/flink-cdc-connectors/blob/release-master/docs/content/connectors/oracle-cdc.md 本文参照官方文档来记录Oracle CDC 的配置。 在本文开始前,需要先安装Oracle,有兴趣的同学可以参考博主之前写的《docker下安装oracle11g(一次安装成功)》。 如果要做oracle的实时同步

    2024年02月12日
    浏览(13)
  • 【实战-01】flink cdc 实时数据同步利器

    【实战-01】flink cdc 实时数据同步利器

    cdc github源码地址 cdc官方文档 对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。 数据的同步目前对

    2023年04月08日
    浏览(43)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(22)
  • 基于Flink CDC实时同步数据(MySQL到MySQL)

    基于Flink CDC实时同步数据(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在远程服务器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 准备三个数据库:flink_source、flink_sink、flink_sink_second。 将flink_source.source_test表实时同步到flink_sink和flink_sink_second的sink_test表。 (建库建表过程略) 开发过程

    2024年02月06日
    浏览(16)
  • 用flink cdc sqlserver 将数据实时同步到clickhouse

    flink cdc 终于支持 sqlserver 了。 现在互联网公司用sqlserver的不多,大部分都是一些国企的老旧系统。我们以前同步数据,都是用datax,但是不能实时同步数据。现在有了flinkcdc,可以实现实时同步了。 1、首先sqlserver版本:要求sqlserver版本为14及以上,也就是 SQL Server 2017 版。

    2023年04月08日
    浏览(10)
  • Flink CDC 基于mysql binlog 实时同步mysql表

    Flink CDC 基于mysql binlog 实时同步mysql表

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 先上官网使用说明和案例:MySQL CDC Connector — Flink CDC documentation 1. mysql开启binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人测试是捕获不到binlog日志的,增量相

    2024年02月10日
    浏览(15)
  • 基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

    基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

    这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 到 Databend 的实时数据同步。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。 假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。 接下来的内容

    2024年02月10日
    浏览(21)
  • 使用Flink CDC将Mysql中的数据实时同步到ES

    最近公司要搞搜索,需要把mysql中的数据同步到es中来进行搜索,由于公司已经搭建了flink集群,就打算用flink来做这个同步。本来以为很简单,跟着官网文档走就好了,结果没想到折腾了将近一周的时间…… 我也是没想到,这玩意网上资源竟然这么少,找到的全部都是通过

    2024年02月11日
    浏览(16)
  • Flink CDC2.4 整库实时同步MySql 到Doris

            Flink 1.15.4          目前有很多工具都支持无代码实现Mysql - Doris 的实时同步         如:SlectDB 已发布的功能包                 Dinky SeaTunnel TIS 等等          不过好多要么不支持表结构变动,要不不支持多sink,我们的业务必须支持对表结构的实时级变动

    2024年02月11日
    浏览(19)
  • Flink CDC 基于mysql binlog 实时同步mysql表(无主键)

    Flink CDC 基于mysql binlog 实时同步mysql表(无主键)

    环境说明: flink 1.15.2 mysql 版本5.7    注意:需要开启binlog,因为增量同步是基于binlog捕获数据 windows11 IDEA 本地运行 具体前提设置,请看这篇,包含 binlog 设置、Maven...... Flink CDC 基于mysql binlog 实时同步mysql表_彩虹豆的博客-CSDN博客 经过不懈努力,终于从阿里help页面找到了支

    2024年02月08日
    浏览(16)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包