FlinkCDC能读取到快照,但是无法输出更新数据
发现是并行度问题,当我的并行度超过1的时候就无法捕获更新。文章来源地址https://www.toymoban.com/news/detail-641156.html
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "lcq");
Configuration conf = new Configuration();
conf.setInteger("rest.port", 2000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("Master")
.port(3306)
.databaseList("test") // set captured database
.tableList("test.cart_info") // set captured table
.username("root")
.password("123456")
.deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.startupOptions(StartupOptions.initial())
.build();
// TODO 使用CDC Source从MySQL读取数据
DataStreamSource<String> mysqlDS =
env.fromSource(
mySqlSource,
WatermarkStrategy.noWatermarks(),
"MysqlSource")
.setParallelism(1);
mysqlDS.print();
env.execute();
}
```
有大佬回复:
https://developer.aliyun.com/ask/547600
出现这个问题的原因是,FlinkCDC 默认使用单线程来处理数据,当并行度高于 1 时,就会创建多个线程来处理数据,每个线程会从 MySQL 中接收到不同的数据,导致数据重复。
解决这个问题的方法是,修改 FlinkCDC 的配置,设置 parallelism.max 的值为 1。
文章来源:https://www.toymoban.com/news/detail-641156.html
到了这里,关于FlinkCDC能读取到快照,但是无法输出更新数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!