Flink实时同步MySQL与Doris数据

这篇具有很好参考价值的文章主要介绍了Flink实时同步MySQL与Doris数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

参考:

技术解析|Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once 精准接入-阿里云开发者社区

逻辑图:

mysql 同步doris,Flink-cdc,数据库,mysql,java

1. Flink环境:

https://flink.apache.org/zh/

  • 下载flink-1.15.1
wget https://dlcdn.apache.org/flink/flink-1.15.1/flink-1.15.1-bin-scala_2.12.tgz
  • 解压,修改配置
tar -zxvf flink-1.15.1-bin-scala_2.12.tgz cd flink-1.15.1
  • 修改配置
修改rest.bind-address为 0.0.0.0
vi conf/flink-conf.yaml
  • 下载依赖jar包 至 flink安装目录lib下
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar 

wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.14_2.12/1.0.3/flink-doris-connector-1.14_2.12-1.0.3.jar
  • 启动flink
./bin/start-cluster.sh

mysql 同步doris,Flink-cdc,数据库,mysql,java

  • 访问WebUI

http://192.168.0.158:8081

2、MySQL数据表及数据

  1. 开启Binlog,进入容器修改/etc/mysql/mysql.cnf,然后重启mysql
[mysqld] 
log_bin=mysql_bin 
binlog-format=Row 
server-id=1
  1. 进入MySQL命令行:创建数据库emp,数据表employee:
CREATE DATABASE emp; 

USE emp; 

CREATE TABLE employee ( 
emp_no INT NOT NULL, 
birth_date DATE NOT NULL, 
first_name VARCHAR(14) NOT NULL, 
last_name VARCHAR(16) NOT NULL, 
gender ENUM ('M','F') NOT NULL, 
hire_date DATE NOT NULL, PRIMARY KEY (emp_no) 
); ​ 

