restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控

这篇具有很好参考价值的文章主要介绍了restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

要求任务

请写java程序,创建restful web服务,示范利用RestController、Debezium和KafkaListener通过binlog日志监控并获得mysql业务数据库变更,请用具体数据集举例说明以上程序各步骤处理结果

一、了解相关知识

该问题首先要了解restful web服务,Debezium和kafka相关知识。

1.restful web服务

RESTful是HTTP接口调用的一种特殊实现,遵循REST架构风格的规范,能够提供更加标准化、统一化、可读性和易用性的API设计。RESTful调用相对于HTTP接口调用来说,具有更加清晰明了、易于理解和维护的API设计,扩展性和灵活性也更强。

其接口的介绍和实现可参考RESTful接口介绍与实现-CSDN博客

2.Debezium

Debezium 是一个开源的分布式平台,用于监控和捕获数据库的更改事件。它可以连接到各种不同类型的数据库,捕获数据库的更改,并将这些更改以事件流的形式传递给 Apache Kafka 或其他支持的消息队列系统。Debezium 提供了一个强大的变更数据捕获 (CDC) 解决方案,允许您实时地捕获数据库的变更,并将其用于构建实时应用程序、数据湖、分析和报告。

以下是 Debezium 的一些主要特点和概念:

  1. 支持多种数据库: Debezium 支持连接到多种不同类型的数据库,包括 MySQL、PostgreSQL、MongoDB、SQL Server 等。每个数据库都有相应的 Debezium 连接器。

  2. 实时事件捕获: Debezium 使用数据库事务日志 (transaction log) 或类似机制,以实时捕获数据库的更改。这意味着当数据库发生变更时,这些变更将立即被捕获并传递给消费者。

  3. 基于 Apache Kafka 的事件流: Debezium 将捕获的变更以事件流的形式传递给 Apache Kafka,这是一个高性能、持久性的分布式消息队列。这使得可以方便地将数据库变更集成到 Apache Kafka 生态系统中。

  4. 可扩展性: Debezium 可以通过添加新的数据库连接器进行扩展,使其能够连接到支持的数据库类型。这使得它适用于不同的数据库环境。

  5. 事件格式: Debezium 将数据库变更表示为一系列的事件,每个事件包含了一个或多个数据库更改的详细信息。这些事件可以以 JSON 格式进行序列化,易于理解和处理。

  6. 保留数据库历史: Debezium 不仅捕获当前的数据库状态,还保留了变更的历史。这允许您在任意时间点回溯并查看数据库的先前状态。

  7. 用例举例: Debezium 可以用于许多不同的用例,包括实时数据仓库、微服务架构中的事件驱动开发、数据湖、审计跟踪等。

在使用 Debezium 时,您需要配置适当的数据库连接器,以便 Debezium 能够连接到目标数据库,并配置 Kafka 连接信息,以确保数据库变更能够被传递到 Kafka 主题中。然后,您可以使用 Kafka 消费者来订阅这些主题,以实时获取数据库的变更事件。

3.kafka

Apache Kafka 是一个分布式流处理平台,用于构建实时数据管道和流应用程序。它最初是由 LinkedIn 开发,后来成为 Apache 软件基金会的开源项目。Kafka 设计用于高吞吐量、可扩展性、可持久性和容错性,并被广泛用于构建实时数据流平台。

