Flink系列之:JDBC SQL 连接器

这篇具有很好参考价值的文章主要介绍了Flink系列之:JDBC SQL 连接器。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、JDBC SQL 连接器

  • Scan Source: Bounded
  • Lookup Source: Sync Mode
  • Sink: Batch
  • Sink: Streaming Append & Upsert Mode

JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。本文档描述了针对关系型数据库如何通过建立 JDBC 连接器来执行 SQL 查询。

如果在 DDL 中定义了主键,JDBC sink 将以 upsert 模式与外部系统交换 UPDATE/DELETE 消息;否则,它将以 append 模式与外部系统交换消息且不支持消费 UPDATE/DELETE 消息。

二、依赖

在连接到具体数据库时,也需要对应的驱动依赖,目前支持的驱动如下:

Driver Group Id Artifact Id JAR
MySQL mysql mysql-connector-java 下载
Oracle com.oracle.database.jdbc ojdbc8 下载
PostgreSQL org.postgresql postgresql 下载
Derby org.apache.derby derby 下载
SQL Server com.microsoft.sqlserver mssql-jdbc 下载

三、创建 JDBC 表

JDBC table 可以按如下定义:

-- 在 Flink SQL 中注册一张 MySQL 表 'users'
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);

-- 从另一张表 "T" 将数据写入到 JDBC 表中
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;

-- 查看 JDBC 表中的数据
SELECT id, name, age, status FROM MyUserTable;

-- JDBC 表在时态表关联中作为维表
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;

这段代码是在Flink SQL中注册一个MySQL表"users",表结构包括id、name、age、status等字段,并设置id作为主键。

接下来通过INSERT INTO语句,从名为"T"的表中将数据写入到JDBC表"MyUserTable"中。INSERT INTO语句用于将"T"表中的id、name、age、status字段的值插入到"MyUserTable"表中。

然后使用SELECT语句查看JDBC表中的数据,返回id、name、age、status字段的值。

最后一段语句是将JDBC表"MyUserTable"作为维表,与实时流数据源"myTopic"进行时态表关联。通过FOR SYSTEM_TIME AS OF子句指定以"myTopic.proctime"字段的时间为基准,将"myTopic"的key字段与"MyUserTable"的id字段进行关联查询,返回所有字段的值。

通过这段代码,可以实现将数据从一个表写入到MySQL表中,并在流处理中进行关联查询,从而实现时态表的操作。

四、连接器参数

