Flink动态ClickhouseSink+自动建表

这篇具有很好参考价值的文章主要介绍了Flink动态ClickhouseSink+自动建表。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

通过自定义注解的形式,对JdbcSink进行封装,支持自动建表、自动拼接insert语句文章来源地址https://www.toymoban.com/news/detail-536734.html

主类

package flink.security.sink;


import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * 通用clickhouse sink
 */
@Slf4j
public class SecurityClickHouseSink {
    public static <T> SinkFunction<T> getSink(Class<T> model) throws Exception {
        createTable(model);
        String sql = createSql(model);
        return JdbcSink.sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    @SneakyThrows
                    @Override
                    public void accept(PreparedStatement preparedStatement, T t) throws SQLException {
                        Class<?> obj = t.getClass();
                        Field[] declaredFields = obj.getDeclaredFields();
                        Stream<Field> usedFields = getUsedFields(declaredFields);
                        List<Field> fields = usedFields.collect(Collectors.toList());
                        for (int i = 0; i < fields.size(); i++) {
                            fields.get(i).setAccessible(true);
                            Object o = fields.get(i).get(t);
                            if (o instanceof Boolean) {
                                preparedStatement.setInt(i + 1, (Boolean) o ? 1 : 0);
                            } else if (o instanceof List) {
                                preparedStatement.setString(i + 1, JSONUtil.toJsonStr(o));
                            } else if (o instanceof Date) {
                                preparedStatement.setString(i + 1, DateFormatUtil.toYmdHms(((Date) o).getTime()));
                            } else {
                                preparedStatement.setObject(i + 1, o);
                            }
                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(10000)
                        .withBatchIntervalMs(1000L * 10)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName(Constant.CLICKHOUSE_DRIVER_NAME)
                        .withUrl(Constant.CLICKHOUSE_URL)
                        .build()
        );
    }

    /**
     * 组装sql
     */
    private static <T> String createSql(Class<T> model) {
        // 1、获取表名
        SecurityTable tableAnno = model.getAnnotation(SecurityTable.class);
        if (tableAnno == null) {
            throw new RuntimeException("缺少SecurityTable注解");
        }
        String tableName = tableAnno.value().trim();
        // 2、获取字段名
        Field[] declaredFields = model.getDeclaredFields();
        // 2.1 过滤掉带SecurityField注解且isTableFile=false的字段
        Stream<Field> fields = getUsedFields(declaredFields);
        // 3.2 获取字段
        List<String> fieldnames = fields.map(SecurityClickHouseSink::getTableFiledName).collect(Collectors.toList());
        // 3、获取占位符
        List<String> placeholder = fieldnames.stream().map(f -> "?").collect(Collectors.toList());

        if (ObjectUtil.isEmpty(placeholder) || ObjectUtil.isEmpty(fieldnames)) {
            throw new RuntimeException("缺少必要的表字段!");
        }
        // 分布式表需要插入视图
        String distributed = tableAnno.distributed();
        return "insert into " + tableName + (StringUtils.isBlank(distributed) ? "" : "_mr") + "(" + StringUtils.join(fieldnames, ",") + ") values(" + StringUtils.join(placeholder, ",") + ")";
    }

    /**
     * 组装sql
     */
    private static <T> void createTable(Class<T> model) throws Exception {
        // 1、获取表名
        SecurityTable tableAnno = model.getAnnotation(SecurityTable.class);
        if (tableAnno == null) {
            throw new RuntimeException("缺少SecurityTable注解");
        }
        String tableName = tableAnno.value().trim();
        String engine = tableAnno.engine().trim();
        String distributed = tableAnno.distributed().trim();
        String cluster = tableAnno.cluster().trim();
        // 2、获取字段名
        Field[] declaredFields = model.getDeclaredFields();
        // 2.1 过滤掉带SecurityField注解且isTableFile=false的字段
        Stream<Field> fields = getUsedFields(declaredFields);
        // 3.2 获取字段
        List<String> fieldnames = fields.map(f -> {
            String wordType = f.getType().getName();
            SecurityField annotation = f.getAnnotation(SecurityField.class);
            switch (wordType) {
                case "java.lang.String":
                    wordType = "String";
                    break;
                case "java.util.Date":
                case "java.time.LocalDate":
                case "java.time.LocalDateTime":
                    wordType = "Datetime";
                    break;
                case "java.util.List":
                    wordType = "String";
                    break;
                case "boolean":
                    wordType = "UInt8";
                    break;
                case "java.lang.Double":
                    wordType = "Decimal(15,3)";
                    break;
                case "java.lang.Integer":
                    wordType = "Int32";
                    break;
                case "java.lang.Long":
                    wordType = "Int64";
                    break;
                default:
                    wordType = "String";
            }
            if (annotation == null) {
                return f.getName() + " " + wordType + " comment ''";
            } else {
                String comment = Optional.of(annotation.comment()).orElse("").replace("'", "`").replace("'", "`");
                return (StringUtils.isBlank(annotation.value()) ? f.getName() : annotation.value()) + " " + wordType + " comment '" + comment + "'";
            }
        }).collect(Collectors.toList());

        if (ObjectUtil.isEmpty(fieldnames)) {
            throw new RuntimeException("缺少必要的表字段!");
        }

        String cols = StringUtils.join(fieldnames, ",\n");

        try(Connection connection = DriverManager.getConnection(Constant.CLICKHOUSE_URL, Constant.CLICKHOUSE_USERNAME, Constant.CLICKHOUSE_PASSWORD);) {
            String localtable = String.format("create table if not exists  %s %s (%s) engine = %s", tableName, cluster, cols, engine);
            log.error(localtable);
            try (PreparedStatement preparedStatement = connection.prepareStatement(localtable)) {
                preparedStatement.executeUpdate();
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException("建表失败请查看建表语句:\r\n" + localtable);
            }
            if (StringUtils.isNotBlank(distributed)) {
//                localtable = "CREATE TABLE default.PowerConsumption ON CLUSTER default_cluster AS default.PowerConsumption_local\n" +
//                        "ENGINE = Distributed(default_cluster, default, PowerConsumption_local, rand());";
                localtable = String.format("create table if not exists  %s_mr  %s AS %s engine = %s", tableName, cluster, tableName, distributed);
                log.error(localtable);
                try (PreparedStatement preparedStatement = connection.prepareStatement(localtable)) {
                    preparedStatement.executeUpdate();
                } catch (Exception e) {
                    e.printStackTrace();
                    throw new RuntimeException("建表失败请查看建表语句:\r\n" + localtable);
                }
            }
        }
    }

    private static String getTableFiledName(Field field) {
        SecurityField annotation = field.getAnnotation(SecurityField.class);
        if (annotation == null) {
            return field.getName();
        } else {
            String value = annotation.value();
            if (StringUtils.isBlank(value)) {
                return field.getName();
            }
            return value;
        }
    }

    private static Stream<Field> getUsedFields(Field[] declaredFields) {
        return Arrays.stream(declaredFields).filter(field -> {
            SecurityField annotation = field.getAnnotation(SecurityField.class);
            return annotation == null || annotation.isTableField();
        });
    }

}