以下是 Kafka 的一些关键概念和特点:

  1. 消息传递系统: Kafka 是一个分布式的消息传递系统,用于可靠地发布和订阅消息。它允许应用程序之间以异步的方式传递数据,而不需要直接连接。

  2. 分布式架构: Kafka 的架构被设计为分布式的,允许它在多个节点上水平扩展。这使得 Kafka 能够处理大量数据并提供高可用性。

  3. 主题(Topics): 消息在 Kafka 中被组织成主题。主题是消息的逻辑容器,用于对消息进行分类。发布者(Producers)将消息发布到特定的主题,而订阅者(Consumers)从主题中读取消息。

  4. 分区(Partitions): 每个主题可以分为一个或多个分区。分区是消息的物理存储单位,并允许 Kafka 水平扩展。每个分区都可以在不同的节点上进行处理。

  5. 生产者(Producers): 生产者负责将消息发布到 Kafka 主题。它们将消息写入特定主题的一个或多个分区。

  6. 消费者(Consumers): 消费者订阅一个或多个主题,并从中读取消息。每个分区只能被一个消费者组中的一个消费者消费,但一个消费者组可以包含多个消费者,以实现水平扩展和容错。

  7. ZooKeeper: Kafka 使用 Apache ZooKeeper 来协调和管理集群中的节点。ZooKeeper用于维护集群的元数据和协助领导选举等任务。

  8. 持久性和日志结构存储: Kafka 使用日志结构存储来保证消息的持久性。消息被追加到不可变的日志中,允许它们在存储中进行有效地管理。

  9. 流处理: Kafka 还包括强大的流处理功能,使其成为构建实时数据流应用程序的理想平台。通过 Kafka Streams API,开发人员可以在 Kafka 上直接构建流处理应用程序。

  10. 社区和生态系统: Kafka 拥有强大的社区支持,并且在其周边有丰富的生态系统,包括各种连接器、工具和扩展。

Kafka 的设计目标之一是能够处理大量数据流,并提供可靠的消息传递和持久性。这使得 Kafka 成为构建大规模实时数据管道和流应用程序的首选选择。

二、debezium与kafka的关系

常见的是,Debezium是通过Apache Kafka连接部署的。Kafka Connect是一个用于实现和操作的框架和运行时 源连接器,它将数据摄取到Kafka 接收连接器,它将数据从Kafka主题写入到其他系统。 下图显示了一个基于Debezium的CDC管道的架构:

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven

总得来说,我的理解就是Debezium时连接数据库与kafka的一个工具,它会实时监控数据库的binlog文件并将数据库变更的信息传输kafka相应主题上,而kafka就相当于一个数据流的管道,它主要有主题和消费者,消费者就是从主题中读取消息的功能,所以debezium是基于kafka实现该功能的一个插件。

三、环境及软件搭建

1.kafka安装及测试

可参考Windows安装启动Kafka,创建测试实例,关闭kafka_kafka本地测试启动-CSDN博客

2.debezium安装步骤:

(1)第一步就是版本匹配问题,一定要根据不同的jdk版本配备相应版本的debezium,然后选择相应数据库的debezium下载即可

查询自己电脑jdk版本可以cmd在终端输入:java -version查看jdk版本

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven

插件下载地址:Debezium Releases Overview

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven

由于我的jdk为8,所以选择1.5.4版本,然后在kafka目录下新建文件夹名称为plugins,将debezium压缩包解压到此文件夹。

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven

然后修改一些kafka的config中distributed.propertites文档中的一些内容,其中

(1)bootstrap.servers=localhost:9092

这个修改为自己安装kafka的端口号即可,默认端口号为9092

(2)group.id=connect-mysqltest

这个是组名,可以自己任意设定

(3)key.converter.schemas.enable=false value.converter.schemas.enable=false

这两处将true该为false即可

(4)listeners=HTTP://:8083

这个是zookeeper接口号

(5)

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven

修改地址为自己安装的debezium地址即可

3.测试debezium

首先打开数据库,终端先依次启动zookeeper与kafka,然后启动debezium,其中

注意要进入kafka文件家内启动一下命令:

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven

(1)windows终端启动zookper命令: bin\windows\zookeeper-server-start.bat config\zookeeper.properties

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven

表示启动zookeeper启动成功。

(2)windows终端启动kafka命令:bin\windows\kafka-server-start.bat config\server.properties

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven

表示kafka服务器启动成功。

(3)windows终端启动debezium命令:bin\windows\connect-distributed.bat config\connect-distributed.properties

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven

debezium启动成功。

