【Spark手机流量日志处理】使用SparkSQL按月统计流量使用量最多的用户

这篇具有很好参考价值的文章主要介绍了【Spark手机流量日志处理】使用SparkSQL按月统计流量使用量最多的用户。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

🚀 作者 :“大数据小禅”

🚀文章简介:本篇文章属于Spark系列文章,专栏将会记录从spark基础到进阶的内容
🚀 内容涉及到Spark的入门集群搭建,核心组件,RDD,算子的使用,底层原理,SparkCore,SparkSQL,SparkStreaming等,Spark专栏地址.欢迎小伙伴们订阅💪

SparkSQL简介
  • Spark SQL是Apache Spark的一个模块,提供了一种基于结构化数据的编程接口。它允许用户使用SQL语句或DataFrame API来查询和操作数据,同时还支持使用Spark的分布式计算引擎进行高效的并行计算。

  • Spark SQL支持多种数据源,包括Hive、JSON、Parquet、Avro、ORC等,这些数据源可以通过DataFrame API或SQL语句进行查询和操作。同时,Spark SQL还提供了一些高级功能,如窗口函数、聚合函数、UDF等,以满足更复杂的数据分析需求。

  • Spark SQL还支持将SQL查询结果写入到外部数据源,如Hive表、JSON文件、Parquet文件等。此外,Spark SQL还提供了一些工具,如Spark SQL CLI、JDBC/ODBC驱动程序等,方便用户进行交互式查询和数据分析。

  • 使用前需要新引入对应依赖

依赖引入

使用Spark SQL需要在项目中添加以下依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
</dependencies>

其中,spark-sql_2.12是Spark SQL的核心依赖,spark-core_2.12是Spark的核心依赖。注意,版本号可以根据实际情况进行调整。

如果需要使用其他数据源,如MySQL、Hive等,则需要添加相应的依赖。例如,如果需要连接MySQL数据库,则需要添加以下依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.25</version>
</dependency>

其中,spark-sql-kafka-0-10_2.12是连接Kafka数据源的依赖,mysql-connector-java是连接MySQL数据库的依赖。注意,版本号也可以根据实际情况进行调整。

以上是使用Maven进行依赖配置的方式。

SparkSQL快速入门案例
  • 准备数据
  • 我们假设有一个CSV文件employee.csv,包含了员工的信息,如下所示:
id,name,age,gender,salary
1,Jack,25,M,5000
2,Lucy,28,F,6000
3,Tom,30,M,8000
4,Lily,27,F,7000
5,David,32,M,9000

创建SparkSession对象
首先,我们需要创建一个SparkSession对象,它是Spark SQL的入口点。可以使用以下代码创建SparkSession对象:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("Spark SQL Demo")
  .getOrCreate()
//加载CSV文件
//使用SparkSession对象的read方法加载CSV文件:

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("employee.csv")
//其中,header=true表示第一行是列名,inferSchema=true表示自动推断列的数据类型。

//创建临时表
//使用DataFrame的createOrReplaceTempView方法将DataFrame注册为一个临时表:

df.createOrReplaceTempView("employee")
//执行SQL查询
//使用SparkSession对象的sql方法执行SQL查询:
val result = spark.sql("SELECT * FROM employee WHERE age > 27")
这将返回所有年龄大于27岁的员工信息。

//输出结果
//使用DataFrame的show方法输出查询结果:

result.show()
//这将输出所有符合条件的员工信息。
  • 完整代码如下:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("Spark SQL Demo")
  .getOrCreate()

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("employee.csv")

df.createOrReplaceTempView("employee")

val result = spark.sql("SELECT * FROM employee WHERE age > 27")

result.show()
输出结果:

