flinkcdc抽取oracle数据(oracle cdc详细文档)

这篇具有很好参考价值的文章主要介绍了flinkcdc抽取oracle数据(oracle cdc详细文档)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

摘要

Flink一般常用的集群模式有 flink on yarn 和standalone模式。
yarn模式需要搭建hadoop集群,该模式主要依靠hadoop的yarn资源调度来实现flink的高可用,达到资源的充分利用和合理分配。一般用于生产环境。
standalone模式主要利用flink自带的分布式集群来提交任务,该模式的优点是不借助其他外部组件,缺点是资源不足需要手动处理。
本文主要以 standalone集群模式为例。

提示:flinkcdc获取oracle date日期字段的值存在时差而且是long型
一种方法:改java代码 例如:
preparedStatement.setObject(i, new Timestamp((Long) o - 8 * 60 * 60 * 1000));
另一种方法:flink-conf.yaml添加(未验证)
env.java.opts.taskmanager: -Duser.timezone=GMT+08

1.项目添加flink依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>test-cdc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <fastjson.vsersion>1.2.68</fastjson.vsersion>
        <druid.version>1.2.8</druid.version>
        <flink.version>1.14.3</flink.version>
        <flinkcdc.vsersion>2.3.0</flinkcdc.vsersion>
        <scala.version>2.12</scala.version>
        <postgresql.version>42.2.12</postgresql.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>${postgresql.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>kafka-clients</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-oracle-cdc</artifactId>
            <version>${flinkcdc.vsersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

       <!--  <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
                    <version>${flink.version}</version>
                </dependency>
              <dependency>
                       <groupId>mysql</groupId>
                       <artifactId>mysql-connector-java</artifactId>
                       <version>8.0.15</version>
                </dependency>
                <dependency>
                    <groupId>com.oracle.database.jdbc</groupId>
                    <artifactId>ojdbc10</artifactId>
                    <version>19.10.0.0</version>
                </dependency>
          -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.vsersion}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
            <version>2.1.5.RELEASE</version>
        </dependency>
        <!--        <dependency>-->
        <!--            <groupId>org.slf4j</groupId>-->
        <!--            <artifactId>slf4j-api</artifactId>-->
        <!--            <version>1.7.32</version>-->
        <!--        </dependency>-->
        <!--  slf4j 内置的简单实现      -->
        <!--                <dependency>-->
        <!--                    <groupId>org.slf4j</groupId>-->
        <!--                    <artifactId>slf4j-simple</artifactId>-->
        <!--                    <version>1.7.32</version>-->
        <!--                </dependency>-->
        <!--                <dependency>-->
        <!--            <groupId>ch.qos.logback</groupId>-->
        <!--            <artifactId>logback-core</artifactId>-->
        <!--            <version>1.2.11</version>-->
        <!--        </dependency>-->
        <!--        <dependency>-->
        <!--            <groupId>ch.qos.logback</groupId>-->
        <!--            <artifactId>logback-classic</artifactId>-->
        <!--            <version>1.2.11</version>-->
        <!--        </dependency>-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.3.22</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.7.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2.oracle开启日志归档

sqlplus / as sysdba

启用日志归档
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/home/oracle/oracle-data-test' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

检查日志归档是否开启
archive log list;

为捕获的数据库启用补充日志记录,以便数据更改捕获更改的数据库行之前的状态,下面说明了如何在数据库级别进行配置。
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

创建表空间
CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

创建用户flinkcdc绑定表空间LOGMINER_TBS
CREATE USER flinkcdc IDENTIFIED BY flinkcdc DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;

授予flinkcdc用户dba的权限
 grant connect,resource,dba to flinkcdc;

并授予权限
  GRANT CREATE SESSION TO flinkcdc;
  GRANT SELECT ON V_$DATABASE to flinkcdc;
  GRANT FLASHBACK ANY TABLE TO flinkcdc;
  GRANT SELECT ANY TABLE TO flinkcdc;
  GRANT SELECT_CATALOG_ROLE TO flinkcdc;
  GRANT EXECUTE_CATALOG_ROLE TO flinkcdc;
  GRANT SELECT ANY TRANSACTION TO flinkcdc;
  GRANT EXECUTE ON SYS.DBMS_LOGMNR TO flinkcdc;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkcdc;
  GRANT CREATE TABLE TO flinkcdc;
  GRANT LOCK ANY TABLE TO flinkcdc;
  GRANT ALTER ANY TABLE TO flinkcdc;
  GRANT CREATE SEQUENCE TO flinkcdc;

  GRANT EXECUTE ON DBMS_LOGMNR TO flinkcdc;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkcdc;

  GRANT SELECT ON V_$LOG TO flinkcdc;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkcdc;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkcdc;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkcdc;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkcdc;
  GRANT SELECT ON V_$LOGFILE TO flinkcdc;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkcdc;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkcdc;

修改以下表让其支持增量日志

ALTER TABLE test.table1 SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE test.table2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE test.table3 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

3.Flink集群搭建

版本类型 版本号
项目版本 flink1.14.3、scala2.12、flinkoraclecdc2.3.0
flink集群版本 flink1.14.3
hostname ip 配置
yy1 10.201.1.1 StandaloneSessionClusterEntrypoint、Taskmanager
yy2 10.201.1.2 Taskmanager
yy3 10.201.1.3 Taskmanager

3.1 Flink下载安装并配置
1) 登录linux
2) cd /usr/local/
3) wget https://archive.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz
4) tar –zxvf flink-1.14.3-bin-scala_2.12.tgz
5) cd flink-1.14.3/conf
6) vi flink-conf.yaml
注意:冒号后面有个空格
jobmanager.rpc.address: yy1

