基于Canal+kafka监听数据库变化的最佳实践

这篇具有很好参考价值的文章主要介绍了基于Canal+kafka监听数据库变化的最佳实践。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、前言

        工作中,我们很多时候需要根据某些状态的变化更新另一个业务的逻辑,比如订单的生成,成交等,需要更新或者通知其他的业务。我们通常的操作通过业务埋点、接口的调用或者中间件完成。

        但是状态变化的入口比较多的时候,就很容易漏掉某些地方。代码维护起来也比较麻烦。今天介绍阿里出品的 【canal】中间件完成数据库字段的监听。

2、canal的简单介绍

        canal详见介绍件官网:https://github.com/alibaba/canal

基于Canal+kafka监听数据库变化的最佳实践

 

2.1 家族成员:

基于Canal+kafka监听数据库变化的最佳实践

【canal.adapter】:客户端落地的适配以及功能

      基于Canal+kafka监听数据库变化的最佳实践

 【canal.admin】:提供WebUI的管理界面

基于Canal+kafka监听数据库变化的最佳实践

 【canal.deployer】:canal服务

 【canal.example】:客户端提供的demo

2.2 工作原理

基于Canal+kafka监听数据库变化的最佳实践

 3、 实践目标

        使用canal监控mysql数据的变化,将变化的数据推送到kafka,并使用canal-admin动态管理需要监控的数据库表。

 4、工具准备

基于Canal+kafka监听数据库变化的最佳实践

其中kafka是依赖zookeeper的,所以也需要zookeeper。

5、配置并启动kafka

Kafka QuickStart

5.1 修改配置

vim config/server.properties

换成自己的IP

基于Canal+kafka监听数据库变化的最佳实践

替换成自己zookeeper的地址

基于Canal+kafka监听数据库变化的最佳实践

 5.2 启动server

  • 启动zookeeper脚本

# bin/zkServer.sh start

  • 启动kafka脚本

# bin/kafka-server-start.sh -daemon config/server.properties &

  •  查看是否启动成功脚本

# jps -ml

 基于Canal+kafka监听数据库变化的最佳实践

此时kafka启动成功。

5.3 注意事项

值得注意的是官方文档中查看topic的命令,

# bin/kafka-topics.sh --list --zookeeper 192.168.1.110:2181

在心的kafka版本中已经改变,可移步kafka官方文档: Apache Kafka

新版本中使用bootstrap-server,如下

# bin/kafka-topics.sh --list --bootstrap-server localhost:9092 

6、启动canal-admin

6.1 修改配置

改成对应的ip

基于Canal+kafka监听数据库变化的最佳实践

 6.2 执行 conf/canal.manage.sql

         该脚本是canal-admin的管理脚本。

 6.3 启动canal-admin

sh bin/startup.sh

 6.4 查看启动状态

基于Canal+kafka监听数据库变化的最佳实践

 6.5 访问页面

此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,

默认密码:admin/123456

基于Canal+kafka监听数据库变化的最佳实践

7、启动canal-server

7.1 修改配置脚本

# vim conf/canal_local.properties

换成canal-admin的IP

基于Canal+kafka监听数据库变化的最佳实践

7.2 启动服务 指定local

# sh bin/startup.sh local

7.3 查询启动状态

基于Canal+kafka监听数据库变化的最佳实践

8、管理平台配置

8.1 查看canal服务的状态

基于Canal+kafka监听数据库变化的最佳实践

 8.2 配置实例

基于Canal+kafka监听数据库变化的最佳实践

 修改监听的数据库信息:

canal.instance.master.address=192.168.88.111:3306

canal.instance.dbUsername=***
canal.instance.dbPassword=***

#默认监听全库

canal.instance.filter.regex=test.test_user

#配置不可访问的库表

canal.instance.filter.black.regex=

#配置mq的主题/路由

canal.mq.topic=example

 保存即可。

8.3 启动实例

基于Canal+kafka监听数据库变化的最佳实践

9、编写客户端监听kafka的客户端

@Test
public void test01(){
	// 修改打印日志的级别,不然会不停的打印debug日志,影响阅读
	LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
	Logger root = loggerContext.getLogger("root");
	root.setLevel(Level.INFO);

	//设置消费者属性
	Properties properties = new Properties();
	properties.put("bootstrap.servers","192.168.88.111:9092");
	//反序列化器,与生产者的序列化器相对应
	properties.put("key.deserializer", StringDeserializer.class);
	properties.put("value.deserializer", StringDeserializer.class);
	//设置消费者的消费者群组
	properties.put(ConsumerConfig.GROUP_ID_CONFIG,"example");
	// properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest
	KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
	try {
		//消费者订阅主题(可以多个,支持正则表达式,进行模糊匹配)
		consumer.subscribe(Collections.singletonList("example"));
		System.out.println("-------------------------消费端准备就绪,等待消息接受------------------------------------");
		//kafka消费者是通过拉取的方式获得服务端消息
		while(true){
			//循环调用poll方法,获取数据。
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
			for(ConsumerRecord<String, String> record:records){
				String topic = record.topic();
				String value = record.value();
				if (StringUtils.isNotEmpty(value)) {
					System.out.println(String.format("topic:%s;" + "value:%s", topic,value));
				}
			}
		}
	} finally {
		consumer.close();
	}
}

 10、验证