+---+----+---+------+-----+
| id|name|age|gender|salary|
+---+----+---+------+-----+
|  2|Lucy| 28|     F| 6000|
|  3| Tom| 30|     M| 8000|
|  5|David| 32|     M| 9000|
+---+----+---+------+-----+
手机流量日志数据格式与处理要求
  • 日志字段与字段说明如下
    【Spark手机流量日志处理】使用SparkSQL按月统计流量使用量最多的用户1.需要实现的需求1.按月统计流量使用量最多的用户(每个月使用流量最多的用户)
    2.将结果数据持久化到硬盘
处理程序
/**
  * @Description
  * @Author xiaochan
  * @Version 1.0
  */
// 时间戳         手机号码          基站物理地址             ip        接受数 接受数据包 上行流量  下行流量  状态码
//2020-03-10	15707126156	QK-X7-7N-G2-1N-QZ:CMCC	212.188.187.220	33	     40	    67584	   81920	200
//使用量 =上+下  手机号码就是用户   RDD处理方式->((月,号码),(上行+下行))
//1.下载手机流量日志
//2.按月统计流量使用量最多的用户
//3.将结果数据持久化到硬盘
object LogPhone {
  System.setProperty("hadoop.home.dir","F:\\hadoop-2.7.3\\hadoop-2.7.3")
  def main(args: Array[String]): Unit = {
    //1.创建sparksession
    val sc = new sql.SparkSession.Builder()
      .appName("test")
      .master("local[6]")
      .config("spark.testing.memory", "471859201")
      .getOrCreate()
    // 读取输入文件
    val log = sc.sparkContext.textFile("dataset\\phone.log")
    val value = log.map(_.split("\t")).filter(arr => {
      !(arr(1) == null)
    }).map(tmp => {
      //处理日期 获取月份
      val month: String = tmp(0).split("-")(1)
      //号码
      val user = tmp(1)
      //使用流量数
      var use = tmp(6) + tmp(7)
      Log(user, use.toLong, month)
    })
    sc.createDataFrame(value).createOrReplaceTempView("log")
    //每个月流量使用做多的用户 group by行数会减少,开窗函数over()行数不会减少
    val data: DataFrame = sc.sql("select user,month,useall from " +
      "(select user,month,sum(use) over(partition by user,month order by use desc) as useall," +
      "dense_rank() over(partition by month order by use desc) as rn from log)t1 where rn=1 order by month")
    data.show()
    data.write.parquet("dataset\\output\\directory")

    sc.close()
  }
}

/**
  * @Description
  * @Author xiaochan
  * @Version 1.0
  */
case class Log(
    user: String,
    use: Long,
    month: String)


  • 结果如下
    【Spark手机流量日志处理】使用SparkSQL按月统计流量使用量最多的用户

【Spark手机流量日志处理】使用SparkSQL按月统计流量使用量最多的用户文章来源地址https://www.toymoban.com/news/detail-413942.html