# 这个参数比较重要,这个是总内存
jobmanager.memory.process.size: 10gb

# taskmanager大小
taskmanager.memory.process.size: 3gb

# 打开注释,并修改保存点存储目录
# 配置hdfs目录,一般用于搭建了hadoop集群
#state.savepoint.dir: hdfs://yy1:9000/flink/cdc

#存储目录设为服务器本地
state.checkpoints.dir: file:///bigdata/checkpoints

state.savepoints.dir: file:///bigdata/savepoints

#设置检查点保存的数据 默认是一个,增加下面
#state.checkpoints.num-retained: 3

# 修改slot的个数
taskmanager.numberOfTaskSlots: 3

#如果不想用flink默认目录/temp 可以自己配置如下并打开
# io.tmp.dirs: /data1/flink/tmp
# env.pid.dir: /data1/flink/env
# web.tmpdir: /data1/flink/tmp
#上传的jar包目录,这样不用每次都上传
#web.upload.dir: /data1/flink/jar

7)修改masters和workers 文件
masters内容:
yy1:8081

workers内容:
yy1
yy2
yy3

8)复制到其他节点
scp -rq flink-1.14.3 yy2:/usr/local
scp -rq flink-1.14.3 yy3:/usr/local

9)每个节点上建立flink-1.14.3目录的链接(每个节点操作)
ln -s flink-1.14.3 flink

10)配置flink的环境变量(每个节点操作)
vi /etc/profile
#配置如下

export JAVA_HOME=/usr/local/jdk18
export FLINK_HOME=/usr/local/flink
export JRE_HOME=$JAVA_HOME/jre
export CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$FLINK_HOME/bin

11)使其修改生效(每个节点操作)
source /etc/profile
12)在master节点上启动flink集群
start-cluster.sh
13)打开flink任务管理界面

http://10.201.1.1:8081

csdn oracle cdc,java,oracle,数据库,flink

14)在界面提交任务

csdn oracle cdc,java,oracle,数据库,flink
15)效果图
csdn oracle cdc,java,oracle,数据库,flink

4. Flink 提交任务的常用命令

