Kafka+Fink 实战+工具类

这篇具有很好参考价值的文章主要介绍了Kafka+Fink 实战+工具类。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

  • LogServiceImpl
@Service
@Slf4j
public class LogServiceImpl implements LogService {

    private static final String TOPIC_NAME = "ods_link_visit_topic";

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 记录日志
     *
     * @param request
     * @param shortLinkCode
     * @param accountNo
     * @return
     */
    @Override
    public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) {
        // ip、 浏览器信息
        String ip = CommonUtil.getIpAddr(request);
        // 全部请求头
        Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request);

        Map<String,String> availableMap = new HashMap<>();
        availableMap.put("user-agent",headerMap.get("user-agent"));
        availableMap.put("referer",headerMap.get("referer"));
        availableMap.put("accountNo",accountNo.toString());

        LogRecord logRecord = LogRecord.builder()
                //日志类型
                .event(LogTypeEnum.SHORT_LINK_TYPE.name())
                //日志内容
                .data(availableMap)
                //客户端ip
                .ip(ip)
                // 时间
                .ts(CommonUtil.getCurrentTimestamp())
                //业务唯一标识(短链码)
                .bizId(shortLinkCode).build();

        String jsonLog = JsonUtil.obj2Json(logRecord);

        //打印日志 in 控制台
        log.info(jsonLog);

        // 发送kafka
        kafkaTemplate.send(TOPIC_NAME,jsonLog);


    }
}

  • DwdShortLinkLogApp
@Slf4j
public class DwdShortLinkLogApp {
    //定义 topic
    public static final String SOURCE_TOPIC = "ods_link_visit_topic";

    //定义 消费组
    public static final String SINK_TOPIC = "dwd_link_visit_topic";

    //定义 消费组
    public static final String GROUP_ID = "dwd_short_link_group";


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

//        DataStream<String> ds = env.socketTextStream("192.168.75.146", 8888);

        FlinkKafkaConsumer<String> kafkaConsumer = KafkaUtil.getKafkaConsumer(SOURCE_TOPIC, GROUP_ID);

        DataStreamSource<String> ds = env.addSource(kafkaConsumer);

        ds.print();