参数 是否必填 默认值 类型 描述
connector 必填 (none) String 指定使用什么类型的连接器,这里应该是’jdbc’。
url 必填 (none) String JDBC 数据库 url。
table-name 必填 (none) String 连接到 JDBC 表的名称。
driver 可选 (none) String 用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。
username 可选 (none) String JDBC 用户名。如果指定了 ‘username’ 和 ‘password’ 中的任一参数,则两者必须都被指定。
password 可选 (none) String JDBC 密码。
connection.max-retry-timeout 可选 60s Duration 最大重试超时时间,以秒为单位且不应该小于 1 秒。
scan.partition.column 可选 (none) String 用于将输入进行分区的列名。
scan.partition.num 可选 (none) Integer 分区数。
scan.partition.lower-bound 可选 (none) Integer 第一个分区的最小值。
scan.partition.upper-bound 可选 (none) Integer 最后一个分区的最大值。
scan.fetch-size 可选 0 Integer 每次循环读取时应该从数据库中获取的行数。如果指定的值为 ‘0’,则该配置项会被忽略。
scan.auto-commit 可选 true Boolean 在 JDBC 驱动程序上设置 auto-commit 标志, 它决定了每个语句是否在事务中自动提交。有些 JDBC 驱动程序,特别是 Postgres,可能需要将此设置为 false 以便流化结果。
lookup.cache 可选 (none) 枚举类型,可选值: NONE, PARTIAL 维表的缓存策略。 目前支持 NONE(不缓存)和 PARTIAL(只在外部数据库中查找数据时缓存)。
lookup.cache.max-rows 可选 (none) Integer 维表缓存的最大行数,若超过该值,则最老的行记录将会过期。 使用该配置时 “lookup.cache” 必须设置为 "PARTIAL”。
lookup.partial-cache.expire-after-write 可选 (none) Duration 在记录写入缓存后该记录的最大保留时间。 使用该配置时 “lookup.cache” 必须设置为 "PARTIAL”。
lookup.partial-cache.expire-after-access 可选 (none) Duration 在缓存中的记录被访问后该记录的最大保留时间。 使用该配置时 “lookup.cache” 必须设置为 "PARTIAL”。
lookup.partial-cache.cache-missing-key 可选 (none) Boolean 是否缓存维表中不存在的键,默认为true。 使用该配置时 “lookup.cache” 必须设置为 "PARTIAL”。
lookup.max-retries 可选 3 Integer 查询数据库失败的最大重试次数。
sink.buffer-flush.max-rows 可选 100 Integer flush 前缓存记录的最大值,可以设置为 ‘0’ 来禁用它。
sink.buffer-flush.interval 可选 1s Duration flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 ‘0’ 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 ‘sink.buffer-flush.max-rows’ 设置为 ‘0’ 并配置适当的 flush 时间间隔。
sink.max-retries 可选 3 Integer 写入记录到数据库失败后的最大重试次数。
sink.parallelism 可选 (none) Integer 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。

五、键处理

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作。

在 upsert 模式下,Flink 将根据主键判断插入新行或者更新已存在的行,这种方式可以确保幂等性。为了确保输出结果是符合预期的,推荐为表定义主键并且确保主键是底层数据库中表的唯一键或主键。在 append 模式下,Flink 会把所有记录解释为 INSERT 消息,如果违反了底层数据库中主键或者唯一约束,INSERT 插入可能会失败。

六、分区扫描

为了在并行 Source task 实例中加速读取数据,Flink 为 JDBC table 提供了分区扫描的特性。

如果下述分区扫描参数中的任一项被指定,则下述所有的分区扫描参数必须都被指定。这些参数描述了在多个 task 并行读取数据时如何对表进行分区。 scan.partition.column 必须是相关表中的数字、日期或时间戳列。注意,scan.partition.lower-bound 和 scan.partition.upper-bound 用于决定分区的起始位置和过滤表中的数据。如果是批处理作业,也可以在提交 flink 作业之前获取最大值和最小值。

  • scan.partition.column:输入用于进行分区的列名。
  • scan.partition.num:分区数。
  • scan.partition.lower-bound:第一个分区的最小值。
  • scan.partition.upper-bound:最后一个分区的最大值。

七、Lookup Cache

JDBC 连接器可以用在时态表关联中作为一个可 lookup 的 source (又称为维表),当前只支持同步的查找模式。

默认情况下,lookup cache 是未启用的,你可以将 lookup.cache 设置为 PARTIAL 参数来启用。

lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性能。默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。 当 lookup cache 被启用时,每个进程(即 TaskManager)将维护一个缓存。Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。 当缓存命中最大缓存行 lookup.partial-cache.max-rows 或当行超过 lookup.partial-cache.expire-after-write 或 lookup.partial-cache.expire-after-access 指定的最大存活时间时,缓存中的行将被设置为已过期。 缓存中的记录可能不是最新的,用户可以将缓存记录超时设置为一个更小的值以获得更好的刷新数据,但这可能会增加发送到数据库的请求数。所以要做好吞吐量和正确性之间的平衡。

默认情况下,flink 会缓存主键的空查询结果,你可以通过将 lookup.partial-cache.cache-missing-key 设置为 false 来切换行为。

八、幂等写入