4.1 stantalone模式
flink run –m [ip]:[端口] -p[并行数] -c[main方法所在类的全路径] [jar文件的绝对路径]

flink run -m 10.201.1.1:8090 -p 1 -c com.test.TestStudent /bigdata/testCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

stantalone 模式下savepoint,取消任务的同时savepoint

flink cancel -s 282c334dd9dc9ae04c3d6cbe1bfdf8f2

暂停任务的同时savepoint

flink savepoint 282c334dd9dc9ae04c3d6cbe1bfdf8f2

4.2 flink on yarn模式

flink run -t yarn-per-job -c com.test.TestStudent /bigdata/testCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

Flink on yarn 模式下savepoint

flink savepoint 8f1d21525dc3bebf22f9c3a617326142 hdfs:///flink/cdc -yid application_1657250519562_0007

从保存点恢复
$ bin/flink run -s :savepointPath [:runArgs]

flink run  -s hdfs:///flink/cdc/savepoint-a4f769-58ee3095ee02

5.完成

6.问题汇总

1)报错信息:ERROR: Attempting to operate on hdfs namenode as root
ERROR: but there is no HDFS_NAMENODE_USER defined. Aborting operation.
Starting datanodes
ERROR: Attempting to operate on hdfs datanode as root
ERROR: but there is no HDFS_DATANODE_USER defined. Aborting operation.
Starting secondary namenodes [hadoop]
ERROR: Attempting to operate on hdfs secondarynamenode as root
ERROR: but there is no HDFS_SECONDARYNAMENODE_USER defined. Aborting operation.
2018-07-16 05:45:04,628 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
Starting resourcemanager
ERROR: Attempting to operate on yarn resourcemanager as root
ERROR: but there is no YARN_RESOURCEMANAGER_USER defined. Aborting operation.
Starting nodemanagers
ERROR: Attempting to operate on yarn nodemanager as root
ERROR: but there is no YARN_NODEMANAGER_USER defined. Aborting operation.

解决:
vi /etc/profile 加入以下信息,然后source /etc/profile
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
export HDFS_JOURNALNODE_USER=root
export HDFS_ZKFC_USER=root
export HADOOP_CLASSPATH=hadoop classpath
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

2)报错信息:java.lang.IllegalStateException: Trying to access closed classloader.
Please check if you store classloaders directly or indirectly in static fields.
If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately,
you can disable this check with the configuration ‘classloader.check-leaked-classloader’.
解决:
修改flink配置文件:vi flink-conf.yaml
增加:classloader.check-leaked-classloader: false

3)File /tmp/logs/root/logs-tfile/application_1656991740104_0001 does not exist.
File /tmp/logs/root/bucket-logs-tfile/0001/application_1656991740104_0001 does not exist.

Can not find any log file matching the pattern: [ALL] for the application: application_1656991740104_0001
Can not find the logs for the application: application_1656991740104_0001 with the appOwner: root

解决:
yarn-site.xml 增加以下

<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>

4)报错信息:DebeziumException: Supplemental logging not properly configured. Use: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA

解决:
ALTER TABLE 表名 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

5)oracle版本是19c,flink ui界面报错
报错信息:Caused by: java.sql.SQLException: ORA-44609: CONTINOUS_MINE is desupported for use with DBMS_LOGMNR.START_LOGMNR.
ORA-06512: at “SYS.DBMS_LOGMNR”, line 72
解决
注释以下配置
// proper.setProperty(“log.mining.continuous.mine”, “true”);文章来源地址https://www.toymoban.com/news/detail-788773.html