        SingleOutputStreamOperator<JSONObject> jsonDs = ds.flatMap(new FlatMapFunction<String, JSONObject>() {

            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                // 生成web端设备唯一标识
                String udid = getDeviceId(jsonObject);
                jsonObject.put("udid",udid);

                String referer = getReferer(jsonObject);
                jsonObject.put("referer",referer);

                out.collect(jsonObject);

            }
        });

        // 分组
        KeyedStream<JSONObject, String> keyedStream = jsonDs.keyBy(new KeySelector<JSONObject, String>() {

            @Override
            public String getKey(JSONObject value) throws Exception {
                return value.getString("udid");

            }
        });


        // 识别新老访客    richMap open函数,对状态以及日期格式进行初始化

        SingleOutputStreamOperator<String> jsonDSWithVisitorState = keyedStream.map(new VisitorMapFunction());

        jsonDSWithVisitorState.print("ods新老访客");

        // 存储到dwd
        FlinkKafkaProducer<String> kafkaProducer = KafkaUtil.getKafkaProducer(SINK_TOPIC);

        jsonDSWithVisitorState.addSink(kafkaProducer);


        env.execute();
    }

    /**
     * 获取referer
     * @param jsonObject
     * @return
     */
    public static String getReferer(JSONObject jsonObject){
        JSONObject dataJsonObj = jsonObject.getJSONObject("data");
        if(dataJsonObj.containsKey("referer")){

            String referer = dataJsonObj.getString("referer");
            if(StringUtils.isNotBlank(referer)){
                try {
                    URL url = new URL(referer);
                    return url.getHost();
                } catch (MalformedURLException e) {
                    log.error("提取referer失败:{}",e.toString());
                }
            }
        }

        return "";

    }

    /**
     * 生成设备唯一标识
     *
     * @param jsonObject
     * @return
     */
    public static String getDeviceId(JSONObject jsonObject){
        Map<String,String> map= new TreeMap<>();

        try{
            map.put("ip",jsonObject.getString("ip"));
            map.put("event",jsonObject.getString("event"));
            map.put("bizId",jsonObject.getString("bizId"));
            map.put("userAgent",jsonObject.getJSONObject("data").getString("userAgent"));

            return DeviceUtil.geneWebUniqueDeviceId(map);

        }catch (Exception e){
            log.error("生产唯一deviceId异常:{}", jsonObject);
            return null;
        }


    }


}

  • KafkaUtil

    @Slf4j
    public class KafkaUtil {
    
        /**
         * kafka 的 broker 地址
         */
        private static String KAFKA_SERVER = null;
    
        static {
            Properties properties = new Properties();
    
            InputStream in = KafkaUtil.class.getClassLoader().getResourceAsStream("application.properties");
    
            try {
                properties.load(in);
            } catch (IOException e) {
                e.printStackTrace();
                log.error("加载Kafka配置文件失败:{}",e.getMessage());
            }
    
            //获取配置文件中的value
            KAFKA_SERVER = properties.getProperty("kafka.servers");
    
        }
    
        /**
         * 获取flink的kafka消费者
         * @param topic
         * @param groupId
         * @return
         */
        public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic,String groupId){
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
    
            return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),properties);
        }
    
        /**
         * 获取flink的kafka生产者
         * @param topic
         * @return
         */
        public static FlinkKafkaProducer<String> getKafkaProducer(String topic){
            return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
        }
    }
    
    
  • TimeUtil文章来源地址https://www.toymoban.com/news/detail-658286.html

    public class TimeUtil {
    
        /**
         * 默认日期格式
         */
        private static final String DEFAULT_PATTERN = "yyyy-MM-dd";
    
        /**
         * 默认日期格式
         */
        private static final DateTimeFormatter DEFAULT_DATE_TIME_FORMATTER  = DateTimeFormatter.ofPattern(DEFAULT_PATTERN);
    
        private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
    
    
        /**
         * LocalDateTime 转 字符串,指定日期格式
         * @param localDateTime
         * @param pattern
         * @return
         */
        public static String format(LocalDateTime localDateTime, String pattern){
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
            String timeStr = formatter.format(localDateTime.atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
    
        /**
         * Date 转 字符串, 指定日期格式
         * @param time
         * @param pattern
         * @return
         */
        public static String format(Date time, String pattern){
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
            String timeStr = formatter.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
        /**
         *  Date 转 字符串,默认日期格式
         * @param time
         * @return
         */
        public static String format(Date time){
    
            String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(time.toInstant().atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
        /**
         * timestamp 转 字符串,默认日期格式
         *
         * @param timestamp
         * @return
         */
        public static String format(long timestamp) {
            String timeStr = DEFAULT_DATE_TIME_FORMATTER.format(new Date(timestamp).toInstant().atZone(DEFAULT_ZONE_ID));
            return timeStr;
        }
    
    
        /**
         * 字符串 转 Date
         *
         * @param time
         * @return
         */
        public static Date strToDate(String time) {
            LocalDateTime localDateTime = LocalDateTime.parse(time, DEFAULT_DATE_TIME_FORMATTER);
            return Date.from(localDateTime.atZone(DEFAULT_ZONE_ID).toInstant());
    
        }
    
    }
    

到了这里,关于Kafka+Fink 实战+工具类的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Kafka实战进阶:一篇详解与互联网实战PDF指南,带你深入Apache Kafka的世界

    Kafka实战进阶:一篇详解与互联网实战PDF指南,带你深入Apache Kafka的世界

    Apache Kafka 是由Apache软件基金会开发的一款开源消息系统项目,主要使用Scala语言编写。该项目旨在为处理实时数据提供一个统一、高通量、低等待的平台。Kafka作为一种分布式的、分区的、多复本的日志提交服务,凭借其独特的设计提供了丰富的消息系统功能。 特点 高吞吐量

    2024年01月19日
    浏览(11)
  • 【Kafka】Kafka监控工具Kafka-eagle简介

    Kafka-eagle是一种基于Web的开源管理工具,可以用来监控、管理多个Kafka集群。 下面是使用Docker部署Kafka-eagle的步骤: 下载并安装Docker和Docker Compose。 创建文件夹,例如kafka-eagle,并在其中创建docker-compose.yml文件,将以下配置写入: 在命令行中转到kafka-eagle文件夹中,运行以下命

    2024年02月14日
    浏览(10)
  • Spring Boot+Kafka实战生产级Kafka消费组

    作者:禅与计算机程序设计艺术 Kafka是一个开源分布式消息系统,最初由LinkedIn开发,之后成为Apache项目的一部分。Kafka主要用于大数据实时流处理,具有低延迟、高吞吐量等特点。本文将会从基本概念、术语说明、原理及应用场景三个方面对Kafka进行详细介绍。 Kafka作为一个

    2024年02月10日
    浏览(15)
  • Kafka 可视化工具 Kafka Tool

    Kafka 可视化工具 Kafka Tool

    使用Kafka的小伙伴,有没有为无法直观地查看 Kafka 的 Topic 里的内容而发过愁呢? 下面推荐给大家一款带有可视化页面的Kafka工具: Kafka Tool (目前最新版本是 2.0.4 ) 下载地址 http://www.kafkatool.com/download.html 下载界面 不同版本的Kafka对应不同版本的工具,个人使用的是0.11,所

    2024年02月12日
    浏览(13)
  • kafka map kafka可视化工具

    kafka map kafka可视化工具

    kafka-map是使用Java17和React开发的一款kafka可视化工具。 目前支持的功能有: 多集群管理 集群状态监控(分区数量、副本数量、存储大小、offset) 主题创建、删除、扩容(删除需配置delete.topic.enable = true) broker状态监控 消费者组查看、删除 重置offset 消息查询(支持String和j

    2024年03月28日
    浏览(74)
  • KafKa 分区,副本实战

    5个broker (1主4从) 安装目路/config/server.properties, 额外复制4份为 server-2.properties,server-3.properties,server-4.properties,server-5.properties 主要配置不同 server.properties server-2.properties server-3.properties server-4.properties server-5.properties 运行这5个broker 创建一个主题test,8个分区,3个副本 bootstrap

    2024年02月11日
    浏览(11)
  • kafka消息系统实战

    kafka消息系统实战

    kafka是什么?         是一种高吞吐量的、分布式、发布、订阅、消息系统  1.导入maven坐标 2.编写提供者 3.编写消费者  4.下载kafka 点此去官网下载——Apache Kafka 解压后进入config目录  修改zookeeper.properties dataDir=D:/kafka_2.13-3.5.1/tmp/zookeeper 修改日志存放的路径server.propertie

    2024年02月10日
    浏览(8)
  • 【面试实战】Kafka面试题

    是一个分布式流式处理平台,流平台一个关键的功能就是消息队列。 项目中使用Kafka消息队列,对评论、点赞、关注功能发布通知,封装为Event实体类。 消费者负责将消息队列中的Event取出,并将其封装为Message对象,并持久化到数据库中保存。 了解到除了Kafka还有其他的消息

    2024年02月07日
    浏览(6)
  • 【kafka】Kafka 可视化工具Kafka Eagle安装和使用

    【kafka】Kafka 可视化工具Kafka Eagle安装和使用

    Kafka产线环境需要管理的Topic和Consumser越来越多,使用命令行工具进行管理会非常繁杂。因此,大数据平台上需要一套Kafka的管理监控系统,Kafka-Eagle。 Kafka Eagle是一个用于监控和管理kafka的开源组件,可以同时监控多个kafka集群。 Kafka Eagle提供了完善的监控页面和kafka常用操作

    2023年04月15日
    浏览(11)
  • kafka学习-概念与简单实战

    kafka学习-概念与简单实战

    目录 1、核心概念 消息和批次 Topic和Partition Replicas Offset broker和集群 生产者和消费者 2、开发实战 2.1、消息发送 介绍 代码实现 2.2、消息消费 介绍 代码实现 2.3、SpringBoot Kafka pom application.yaml KafkaConfig producer consumer         kafka的基本数据单元,由字节数组组成。可以理解

    2024年02月09日
    浏览(10)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包