如果在 DDL 中定义了主键,JDBC sink 将使用 upsert 语义而不是普通的 INSERT 语句。upsert 语义指的是如果底层数据库中存在违反唯一性约束,则原子地添加新行或更新现有行,这种方式确保了幂等性。

如果出现故障,Flink 作业会从上次成功的 checkpoint 恢复并重新处理,这可能导致在恢复过程中重复处理消息。强烈推荐使用 upsert 模式,因为如果需要重复处理记录,它有助于避免违反数据库主键约束和产生重复数据。

除了故障恢复场景外,数据源(kafka topic)也可能随着时间的推移自然地包含多个具有相同主键的记录,这使得 upsert 模式是用户期待的。

由于 upsert 没有标准的语法,因此下表描述了不同数据库的 DML 语法:

数据库 更新语法
MySQL INSERT … ON DUPLICATE KEY UPDATE …
Oracle MERGE INTO … USING (…) ON (…) WHEN MATCHED THEN UPDATE SET (…) WHEN NOT MATCHED THEN INSERT (…) VALUES (…)
PostgreSQL INSERT … ON CONFLICT … DO UPDATE SET …
MS SQL Server MERGE INTO … USING (…) ON (…) WHEN MATCHED THEN UPDATE SET (…) WHEN NOT MATCHED THEN INSERT (…) VALUES (…)

九、JDBC Catalog

JdbcCatalog 允许用户通过 JDBC 协议将 Flink 连接到关系数据库。

目前,JDBC Catalog 有两个实现,即 Postgres Catalog 和 MySQL Catalog。目前支持如下 catalog 方法。其他方法目前尚不支持。

// Postgres Catalog & MySQL Catalog 支持的方法
databaseExists(String databaseName);
listDatabases();
getDatabase(String databaseName);
listTables(String databaseName);
getTable(ObjectPath tablePath);
tableExists(ObjectPath tablePath);

其他的 Catalog 方法现在尚不支持。

十、JDBC Catalog 的使用

本小节主要描述如果创建并使用 Postgres Catalog 或 MySQL Catalog。 请参阅 Dependencies 部分了解如何配置 JDBC 连接器和相应的驱动。

JDBC catalog 支持以下参数:

  • name:必填,catalog 的名称。
  • default-database:必填,默认要连接的数据库。
  • username:必填,Postgres/MySQL 账户的用户名。
  • password:必填,账户的密码。
  • base-url:必填,(不应该包含数据库名)
    • 对于 Postgres Catalog base-url 应为 “jdbc:postgresql://:” 的格式。
    • 对于 MySQL Catalog base-url 应为 “jdbc:mysql://:” 的格式。

SQL:

CREATE CATALOG my_catalog WITH(
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);

USE CATALOG my_catalog;

Java:

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name            = "my_catalog";
String defaultDatabase = "mydb";
String username        = "...";
String password        = "...";
String baseUrl         = "..."

JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog("my_catalog", catalog);

// 设置 JdbcCatalog 为会话的当前 catalog
tableEnv.useCatalog("my_catalog");

Scala:

val settings = EnvironmentSettings.inStreamingMode()
val tableEnv = TableEnvironment.create(settings)

val name            = "my_catalog"
val defaultDatabase = "mydb"
val username        = "..."
val password        = "..."
val baseUrl         = "..."

val catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl)
tableEnv.registerCatalog("my_catalog", catalog)

// 设置 JdbcCatalog 为会话的当前 catalog
tableEnv.useCatalog("my_catalog")

Python:

from pyflink.table.catalog import JdbcCatalog

environment_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(environment_settings)

name = "my_catalog"
default_database = "mydb"
username = "..."
password = "..."
base_url = "..."

catalog = JdbcCatalog(name, default_database, username, password, base_url)
t_env.register_catalog("my_catalog", catalog)

# 设置 JdbcCatalog 为会话的当前 catalog
t_env.use_catalog("my_catalog")

YAML