到了这里,关于flinkcdc抽取oracle数据(oracle cdc详细文档)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Flink CDC-Oracle CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表

    使用sysdba角色登录到Oracle数据库 确保Oracle归档日志(Archive Log)已启用 若未启用归档日志, 需运行以下命令启用归档日志 设置归档日志存储大小及位置 设置数据库恢复文件存储区域的大小(如归档重做日志文件、控制文件备份等) 设置恢复文件的实际物理存储路径;scope=spfile参数

    2024年02月05日
    浏览(14)
  • Flink-CDC——MySQL、SqlSqlServer、Oracle、达梦等数据库开启日志方法

    目录 1. 前言 2. 数据源安装与配置 2.1 MySQL 2.1.1 安装 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安装 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安装 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安装 2.4.2 CDC 配置 2.5达梦 2.4.1安装 2.4.2CDC配置 3. 验证 3.1 Flink版本与CDC版本的对应关系 3.2 下载相关包 3.3 添加cdc jar 至lib目录 3.4 验

    2024年02月05日
    浏览(67)
  • Flink oracle cdc - Oracle Logminer CDC性能问题

    Flink oracle cdc - Oracle Logminer CDC性能问题

    最近的项目中有用到Flink Oracle CDC实时到监听数据库变化,将变化的数据sink到Kafka。Oracle CDC依赖Debezium组件解析Redo Log与Archive Log,Debezium 通过Oracle 的Logminer解析Log。在我们生产环境遇到 运行一段时间后,再也查询不到数据,直到报miss log file异常(线上环境cron job 将一

    2024年02月08日
    浏览(10)
  • Flink CDC系列之:Oracle CDC 导入 Elasticsearch

    Flink CDC系列之:Oracle CDC Connector 该 Docker Compose 中包含的容器有: Oracle: Oracle 11g, 已经预先创建了 products 和 orders表,并插入了一些数据 Elasticsearch: orders 表将和 products 表进行join,join的结果写入Elasticsearch中 Kibana: 可视化 Elasticsearch 中的数据 在 docker-compose.yml 所在目录下运行如下

    2024年02月12日
    浏览(10)
  • Kettle实战案例:拉取CSDN博客列表数据至Excel文件【详细教程】

    Kettle实战案例:拉取CSDN博客列表数据至Excel文件【详细教程】

    本文详细介绍了使用Kettle工具实现拉取CSDN博客列表数据到Excel文件的实战案例,包括接口调用、数据解析、存储过程和实际操作步骤。适用于数据抓取和处理的初学者和专业人士。

    2024年02月02日
    浏览(8)
  • PDF或图片文档内容识别、关系抽取

    PDF或图片文档内容识别、关系抽取

            自动识别法院和公积金中心的文书(调解书、判决书、裁定书、通知书)扫描件(PDF或图片),获取特定结构的数据,自动对比。抽取结构如: 执行 搭建label studio标记,标记完成后导出JSON。 Label Studio JSON转Doccano JSON 构造数据集 工具,命名为utils.py   训练  模型部

    2024年02月08日
    浏览(22)
  • Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架

    Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架

    Flink CDC 是基于数据库日志 CDC(Change Data Capture)技术的实时数据集成框架,支持了全增量一体化、无锁读取、并行读取、表结构变更自动同步、分布式架构等高级特性。配合 Flink 优秀的管道能力和丰富的上下游生态,Flink CDC 可以高效实现海量数据的实时集成。Flink CDC 社区发

    2024年02月04日
    浏览(11)
  • Flink Oracle CDC Connector源码解读

    Flink Oracle CDC Connector源码解读

    flink cdc是在flink的基础上对oracle的数据进行实时采集,底层使用的是debezium框架来实现,debezium使用oracle自带的logminer技术来实现。logminer的采集需要对数据库和采集表添加补充日志,由于oracle18c不支持对数据添加补充日志,所以目前支持的oracle11、12、19三个版本。 flink oracle

    2024年02月02日
    浏览(12)
  • Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql

    Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql

    环境说明: flink 1.15.2 Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production mysql 版本:5.7 windows11 IDEA 本地运行 先上官网使用说明和案例:Oracle CDC Connector — Flink CDC documentation 1. Oracle 开启 log archiving (1).启用 log archiving         a:以DBA用户连接数据库    

    2024年02月11日
    浏览(24)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包