修改数据库字段,可以接收到修改的信息

基于Canal+kafka监听数据库变化的最佳实践文章来源地址https://www.toymoban.com/news/detail-403225.html

到了这里,关于基于Canal+kafka监听数据库变化的最佳实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Oracle运维(数据库、监听、重启)

    shutdown有四个参数,四个参数的含义如下: Normal 需要等待所有的用户断开连接 Immediate 等待用户完成当前的语句 Transactional 等待用户完成当前的事务 Abort 不做任何等待,直接关闭数据库 normal需要在所有连接用户断开后才执行关闭数据库任务,所以有的时候看起来好象命令没

    2024年02月06日
    浏览(26)
  • 数据库监听器停止与启动

    切换至安装oracle数据库的那个用户,一般为oracle(在root下是安装或是启动不了oracle的); # su oracle 然后启动监听器 # lsnrctl start 会看到启动成功的界面; 停止监听器命令. lsnrctl stop 可以修改oracle的ora文件,对数据库进行配置,在opt/oracle/product/9.2.0/network/admin 目录中,修改相应的ora文件即

    2024年02月07日
    浏览(18)
  • Java 实现实时监听MySQL数据库变更MySQLBinListener

    目录 1、导出需要的类和接口 2、 定义 MySQLBinlogListener类 3、私有方法,启动重连定时器 4、完整代码   编写一个MySQL数据库实时变更的监听器。 为什么要编写这个一个监听器:为了实时监测和响应MySQL数据库中的变更事件 实时数据同步:通过监听MySQL Binlog,可以捕获数据库的

    2024年02月16日
    浏览(24)
  • Linux下Oracle的数据库和监听启动关闭命令

    sqlplus /nolog conn /as sysdba connect sys/123456 as sysdba; (123456为用户密码) startup startup命令它有三种情况: 第一种:不带参数,启动数据库实例并打开数据库,以便用户使用数据库,在多数情况下,使用这种方式! 第二种:带nomount参数,只启动数据库实例,但不打开数据库,在你希

    2024年02月04日
    浏览(24)
  • 【Web开发 | Django】数据库分流之道:探索Django多数据库路由最佳实践

    🤵‍♂️ 个人主页: @AI_magician 📡主页地址: 作者简介:CSDN内容合伙人,全栈领域优质创作者。 👨‍💻景愿:旨在于能和更多的热爱计算机的伙伴一起成长!!🐱‍🏍 🙋‍♂️声明:本人目前大学就读于大二,研究兴趣方向人工智能硬件(虽然硬件还没开始玩,但一直

    2024年02月07日
    浏览(26)
  • SpringBoot整合Canal+RabbitMQ监听数据变更

    需求 步骤 环境搭建 整合SpringBoot Canal实现客户端 Canal整合RabbitMQ SpringBoot整合RabbitMQ   我想要在SpringBoot中采用一种与业务代码解耦合的方式,来实现数据的变更记录,记录的内容是新数据,如果是更新操作还得有旧数据内容。 经过调研发现,使用Canal来监听MySQL的binlog变化可

    2024年02月11日
    浏览(15)
  • docker安装达梦数据库最佳实践

    下载地址:产品下载 | 达梦数据库 安装博客地址:安装前准备 | 达梦技术文档 到官网docker部署那一章节,下载镜像tar包,上传到服务器上后,运行下面的命令 docker安装启动脚本: 说明: CASE_SENSITIVE=0 设置大小写不敏感 LENGTH_IN_CHAR=1 VARCHAR 类型对象的长度以字符为单位 UNICO

    2023年04月16日
    浏览(24)
  • openGauss学习笔记-200 openGauss 数据库运维-常见故障定位案例-表文件大小无变化

    200.1 VACUUM FULL一张表后,表文件大小无变化 200.1.1 问题现象 使用VACUUM FULL命令对一张表进行清理,清理完成后表大小和清理前一样大。 200.1.2 原因分析 假定该表的名称为table_name,对于该现象可能有以下两种原因: table_name表本身没有delete过数据,使用VACUUM FULL table_name后无需清

    2024年01月18日
    浏览(23)
  • SQLAlchemy ORM指南:简化数据库操作的最佳实践

    背景: ​ SQLAlchemy是一个数据库的ORM框架,让我们操作数据库的时候不要再用SQL语句了,跟直接操作模型一样。操作十分便捷,其实SQLAlchemy应该是在Flask和Django应用的特别多,而且在flask中已经集成了flask_sqlalchemy ,好像是 SQLAlchemy的作者和 Flask是同一个,背景了解到这里就可

    2024年01月20日
    浏览(34)
  • SpringCloud 整合 Canal+RabbitMQ+Redis 实现数据监听

    Canal 指的是阿里巴巴开源的数据同步工具,用于数据库的实时增量数据订阅和消费。它可以针对 MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的异构数据同步等情况进行实时增量数据同步。 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x Canal是如何同步数据库

    2024年02月03日
    浏览(18)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包