execution:
    ...
    current-catalog: my_catalog  # 设置目标 JdbcCatalog 为会话的当前 catalog
    current-database: mydb

catalogs:
   - name: my_catalog
     type: jdbc
     default-database: mydb
     username: ...
     password: ...
     base-url: ...

十一、JDBC Catalog for PostgreSQL

PostgreSQL 元空间映射

除了数据库之外,postgreSQL 还有一个额外的命名空间 schema。一个 Postgres 实例可以拥有多个数据库,每个数据库可以拥有多个 schema,其中一个 schema 默认名为 “public”,每个 schema 可以包含多张表。 在 Flink 中,当查询由 Postgres catalog 注册的表时,用户可以使用 schema_name.table_name 或只有 table_name,其中 schema_name 是可选的,默认值为 “public”。

因此,Flink Catalog 和 Postgres 之间的元空间映射如下:

Flink 目录元空间结构 Postgres 元空间结构
catalog name (defined in Flink only) N/A
database name database name
table name [schema_name.]table_name

Flink 中的 Postgres 表的完整路径应该是 “..<schema.table>”。如果指定了 schema,请注意需要转义 <schema.table>。

这里提供了一些访问 Postgres 表的例子:

-- 扫描 'public' schema(即默认 schema)中的 'test_table' 表,schema 名称可以省略
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

-- 扫描 'custom_schema' schema 中的 'test_table2' 表,
-- 自定义 schema 不能省略,并且必须与表一起转义。
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;

十二、JDBC Catalog for MySQL

MySQL 元空间映射

MySQL 实例中的数据库与 MySQL Catalog 注册的 catalog 下的数据库处于同一个映射层级。一个 MySQL 实例可以拥有多个数据库,每个数据库可以包含多张表。 在 Flink 中,当查询由 MySQL catalog 注册的表时,用户可以使用 database.table_name 或只使用 table_name,其中 database 是可选的,默认值为创建 MySQL Catalog 时指定的默认数据库。

因此,Flink Catalog 和 MySQL catalog 之间的元空间映射如下:

Flink 目录元空间结构 Mysql 元空间结构
catalog name (defined in Flink only) N/A
database name database name
table name table_name

Flink 中的 MySQL 表的完整路径应该是 “<catalog>.<db>.<table>”。

这里提供了一些访问 MySQL 表的例子:

-- 扫描 默认数据库中的 'test_table' 表
SELECT * FROM mysql_catalog.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

-- 扫描 'given_database' 数据库中的 'test_table2' 表,
SELECT * FROM mysql_catalog.given_database.test_table2;
SELECT * FROM given_database.test_table2;

十三、数据类型映射

Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、Oracle、PostgreSQL、Derby 等。其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。文章来源地址https://www.toymoban.com/news/detail-786202.html

MySQL type Oracle type PostgreSQL type SQL Server type Flink SQL type
TINYINT TINYINT TINYINT
SMALLINT, TINYINT UNSIGNED SMALLINT, INT2, SMALLSERIAL, SERIAL2 SMALLINT SMALLINT
INT, MEDIUMINT, SMALLINT UNSIGNED INTEGER, SERIAL INT INT
BIGINT, INT UNSIGNED BIGINT, BIGSERIAL BIGINT BIGINT
BIGINT UNSIGNED DECIMAL(20, 0)
BIGINT BIGINT BIGINT BIGINT
FLOAT BINARY_FLOAT REAL, FLOAT4 REAL FLOAT
DOUBLE, DOUBLE PRECISION BINARY_DOUBLE FLOAT8, DOUBLE PRECISION FLOAT DOUBLE
NUMERIC(p, s), DECIMAL(p, s) SMALLINT, FLOAT(s), DOUBLE PRECISION, REAL, NUMBER(p, s) NUMERIC(p, s), DECIMAL(p, s) DECIMAL(p, s) DECIMAL(p, s)
BOOLEAN, TINYINT(1) BOOLEAN BIT BOOLEAN
DATE DATE DATE DATE DATE
TIME [§] DATE TIME [§] [WITHOUT TIMEZONE] TIME(0) TIME [§] [WITHOUT TIMEZONE]
DATETIME [§] TIMESTAMP [§] [WITHOUT TIMEZONE] TIMESTAMP [§] [WITHOUT TIMEZONE] DATETIME, DATETIME2 TIMESTAMP [§] [WITHOUT TIMEZONE]
CHAR(n), VARCHAR(n), TEXT CHAR(n), VARCHAR(n), CLOB CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT CHAR(n), NCHAR(n), VARCHAR(n), NVARCHAR(n), TEXT, NTEXT STRING
BINARY, VARBINARY, BLOB RAW(s), BLOB BYTEA BINARY(n), VARBINARY(n) BYTES
ARRAY ARRAY

