基于埋点日志数据的网络流量统计 - PV、UV

这篇具有很好参考价值的文章主要介绍了基于埋点日志数据的网络流量统计 - PV、UV。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

水善利万物而不争,处众人之所恶,故几于道💦

一、 网站总流量数统计 - PV

  1. 需求分析

  2. 代码实现

   方式一

   方式二

   方式三:使用process算子实现

   方式四:使用process算子实现

二、网站独立访客数统计 - UV

  1. 需求分析

  2. 代码实现


一、 网站总流量数统计 - PV

  PV全称 Page View,也就是一个网站的页面浏览量。每当用户进入网站加载或者刷新某个页面时,就会给该网站带来PV量,它往往用来衡量一个网站的流量和用户活跃度。当然了,单个指标并不能全面的反映网站的实际情况,往往需要结合其他的指标进行分析。

1. 需求分析
  埋点采集到的数据格式大概是这个样子(文件已上传资源)

基于埋点日志数据的网络流量统计 - PV、UV,Flink,Flink统计PV、UV,大数据编程练习,Flink流处理编程实战,Flink算子练习,Flink - Java,网站流量统计,Flink基础练习第一个是userId、第二个是itemId、第三个是categoryId、第四个是behavior、第五个是timestamp

所以我们要统计PV的话要先从第四列中筛选出PV,然后再进行累加,求出最终的PV

2. 代码实现
方式一:
  先用 readTextFile()读取文件,然后将读取到的每行数据封装成一个bean对象,再通过 filter过滤出我们需要的PV数据,这时得到的都是封装好的一个个对象没法直接sum,所以通过 map将数据映射为一个个的元组类型(PV,1)然后,使用 keyBy()将他们分到同一个并行度中进行 sum,得出最终的结果。
public class Flink01_Project_PV {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .readTextFile("input/UserBehavior.csv")
                // 将数据封装成 UserBehavior 对象
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String line) throws Exception {
                        String[] data = line.split(",");
                        return new UserBehavior(
                                Long.valueOf(data[0]),
                                Long.valueOf(data[1]),
                                Integer.valueOf(data[2]),
                                data[3],
                                Long.valueOf(data[4]));
                    }
                })
                // 过滤出行为为PV的数据
                .filter(new FilterFunction<UserBehavior>() {
                    @Override
                    public boolean filter(UserBehavior value) throws Exception {
                        return "pv".equals(value.getBehavior());
                    }
                })
                // 因为直接求和的话没法求,所以做一次映射,映射成 (PV,1) 这样的结构
                .map(new MapFunction<UserBehavior, Tuple2<String,Long>>() {
                    @Override
                    public Tuple2<String, Long> map(UserBehavior value) throws Exception {
                        return Tuple2.of(value.getBehavior(),1L);
                    }
                })
                // 然后 将他们通过key进行分组,进入同一个并行度里面 进行求和
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                // 进行求和
                .sum(1)
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
基于埋点日志数据的网络流量统计 - PV、UV,Flink,Flink统计PV、UV,大数据编程练习,Flink流处理编程实战,Flink算子练习,Flink - Java,网站流量统计,Flink基础练习

方式二:

  这种方式省去了方式一的封装对象,其他的思路都一样。

public class Flink01_Project_PV {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .readTextFile("input/UserBehavior.csv")
                // 直接过滤出我们想要的数据
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        String[] data = value.split(",");
                        return "pv".equals(data[3]);
                    }
                })
                // 然后将结构转换为元组类型 (PV,1)
                .map(new MapFunction<String, Tuple2<String,Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] data = value.split(",");
                        return Tuple2.of(data[3],1L);
                    }
                })
                // 通过key分组
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                // 求和
                .sum(1)
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
基于埋点日志数据的网络流量统计 - PV、UV,Flink,Flink统计PV、UV,大数据编程练习,Flink流处理编程实战,Flink算子练习,Flink - Java,网站流量统计,Flink基础练习

方式三:使用process算子实现

  首先使用readTextFile读取数据,使用map将读取到的数据封装为对象,然后使用keyBy进行分组,最后使用process算子进行求解

  • 为什么要使用keyBy():目的是让pv数据进入同一个并行度,如果不使用直接process的话,两个并行度里面都有一个sum,结果就不对了
  • 为什么不使用filter过滤呢?因为我们的过滤逻辑是再process里面完成的,所以不用再额外过滤