INSERT INTO `employee` VALUES 
(10001,'1953-09-02','Georgi','Facello','M','1986-06-26'), 
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'), 
(10003,'1959-12-03','Parto','Bamford','M','1986-08-28'), 
(10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'), 
(10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12'), 
(10006,'1953-04-20','Anneke','Preusig','F','1989-06-02'), 
(10007,'1957-05-23','Tzvetan','Zielinski','F','1989-02-10'), 
(10008,'1958-02-19','Saniya','Kalloufi','M','1994-09-15'), 
(10009,'1952-04-19','Sumant','Peac','F','1985-02-18'), 
(10010,'1963-06-01','Duangkaew','Piveteau','F','1989-08-24'), 
(10011,'1953-11-07','Mary','Sluis','F','1990-01-22'), 
(10012,'1960-10-04','Patricio','Bridgland','M','1992-12-18'), 
(10013,'1963-06-07','Eberhardt','Terkki','M','1985-10-20'), 
(10014,'1956-02-12','Berni','Genin','M','1987-03-11'), 
(10015,'1959-08-19','Guoxiang','Nooteboom','M','1987-07-02'), 
(10016,'1961-05-02','Kazuhito','Cappelletti','M','1995-01-27'), 
(10017,'1958-07-06','Cristinel','Bouloucos','F','1993-08-03'), 
(10018,'1954-06-19','Kazuhide','Peha','F','1987-04-03'), 
(10019,'1953-01-23','Lillian','Haddadi','M','1999-04-30'), 
(10020,'1952-12-24','Mayuko','Warwick','M','1991-01-26');

3. Doris数据表

  1. 进入MySQL命令行:创建Doris数据库demo,数据表employee_info
CREATE DATABASE demo; 

USE demo; 

CREATE TABLE employee_info ( 
emp_no int NOT NULL, 
birth_date date, 
first_name varchar(20), 
last_name varchar(20), 
gender char(2), 
hire_date date, 
database_name varchar(50), 
table_name varchar(200) 
) 
UNIQUE KEY(`emp_no`, `birth_date`) 
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1 
PROPERTIES ( "replication_allocation" = "tag.location.default: 1" );

4. Flink数据表及数据

  • 启动fink-sql-client
./bin/sql-client.sh embedded

mysql 同步doris,Flink-cdc,数据库,mysql,java

  • 开启Checkpoint
Flink作业周期性执行checkpoint,记录Binlog位点,当作业发生Failover时,便会从之前记录的Binlog位点继续处理。
生产环境建议设置为60秒。
Flink SQL> SET execution.checkpointing.interval = 10s

mysql 同步doris,Flink-cdc,数据库,mysql,java

  • 创建MySQL CDC表
Flink SQL> CREATE TABLE employee_source ( 
database_name STRING METADATA VIRTUAL, 
table_name STRING METADATA VIRTUAL, 
emp_no int NOT NULL, 
birth_date date, 
first_name STRING, 
last_name STRING, 
gender STRING, 
hire_date date, 
PRIMARY KEY (`emp_no`) NOT ENFORCED 
) 
WITH ( 
'connector' = 'mysql-cdc', 
'hostname' = 'localhost', 
'port' = '3336', 
'username' = 'root', 
'password' = '1234.abcd', 
'database-name' = 'emp', 
'table-name' = 'employee' 
);

查询数据:

Flink SQL> select * from employee_source limit 10;

mysql 同步doris,Flink-cdc,数据库,mysql,java

  • 创建Doris Sink表
Flink SQL> CREATE TABLE cdc_doris_sink ( 
emp_no int , 
birth_date STRING, 
first_name STRING, 
last_name STRING, 
gender STRING, 
hire_date STRING, 
database_name STRING, 
table_name STRING 
) 
WITH ( 
'connector' = 'doris', 
'fenodes' = 'localhost:8030', 
'table.identifier' = 'demo.employee_info', 
'username' = 'root', 
'password' = '1234.abcd' 
);
参数说明:
connector : 指定连接器是doris
fenodes:doris FE节点IP地址及http port
table.identifier : Doris对应的数据库及表名
username:doris用户名
password:doris用户密码

查询数据:

Flink SQL> select * from cdc_doris_sink;

mysql 同步doris,Flink-cdc,数据库,mysql,java

  • 添加数据同步任务
Flink SQL> insert into cdc_doris_sink (emp_no,birth_date,first_name,last_name,gender,hire_date,database_name,table_name) 
select emp_no,cast(birth_date as string) as birth_date ,first_name,last_name,gender,cast(hire_date as string) as hire_date ,database_name,table_name from employee_source;

WebUI可以看到正在执行中的任务,说明添加完成

mysql 同步doris,Flink-cdc,数据库,mysql,java

查看Doris数据表中数据

mysql> select * from employee_info;

5. 问题说明:

NoResourceAvailableException: Could not acquire the minimum required resources

进入flink目录,修改conf/conf/flink-conf.yaml:taskmanager.numberOfTaskSlots: 4 , 一般配置为cpu的个数。文章来源地址https://www.toymoban.com/news/detail-640043.html

到了这里,关于Flink实时同步MySQL与Doris数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • flink-cdc同步mysql数据到elasticsearch

    flink-cdc同步mysql数据到elasticsearch

    CDC是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 cdc项目地址:https://github.com/ver

    2024年02月13日
    浏览(12)
  • 使用Apache Doris自动同步整个 MySQL/Oracle 数据库进行数据分析

    使用Apache Doris自动同步整个 MySQL/Oracle 数据库进行数据分析

    Flink-Doris-Connector 1.4.0 允许用户一步将包含数千个表的整个数据库(MySQL或Oracle )摄取到Apache Doris(一种实时分析数据库)中。 通过内置的Flink CDC,连接器可以直接将上游源的表模式和数据同步到Apache Doris,这意味着用户不再需要编写DataStream程序或在Doris中预先创建映射表。

    2024年02月09日
    浏览(15)
  • 【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    【FLINK】Kafka数据源通过Flink-cdc进行实时数据同步

    CDC是Change Data Capture的缩写,中文意思是 变更数据获取 ,flink-cdc的作用是,通过flink捕获数据源的事务变动操作记录,包括数据的增删改操作等,根据这些记录可作用于对目标端进行实时数据同步。 下图是flink-cdc最新支持的数据源类型: kafka的数据源要通过flink-cdc进行实时数

    2024年02月12日
    浏览(21)
  • ApacheStreamPark2.1.0部署及执行flink-cdc任务同步mysql表的数据到es的实践

    ApacheStreamPark2.1.0部署及执行flink-cdc任务同步mysql表的数据到es的实践

    ApacheStreamPark是流处理极速开发框架,流批一体 湖仓一体的云原生平台,一站式流处理计算平台。   特性中的简单易用和文档详尽这两点我也是深有体会的,部署一点都不简单,照着官方文档都不一定能搞出来,下面部署环节慢慢来吐槽吧。   之前我们写 Flink SQL 基本上

    2024年02月11日
    浏览(13)
  • Doris通过Flink CDC接入MySQL实战

    1. 创建MySQL库表,写入demo数据 登录测试MySQL 创建MySQL库表,写入demo数据 注意:MySQL需要开通bin-log log_bin=mysql_bin binlog-format=Row server-id=1 2. 创建Doris库表 创建Doris表 3. 启动Flink 启动flink 创建Flink 任务: 输入如下地址,查看flink任务 http://localhost:8081/#/job/running 数据验证:启动后可

    2023年04月10日
    浏览(11)
  • flink cdc同步Oracle数据库资料到Doris问题集锦

    java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder at com.ververica.cdc.debezium.DebeziumSourceFunction.open(DebeziumSourceFunction.java:218) ~[flink-connector-debezium-2.2.0.jar:2.2.0] at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-co

    2024年02月16日
    浏览(13)
  • 基于Flink CDC实时同步数据(MySQL到MySQL)

    基于Flink CDC实时同步数据(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在远程服务器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安装在本地:192.168.3.31) (安装部署过程略) 准备三个数据库:flink_source、flink_sink、flink_sink_second。 将flink_source.source_test表实时同步到flink_sink和flink_sink_second的sink_test表。 (建库建表过程略) 开发过程

    2024年02月06日
    浏览(16)
  • Flink+Doris 实时数仓

    Flink+Doris 实时数仓

    Doris基本原理 Doris基本架构非常简单,只有FE(Frontend)、BE(Backend)两种角色,不依赖任何外部组件,对部署和运维非常友好。架构图如下 可以 看到Doris 的数仓架构十分简洁,不依赖 Hadoop 生态组件,构建及运维成本较低。 FE(Frontend)以 Java 语言为主,主要功能职责: 接收用户

    2024年02月07日
    浏览(11)
  • 基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

    基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

    这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 到 Databend 的实时数据同步。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。 假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。 接下来的内容

    2024年02月10日
    浏览(21)
  • 使用Flink CDC将Mysql中的数据实时同步到ES

    最近公司要搞搜索,需要把mysql中的数据同步到es中来进行搜索,由于公司已经搭建了flink集群,就打算用flink来做这个同步。本来以为很简单,跟着官网文档走就好了,结果没想到折腾了将近一周的时间…… 我也是没想到,这玩意网上资源竟然这么少,找到的全部都是通过

    2024年02月11日
    浏览(16)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包