想要验证自己的debezium插件路径是否修改正确,可以使用postman软件发送http://localhost:8083/connector-plugins出现红框中内容就表示路径修改正确。或者在终端利用curl命令也可以查询,该命令为curl -H "Content-Type: application/json" http://localhost:8083/connector-plugins能返回红框中内容则表示修改正确。

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven

4.配置debezium服务连接到mysql

(1)想要利用debezium实现对mysql的具体数据库表的业务监控,就需要对debezium服务器进行配置,其中有两种方法可以采用,第一种就是利用postman软件向接受端口发送更新请求,返回没有错误就表示对debezium服务器配置成功。

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven

第二种就是在终端利用curl命令对其进行服务器配置,输入curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"localhost","database.port":"3306","database.user":"root","database.password":"123456","database.server.id":"1","database.server.name":"my-app-connector","table.whitelist":"test01.user","database.history.kafka.bootstrap.servers":"localhost:9092","database.history.kafka.topic":"schema-changes-topic"}}'不返回任何错误即可。

配置完成就可以在终端利用curl -H "Content-Type: application/json" http://localhost:8083/connectors/查询连接器,能查到也表明配置成功!

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven

(2)配置文件解析

{

"name": "my-app-connector",//指定Connector的名称为mysql-connector。

"config": {//包含Connector的具体配置信息。

"connector.class": "io.debezium.connector.mysql.MySqlConnector",//不用修改

"database.hostname": "localhost",//指定MySQL数据库的主机名

"database.port": "3306", //指定MySQL数据库的端口号

"database.user": "root",//指定连接数据库所使用的用户名

"database.password": "123456",//指定连接数据库所使用的密码

"database.server.id": "1",//指定用于标识MySQL服务器的唯一ID,随便写个数字

"database.server.name": "my-app-connector",//指定在Debezium中用于标识此MySQL服务器的名称

"database.include.list": "test01",//指定要捕获变更事件的表

"database.serverTimezone":"UTC",

"key.deserializer.encoding":"UTF8",

"value.deserializer.encoding":"UTF8",

"key.serializer.encoding":"UTF8",

"value.serializer.encoding":"UTF8",

"database.history.kafka.bootstrap.servers": "localhost:9092",//用于保存数据库 schema 变更的历史记录

"database.history.kafka.topic": "schema-changes-topic"//用于保存数据库 schema 变更的历史记录。

}

}

5.实例测试

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven

首先创建一个消费者来提取kafka对应主题中的数据,该命令为:bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-app-connector.test01.user --group my-listener,这个主题就是前面我们配置debezium时的主题,此时修改数据库中一个用户id由9变为11,在终端显示先删除一个用户id为9的用户,在创建一个用户id为11的用户。其中“op”表示当前数据库进行的操作。

四、利用java写restful web服务

到前面一步我们已经完成了debezium与kafka的安装测试,已经在终端可以实时对数据库业务进行监控输出。创建一个restful的java代码。

这是代码结构:

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven

//application.properties文件中主要是一些端口的配置及数据库连接的配置,前面已经讲过
//application.properties
# ?????
spring.datasource.url=jdbc:mysql://localhost:3306/test01
spring.datasource.username=root
spring.datasource.password=123456
spring.kafka.consumer.group-id=my-listener
server.port=8081
​
​
# Debezium ??
debezium.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.database.hostname=localhost
debezium.database.port=3306
debezium.database.user=root
debezium.database.password=123456
debezium.database.server.id=1
debezium.database.server.name=my-app-connector
debezium.database.include.list=test01
debezium.database.serverTimezone=UTC
debezium.key.deserializer.encoding=UTF8
debezium.value.deserializer.encoding=UTF8
debezium.key.serializer.encoding=UTF8
debezium.value.serializer.encoding=UTF8
debezium.database.history.kafka.bootstrap.servers=localhost:9092
debezium.database.history.kafka.topic=schema-changes-topic
​
# Kafka ??
spring.kafka.bootstrap-servers=localhost:9092
//启动类Application
package com.example.demo;
​
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
​
@SpringBootApplication
public class Application {
​
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
//控制类主要实现restful服务的功能
package com.example.demo.controller;
​
import com.example.demo.service.KafkaConsumerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
​
@RestController
@RequestMapping("/api")
public class KafkaRestController {
​
    @Autowired
    private KafkaConsumerService kafkaConsumerService;
​
    @GetMapping("/latestMessage")
    public String getLatestMessage() {
​
        String latestMessage = kafkaConsumerService.getLatestMessage();
        System.out.println("成功");
        System.out.println("Retrieving latest message: " + latestMessage);
        return latestMessage;
    }
    @KafkaListener(topics = "my-app-connector.test01.user", groupId = "my-listener")
    public void consumeMessage(String message) {
        kafkaConsumerService.handleMessage(message);
    }
    // 添加处理根路径的方法
    @GetMapping
    public String home() {
        return "Welcome to KafkaRestController!";
    }
}
//服务类:主要获取消费者数据信息的两个函数
package com.example.demo.service;
​
import org.springframework.stereotype.Service;
​
@Service
public class KafkaConsumerService {
​
    private String latestMessage;
​
​
    public void handleMessage(String message) {
        // Handle your Kafka message here
        this.latestMessage = message;
        System.out.println("Received message from Kafka: " + message);
    }
​
    public String getLatestMessage() {
        return latestMessage;
    }
}
//用户类
package com.example.demo.entity;
​
public class User {
    private int id;
    private String username;
    private String email;
​
    // Getter and Setter methods
}
//pom.xml文件主要是一些项目所用到的依赖声明
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
​
    <groupId>org.example</groupId>
    <artifactId>Kafka-mysql</artifactId>
    <version>1.0-SNAPSHOT</version>
​
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.5.5</version> <!-- 使用你需要的版本 -->
        </dependency>
​
        <!-- Debezium Connector for MySQL -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-mysql</artifactId>
            <version>1.9.0.Final</version> <!-- 使用最新版本 -->
        </dependency>
​
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.13.0</version> <!-- 替换为最新的版本号 -->
        </dependency>
​
​
​
        <!-- Spring Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.7.4</version> <!-- 使用你想要的版本号 -->
        </dependency>
​
    </dependencies>
​
​
​
</project>

至此,我们就已经完成了请写java程序,创建restful web服务,示范利用RestController、Debezium和KafkaListener通过binlog日志监控并获得mysql业务数据库变更,请用具体数据集举例说明以上程序各步骤处理结果,可以通过postman软件访问http://localhost:8081/api/latestMessage地址查看数据库变更信息

restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控,数据库,restful,kafka,java,maven文章来源地址https://www.toymoban.com/news/detail-810424.html

到了这里,关于restful web服务实现mysql+debezium+kafka对mysql数据库的业务实时变更数据监控的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【REST2SQL】07 GO 操作 Mysql 数据库

    【REST2SQL】01RDB关系型数据库REST初设计 【REST2SQL】02 GO连接Oracle数据库 【REST2SQL】03 GO读取JSON文件 【REST2SQL】04 REST2SQL第一版Oracle版实现 【REST2SQL】05 GO 操作 达梦 数据库 【REST2SQL】06 GO 跨包接口重构代码 MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle旗

    2024年01月22日
    浏览(28)
  • Spring MVC实现RESTful API,打造高效便捷的Web服务

    REST(Representational State Transfer)是一种架构风格一种设计风格而非标准。它并不是新技术是用于Web服务端的一种设计风格。通过设计好的URI和HTTP方法可以实现对资源的操作。RESTful API是符合REST风格的API,具有良好的可读性、可扩展性、可维护性、可测试性和可移植性。 Sprin

    2024年02月05日
    浏览(17)
  • 十万字图文详解mysql、redis、kafka、elasticsearch(ES)多源异构不同种类数据库集成、数据共享、数据同步、不同中间件技术实现与方案,如何构建数据仓库、数据湖、数仓一体化?