public class Flink02_Project_PV_process {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .readTextFile("input/UserBehavior.csv")
                // 封装成 UserBehavior 对象
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String line) throws Exception {
                        String[] data = line.split(",");
                        return new UserBehavior(
                                Long.valueOf(data[0]),
                                Long.valueOf(data[1]),
                                Integer.valueOf(data[2]),
                                data[3],
                                Long.valueOf(data[4]));
                    }
                })
                // 通过key分组
                .keyBy(new KeySelector<UserBehavior, String>() {
                    @Override
                    public String getKey(UserBehavior value) throws Exception {
                        return value.getBehavior();
                    }
                })
                // 使用proces算子实现 PV 的统计
                .process(new ProcessFunction<UserBehavior, String>() {
                    // 定义累加变量
                    long sum =0L ;
                    @Override
                    public void processElement(UserBehavior value, Context ctx, Collector<String> out) throws Exception {
                        // 判断用户行为是否是PV
                        if ("pv".equals(value.getBehavior())){
                            // 条件满足 sum+1
                            sum++;
                            // 将结果收集
                            out.collect("pv = "+sum);
                        }
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
基于埋点日志数据的网络流量统计 - PV、UV,Flink,Flink统计PV、UV,大数据编程练习,Flink流处理编程实战,Flink算子练习,Flink - Java,网站流量统计,Flink基础练习

方式四:使用process算子实现

  方式四相比于方式三省去了对象的封装,其他思路一样。

public class Flink02_Project_PV_process {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .readTextFile("input/UserBehavior.csv")
                // 通过key分组
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value.split(",")[3];
                    }
                })
                // 直接使用process求 PV
                .process(new ProcessFunction<String, String>() {
                    // 定义累加变量
                    long sum = 0L;
                    @Override
                    public void processElement(String line, Context ctx, Collector<String> out) throws Exception {
                    	// 将过来的每行数据切割
                        String[] datas = line.split(",");
                        // 判断是否是我们想要的数据
                        if("pv".equals(datas[3])){
                        	// 符合条件,将累加变量+1
                            sum++;
                            // 收集结果
                            out.collect("pv = "+sum);
                        }
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
基于埋点日志数据的网络流量统计 - PV、UV,Flink,Flink统计PV、UV,大数据编程练习,Flink流处理编程实战,Flink算子练习,Flink - Java,网站流量统计,Flink基础练习


二、网站独立访客数统计 - UV

  UV全称 Unique Visitor,也就是独立访客数。在PV中,我们统计的是所有用户对所有页面的浏览行为,也就是同一个用户的浏览行为会被重复统计。实际上我们关注的是在某一特定范围内(一天、一周或者一个月)内访问该网站的用户数,也就是每个访客只计算一次。它能从侧面反映出该网站的受欢迎程度和用户规模的大小。

1. 需求分析
  要统计UV量的话,只需要对全量的PV,使用userId去重,然后就能得到独立访客数了。
2. 代码实现

  先filter过滤出PV数据,然后通过keyBy将PV分到同一组,然后使用process进行处理,处理方法是:用set集合存放userId,如果下一个userId可以加入该集合说明是一个新的独立访客,则收集当前集合的大小,若加入失败,说明集合中已经存在该userId,也就是不是一个新的独立访客,也就不做处理了。

public class Flink03_Project_UV {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .readTextFile("input/UserBehavior.csv")
                // 过滤出 PV 数据
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return "pv".equals(value.split(",")[3]);
                    }
                })
                // 将PV的数据分到同一个组里面
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value.split(",")[3];
                    }
                })
                // 对同一组里面的数据进行处理
                .process(new ProcessFunction<String, String>() {
                    // 存放 userId 的容器,回自动对数据进行去重,最后直接拿它的大小就知道UV了
                    Set<Long> userIdSet = new HashSet<>();
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                        Long userId = Long.valueOf(value.split(",")[0]);
                        // 向set中添加userId,判断是否添加成功
                        if (userIdSet.add(userId)) {
                            // 添加成功的话,说明是一个新的独立访客,收集到此时容器大小
                            out.collect("UV = "+ userIdSet.size());
                        }
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
基于埋点日志数据的网络流量统计 - PV、UV,Flink,Flink统计PV、UV,大数据编程练习,Flink流处理编程实战,Flink算子练习,Flink - Java,网站流量统计,Flink基础练习文章来源地址https://www.toymoban.com/news/detail-623517.html

到了这里,关于基于埋点日志数据的网络流量统计 - PV、UV的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 微信小程序UV、PV量解释以及接口调用频率

    微信小程序UV、PV量解释以及接口调用频率

    微信小程序UV、PV量 浏览量(PV): 即通常说的Page View(PV),用户每打开一个网站页面就被记录1次。用户多次打开同一页面,浏览量值累计。 微信小程序中PV是打开小程序的打开次数。 访客数(UV): 一天之内网站的独立访客数(以Cookie为依据),一天内同一访客多次访问网站只计算

    2024年02月16日
    浏览(11)
  • QPS TPS RPS PV UV 等名词解释

    QPS :即Queries Per Second的缩写,每秒能处理查询数目。是 一台 服务器每秒能够相应的查询次数,是对一个 特定的查询服务器 在规定时间内所处理流量多少的衡量标准。 每秒钟处理完请求的次数;注意这里是处理完,具体是指发出请求到服务器处理完成功返回结果。可以理解

    2024年02月10日
    浏览(12)
  • 2023-06-13:统计高并发网站每个网页每天的 UV 数据,结合Redis你会如何实现?

    2023-06-13:统计高并发网站每个网页每天的 UV 数据,结合Redis你会如何实现?

    2023-06-13:统计高并发网站每个网页每天的 UV 数据,结合Redis你会如何实现? 答案2023-06-13: 如果统计 PV (页面浏览量)那非常好办,可以考虑为每个网页创建一个独立的 Redis 计数器,并将日期添加为键(key)的后缀。当网页收到请求时,对应的计数器将被递增。对于每天的

    2024年02月08日
    浏览(11)
  • Flink基于信用值的流量控制

    Flink基于信用值的流量控制

    flink内部实现了一个类似于tcp滑动窗口概念的流量控制功能,以满足其内部的流量控制功能,本文就来讲解下flink实现的基于信用值的流量控制的原理 首先,我们先来看一下在flink中是如何实现数据传输的, 从上图可知,发送端的taskmanager会为每个下游的算子的任务都创建一个

    2024年02月13日
    浏览(5)
  • 基于opencv深度学习,交通目标检测,行人车辆检测,人流统计,交通流量检测

    基于opencv深度学习,交通目标检测,行人车辆检测,人流统计,交通流量检测

    文章目录 0 前言+ 1. 目标检测概况+ 1.1 什么是目标检测?+ 1.2 发展阶段 2. 行人检测+ 2.1 行人检测简介+ 2.2 行人检测技术难点+ 2.3 行人检测实现效果+ 2.4 关键代码-训练过程 最后 设计项目案例演示地址: 链接 毕业设计代做一对一指导项目方向涵盖: 1.1 什么是目标检测? 目标检

    2024年02月04日
    浏览(12)
  • 基于matomo实现业务数据埋点采集上报

    基于matomo实现业务数据埋点采集上报

    matomo是一款Google-analytics数据埋点采集上报的平替方案,可保护您的数据和客户的隐私;正如它官网的slogan: Google Analytics alternative that protects your data and your customers\\\' privacy; 该项目源码开源免费,支持私有化部署,保证数据安全、可靠;支持多种方式集成,不管你的应用是传统的

    2024年02月08日
    浏览(10)
  • [超详细]基于YOLO&OpenCV的人流量统计监测系统(源码&部署教程)

    [超详细]基于YOLO&OpenCV的人流量统计监测系统(源码&部署教程)

    [YOLOv7]基于YOLO&Deepsort的人流量统计系统(源码&部署教程)_哔哩哔哩_bilibili (1)获取原始视频帧 (2)利用目标检测器对视频帧中的目标进行检测 (3)将检测到的目标的框中的特征提取出来,该特征包括表观特征(方便特征对比避免ID switch)和运动特征(运动特征方 便卡尔

    2024年02月04日
    浏览(7)
  • 手把手教你实现—基于OpenCV的车流量统计和车速检测代码

    手把手教你实现—基于OpenCV的车流量统计和车速检测代码

             本章将实现了一个简单的车辆速度估计和车流量统计的GUI应用,它使用了Haar级联检测器和相关跟踪器来检测和跟踪视频中的车辆,并通过图像处理和数学计算来估计车辆的速度。         1.首先,该代码需要cv2:用于图像处理和计算机视觉任务;dlib:用于对象

    2024年02月04日
    浏览(11)
  • 【Redis应用】UV统计(四)

    【Redis应用】UV统计(四)

    🚗Redis应用学习·第四站~ 🚩本文已收录至专栏:Redis技术学习 首先我们要搞懂两个概念: UV :全称 Unique Visitor ,也叫 独立访客量 ,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。 PV :全称 Page View ,也叫 页面访问量或点击量

    2024年02月09日
    浏览(14)
  • Redis - 附近商铺、用户签到、UV统计

    Redis - 附近商铺、用户签到、UV统计

    底层都是基于地理坐标进行搜索,支持地理坐标的技术有很多,Redis就是其中之一 GEO 就是Geolocation的简写形式,代表 地理坐标 。 Redis 在3.2版本中加入了对GEO的支持, 允许存储地理坐标信息 ,帮助我们根据经纬度来检索数据。 常见的命令有 : GEOADD :添加一个地理空间信息,

    2024年02月13日
    浏览(13)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包