 自定义注解

package cn.chinaunicom.sdsi.flink.security.anno;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface SecurityField {
    String value() default "";
    boolean isTableField() default true;
    String comment() default "''";
}
package cn.chinaunicom.sdsi.flink.security.anno;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface SecurityTable {
    String value();
    String engine() default "";
    String distributed() default "";
    String cluster() default "";
}

使用方式

SingleOutputStreamOperator<SecurityAlarms> process = attack.connect(broadcastStream).process(new BroadcastProcessFunction<SecurityAlarms, Co...

SinkFunction<SecurityAlarms> sink = SecurityClickHouseSink.getSink(SecurityAlarms.class);
process.addSink(sink);

Bean的定义

@Data
@SecurityTable(value = "security_data_center.security_alarms",
        cluster = "on cluster cluster_p2_r2_2345",
        engine = "ReplicatedReplacingMergeTree partition by (toYYYYMMDD(collectorReceiptTime)) order by (collectorReceiptTime,eventId)",
        distributed = "Distributed(cluster_p2_r2_2345,security_data_center,security_alarms,rand())")
public class SecurityAlarms extends CommonFileds{

    @SecurityField(comment = "数据接收来源【数据接收来源(安恒云)】")
    private String dispatchSource;
    @SecurityField(comment = "活动目录唯一标识【活动目录唯一标识】")
    private String DN;
    @SecurityField(comment = "DPI告警类型【DPI数据告警类型】")
    private String DPIAlertType;
}

到了这里,关于Flink动态ClickhouseSink+自动建表的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 用flink cdc sqlserver 将数据实时同步到clickhouse

    flink cdc 终于支持 sqlserver 了。 现在互联网公司用sqlserver的不多,大部分都是一些国企的老旧系统。我们以前同步数据,都是用datax,但是不能实时同步数据。现在有了flinkcdc,可以实现实时同步了。 1、首先sqlserver版本:要求sqlserver版本为14及以上,也就是 SQL Server 2017 版。

    2023年04月08日
    浏览(16)
  • XL-LightHouse 与 Flink 和 ClickHouse 流式大数据统计系统

    一个Flink任务只能并行处理一个或少数几个数据流,而XL-LightHouse一个任务可以并行处理数万个、几十万个数据流; 一个Flink任务只能实现一个或少数几个数据指标,而XL-LightHouse单个任务就能支撑大批量、数以万计的数据指标。 1、XL-LightHouse :  1、再也不需要用 Flink、Spark、

    2024年02月09日
    浏览(15)
  • 十分钟掌握 Flink CDC,实现Mysql数据增量备份到Clickhouse [纯干货,建议收藏]

    Clickhouse的优点. 真正的面向列的 DBMS ClickHouse 是一个 DBMS,而不是一个单一的数据库。它允许在运行时创建表和数据库、加载数据和运行 查询,而无需重新配置和重新启动服务器。 数据压缩 一些面向列的 DBMS(InfiniDB CE 和 MonetDB)不使用数据压缩。但是,数据压缩确实提高了

    2024年04月14日
    浏览(25)
  • 大数据Flink(七十):SQL 动态表 & 连续查询

    文章目录 SQL 动态表 连续查询 一、​​​​​​​SQL 应用于流处理的思路

    2024年02月10日
    浏览(13)
  • flink-cdc,clickhouse写入,多路输出

    kafka日志数据从kafka读取 1、关联字典表:完善日志数据 2、判断日志内容级别:多路输出 低级:入clickhouse 高级:入clickhouse的同时推送到kafka供2次数据流程处理。

    2024年02月09日
    浏览(22)
  • 【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(29)
  • 12、Flink source和sink 的 clickhouse 详细示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月13日
    浏览(59)
  • Spring SpEL在Flink中的应用-与Filter结合实现数据动态分流

    SpEL表达式与Flink fiter结合可以实现基于表达式的灵活动态过滤。有关SpEL表达式的使用请参考 Spring SpEL在Flink中的应用-SpEL详解 。 可以将过滤规则放入数据库,根据不同的数据设置不同的过滤表达式,从而实现只需修改过滤表达式不用修改Flink代码的功能。对于基于Flink进行数

    2024年01月25日
    浏览(18)
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(5) - kafka

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(34)
  • 【flink番外篇】4、flink的sink(内置、mysql、kafka、redis、clickhouse、分布式缓存、广播变量)介绍及示例(8) - 完整版

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月04日
    浏览(22)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包