    数据库大数据量、高并发、高可用解决方案,十万字图文详解mysql、redis、kafka、elasticsearch(ES)多源异构不同种类数据库集成、数据共享、数据同步、不同中间件技术实现与方案,如何构建数据仓库、数据湖、数仓一体化?Delta Lake、Apache Hudi和Apache Iceberg数仓一体化技术架构

    2024年02月07日
    浏览(15)
  • MySQL FlinkCDC 通过Kafka实时同步到ClickHouse(自定义Debezium格式支持增加删除修改)

    MySQL FlinkCDC 通过Kafka实时同步到ClickHouse(自定义Debezium格式支持增加删除修改) 把MySQL多库多表的数据通过FlinkCDC DataStream的方式实时同步到同一个Kafka的Topic中,然后下游再写Flink SQL拆分把数据写入到ClickHouse,FlinkCDC DataStream通过自定义Debezium格式的序列化器,除了增加,还能进行

    2024年02月15日
    浏览(16)
  • Debezium系列之:详细介绍Debezium2.X版本导出Sqlserver数据库Debezium JMX指标的方法

    Debezium2.X版本sqlserver数据库JMX指标导出的方式与Debezium1.X版本不同 需要根据Debezium2.X版本sqlserver数据库jmx的格式,导出sqlserver数据库的JMX指标

    2024年02月01日
    浏览(19)
  • 【flume实时采集mysql数据库的数据到kafka】

    最近做了flume实时采集mysql数据到kafka的实验,做个笔记,防止忘记 !!!建议从头看到尾,因为一些简单的东西我在前面提了,后面没提。 Kafka搭建:https://blog.csdn.net/cjwfinal/article/details/120803013 flume搭建:https://blog.csdn.net/cjwfinal/article/details/120441503?spm=1001.2014.3001.5502 编写配置

    2024年02月03日
    浏览(23)
  • Android:安卓开发采用Volley网络框架+MySQL数据库,实现从服务器获取数据并展示完成记单词APP

    实现功能:设计一个记单词APP。服务器采用Tomcat,数据库采用Mysql。实现用户的注册登录功能以及单词的增删改查。 指标要求:实现UI布局;将系统数据保存到Mysql数据库中,并采用Volley网络框架实现从服务器获取数据并展示。 步骤1:搭建开发环境。 步骤2:准备资源。 步骤

    2024年02月13日
    浏览(20)
  • Web安全——数据库mysql学习

    1、Web安全——HTML基础 2、Web安全——DIV CSS基础 3、Web安全——JavaScript基础 4、Web安全——PHP基础 5、Web安全——JavaScript基础(加入案例) 6、靶场搭建——搭建pikachu靶场 D:phpStudyMySQLbin 登陆: 输入 mysql -uroot -p -P3306 -h127.0.0.1 退出的三种方法 语法使用: 数据库的登陆: 这里

    2024年02月12日
    浏览(15)
  • Debezium vs OGG vs Tapdata:如何实时同步 Oracle 数据到 Kafka 消息队列?

    随着信息时代的蓬勃发展,企业对实时数据处理的需求逐渐成为推动业务创新和发展的重要驱动力。在这个快速变化的环境中,许多企业选择将 Oracle 数据库同步到 Kafka,以满足日益增长的实时数据处理需求。本文将深入探讨这一趋势的背后原因,并通过一个真实的客户案例

    2024年04月10日
    浏览(21)
  • Spring——RESTful Web服务

    本节我们将开发一个简单的 RESTful Web 服务。 RESTful Web 服务与传统的 MVC 开发一个关键区别是返回给客户端的内容的创建方式: 传统的 MVC 模式开发会直接返回给客户端一个视图,但是 RESTful Web 服务一般会将返回的数据以 JSON 的形式返回,这也就是现在所推崇的前后端分离开

    2024年02月10日
    浏览(14)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包