到了这里,关于Flink系列之:JDBC SQL 连接器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 【flink sql】kafka连接器

    Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。 前面已经介绍了flink sql创建表的语法及说明:【flink sql】创建表 这篇博客聊聊怎么通过flink sql连接kafka 以下的连接器元数据可以在表定义中通过元数据列的形式获取。 R/W 列定义了一个元数据是可读的(R)还是可写的(

    2024年02月08日
    浏览(13)
  • Flink系列之:Flink CDC深入了解MySQL CDC连接器

    Flink系列之:Flink CDC深入了解MySQL CDC连接器

    增量快照读取是一种读取表快照的新机制。与旧的快照机制相比,增量快照具有许多优点,包括: (1)在快照读取期间,Source 支持并发读取 (2)在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint (3)在快照读取之前,Source 不需要数据库锁权限。 如果希望 source 并行运

    2024年02月02日
    浏览(12)
  • 【Flink实战】Flink hint更灵活、更细粒度的设置Flink sql行为与简化hive连接器参数设置

    SQL 提示(SQL Hints)是和 SQL 语句一起使用来改变执行计划的。本章介绍如何使用 SQL 提示来实现各种干预。 SQL 提示一般可以用于以下: 增强 planner:没有完美的 planner, SQL 提示让用户更好地控制执行; 增加元数据(或者统计信息):如\\\"已扫描的表索引\\\"和\\\"一些混洗键(shu

    2024年04月25日
    浏览(12)
  • 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)

    16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月12日
    浏览(12)
  • 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)

    16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月11日
    浏览(8)
  • 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

    16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月11日
    浏览(12)
  • 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)

    16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月10日
    浏览(18)
  • Flink 之 Kafka连接器

    Flink附带了一个通用的Kafka连接器,它试图跟踪Kafka客户端的最新版本。Kafka的客户端版本会在Flink不同版本间发生变化。现代Kafka客户端向后兼容broker 0.10.0版本及以后的版本。 用法 Kafka Source 提供了一个构造器类来构建KafkaSource的实例。下面代码展示如何构建一个KafkaSource来消

    2023年04月08日
    浏览(12)
  • 流批一体计算引擎-7-[Flink]的DataStream连接器

    流批一体计算引擎-7-[Flink]的DataStream连接器

    参考官方手册DataStream Connectors 一、预定义的Source和Sink 一些比较基本的Source和Sink已经内置在Flink里。 1、预定义data sources支持从文件、目录、socket,以及collections和iterators中读取数据。 2、预定义data sinks支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 sock

    2023年04月08日
    浏览(12)
  • Semantic Kernel 入门系列:?Connector连接器

    Semantic Kernel 入门系列:?Connector连接器

    当我们使用Native Function的时候,除了处理一些基本的逻辑操作之外,更多的还是需要进行外部数据源和服务的对接,要么是获取相关的数据,要么是保存输出结果。这一过程在Semantic Kernel中可以被归类为Connector。 Connector更像是一种设计模式,并不像Function和Memory 一样有强制和

    2023年04月15日
    浏览(14)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包