到了这里,关于【Spark手机流量日志处理】使用SparkSQL按月统计流量使用量最多的用户的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • MYSQL按照小时、按天、按月分组统计,无数据补0

    MYSQL按照小时、按天、按月分组统计,无数据补0

    在开发过程中,写统计接口时,总会遇到按时、按天、按月统计,无数据进行补零的业务需求 现在市面既有补0的做法;也有不补0的做法: 不补0对于数据统计来说简洁明了,一个单表分组查询完事。(也就是在前端展示时,某个时间没有数据则不展示该时间,对于用户来说,

    2024年02月03日
    浏览(9)
  • Spark大数据处理学习笔记(2.4)IDEA开发词频统计项目

    Spark大数据处理学习笔记(2.4)IDEA开发词频统计项目

    该文章主要为完成实训任务,详细实现过程及结果见【http://t.csdn.cn/0qE1L】 从Scala官网下载Scala2.12.15 - https://www.scala-lang.org/download/2.12.15.html 安装在默认位置 安装完毕 在命令行窗口查看Scala版本(必须要配置环境变量) 启动HDFS服务 启动Spark集群 在master虚拟机上创建单词文件

    2024年02月08日
    浏览(18)
  • MapReduce序列化【用户流量使用统计】

    MapReduce序列化【用户流量使用统计】

    目录 什么是序列化和反序列化? 序列化 反序列化 为什么要序列化? 序列化的主要应用场景 MapReduce实现序列化 自定义bean对象实现Writable接口 1.实现Writable接口 2.无参构造 3.重写序列化方法 4.重写反序列化方法 5.顺序一致 6.重写toString 7.实现Comparable接口 MapReduce自定义序列化案

    2024年02月06日
    浏览(11)
  • Spark---SparkSQL介绍

    Spark---SparkSQL介绍

    Shark是基于Spark计算框架之上且兼容Hive语法的SQL执行引擎,由于底层的计算采用了Spark,性能比MapReduce的Hive普遍快2倍以上,当数据全部load在内存的话,将快10倍以上,因此Shark可以作为交互式查询应用服务来使用。除了基于Spark的特性外,Shark是完全兼容Hive的语法,表结构以及

    2024年01月21日
    浏览(8)
  • 【spark】SparkSQL

    【spark】SparkSQL

    什么是SparkSQL SparkSQL是Spark的一个模块,用于处理海量 结构化数据 为什么学习SparkSQL SparkSQL是非常成熟的海量结构化数据处理框架: 学习SparkSQL主要在2个点: SparkSQL本身十分优秀,支持SQL语言、性能强、可以自动优化、API简单、兼容HIVE等等 企业大面积在使用SparkSQL处理业务数

    2024年01月20日
    浏览(26)
  • Spark(15):SparkSQL之DataFrame

    目录 0. 相关文章链接 1. DataFrame的作用 2. 创建DataFrame 3. SQL 语法 4. DSL 语法 5. RDD 转换为 DataFrame 6. DataFrame 转换为 RDD  Spark文章汇总          Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有

    2024年02月13日
    浏览(11)
  • Spark(16):SparkSQL之DataSet

    目录 0. 相关文章链接 1. DataSet的定义 2. 创建DataSet 2.1. 使用样例类序列创建 DataSet 2.2. 使用基本类型的序列创建 DataSet 2.3. 注意 3. RDD 转换为 DataSet 4. DataSet 转换为 RDD  Spark文章汇总  DataSet 是具有强类型的数据集合,需要提供对应的类型信息。 在实际使用的时候,很少用到

    2024年02月13日
    浏览(6)
  • 【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎

    【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎

    【大家好,我是爱干饭的猿,本文重点介绍、SparkSQL的运行流程、 SparkSQL的自动优化、Catalyst优化器、SparkSQL的执行流程、Spark On Hive原理配置、分布式SQL执行引擎概念、代码JDBC连接。 后续会继续分享其他重要知识点总结,如果喜欢这篇文章,点个赞👍,关注一下吧】 上一篇

    2024年02月04日
    浏览(6)
  • 【Spark精讲】一文讲透SparkSQL执行过程

    【Spark精讲】一文讲透SparkSQL执行过程

    逻辑计划阶段会将用户所写的 SQL语句转换成树型数据结构( 逻辑算子树 ), SQL语句中蕴含的逻辑映射到逻辑算子树的不同节点。 顾名思义,逻辑计划阶段生成的逻辑算子树并不会直接提交执行,仅作为中间阶段 。 最终逻辑算子树的生成过程经历 3 个子阶段,分别对应 未解析

    2024年02月03日
    浏览(9)
  • spark第四章:SparkSQL基本操作

    spark第四章:SparkSQL基本操作

    spark第一章:环境安装 spark第二章:sparkcore实例 spark第三章:工程化代码 spark第四章:SparkSQL基本操作 接下来我们学习SparkSQL他和Hql有些相似。Hql是将操作装换成MR,SparkSQL也是,不过是使用Spark引擎来操作,效率更高一些 以上是这次博客需要的所有依赖,一次性全加上。 一共

    2024年02月07日
    浏览(9)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包