2023_Spark_实验十一:RDD高级算子操作

这篇具有很好参考价值的文章主要介绍了2023_Spark_实验十一:RDD高级算子操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


//checkpoint :

sc.setCheckpointDir("hdfs://Master:9000/ck") // 设置检查点

val rdd = sc.textFile("hdfs://Master:9000/input/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) // 执行wordcount任务的转换



rdd.checkpoint // Mark this RDD for checkpointing.

rdd.isCheckpointed

rdd.count //触发计算,日志显示:ReliableRDDCheckpointData: Done checkpointing RDD 27 to hdfs://hadoop001:9000/ck/fce48fd4-d76f-

4322-8d23-6a48d1aed7b5/rdd-27, new parent is RDD 28

rdd.isCheckpointed // res61: Boolean = true

rdd.getCheckpointFile // Option[String] = Some(hdfs://Master:9000/ck/b9a5add8-18d8-4056-9e8e-271d9522a29c/rdd-4)

coalesce :

总所周知,spark的rdd编程中有两个算子repartition和coalesce。公开的资料上定义为,两者都是对spark分区数进行调整的算子。

        repartition会经过shuffle,其实际上就是调用的coalesce(shuffle=true)。

        coalesce,默认shuffle=false,不会经过shuffle。

        当前仅针对coalesce算子考虑,我们看一下官方的定义:

        大概意思为:如果你想要从1000个分区到100个分区,并且不经过shuffle,近乎平均分配10个父分区到1个子分区。

        首先我说下我个人简单理解:不经过shuffle,就意味着coalesce算子前后都是在一个stage中的。从该stage开始到coalesce算子之前的任务的迭代执行的并行度都是1000,从coalesce算子开始到该stage结束的任务的迭代执行的并行度都是100。


val rdd1 = sc.parallelize(1 to 10, 10)

// 重新分区,分为两个2 ,不产生shuffle

val rdd2 = rdd1.coalesce(2, false)



// 获取新的RDD分区数

rdd2.partitions.length

def func1(index:Int,iter:Iterator[Int]):Iterator[String] = {

iter.toList.map(x=>"[PartID:"+index + ",value=" + x +"]").iterator

}

// 查看分区后的结果:

rdd2.mapPartitionsWithIndex(func1).collect



repartition:

val rdd1 = sc.parallelize(1 to 10, 4)

val rdd2 = rdd1.repartition(5)

collect、toArray

将RDD转换为Scala的数组。

collectAsMap

与collect、toArray相似。collectAsMap将key-value型的RDD转换为Scala的map。

注意:map中如果有相同的key,其value只保存最后一个值。


# 创建一个2分区的RDD

scala> var z = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

z: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[129] at parallelize at <console>:21

# 输出所有分区的数据

scala> z.collect

res44: Array[(String, Int)] = Array((cat,2), (cat,5), (mouse,4), (cat,12), (dog,12), (mouse,2))



# 转化为字典

scala> z.collectAsMap

res45: scala.collection.Map[String,Int] = Map(dog -> 12, cat -> 12, mouse -> 2)

scala>


collectAsMap

val rdd = sc.parallelize(List(("a", 1), ("b", 2)))

rdd.collectAsMap

//res2: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)

combineByKey与aggregateByKey相比较:

  1. 1.相同点:

  • 两者都能映射key值分别进行分区内计算和分区间计算。

    2. 不同点:

  • combineByKey有三个参数列表而且不需要初始值,而aggregateByKey只有两个参数列表且需要初始值。

2023_Spark_实验十一:RDD高级算子操作,spark,大数据,分布式

aggregateByKey分区内计算示意图

//aggregateByKey存在函数颗粒化,有两个参数列表

//第一个参数列表,需要传递一个参数,表示为初始值

// 主要当碰见第一个key时候,和value进行分区内计算

//第二个参数列表,需要传递2个参数

// 第一个参数表示分区内计算

// 第二个参数表示分区间计算



rdd.aggregateByKey(zeroValue = 0)(

(x, y) => math.max(x, y),

(x, y) => x + y

).collect().foreach(println)

2023_Spark_实验十一:RDD高级算子操作,spark,大数据,分布式

combineByKey分区内计算示意图

​
//combineByKey方法需要三个参数:

//第一个参数表示:将相同key的第一个数据进行结构转换,实现操作

//第二个参数:分区内的计算规则

//第三个参数:分区间的计算规则

val newRDD: RDD[(String,(Int,Int))] = rdd.combineByKey(v=>(v,1),
(t:(Int,Int),v:Int)=> {t._1 + v, t._2 + v},
(t1:(Int,Int),t2:(Int,Int))=>{t1._1+t2._1,t1._2 + t2._2})

combineByKey与aggregateByKey两者的核心区别,就是在组内初始计算时少许不同。

combineByKey // 是在这个PairRDDFunctions类下的方法

​
val rdd1 = sc.textFile("hdfs://Master:9000/input/word.txt").flatMap(_.split(" ")).map((_, 1))

val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) // 等价与reduceByKey(_ + _),结果:Array[(String, Int)] =

Array((is,1), (Giuyang,1), (love,2), (capital,1), (Guiyang,1), (I,2), (of,1), (Guizhou,2), (the,1))

rdd2.collect

val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) // 将每个key的value各自加10,结果:Array[(String, Int)]

rdd3.collect

val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)

val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)

val rdd6 = rdd5.zip(rdd4)

​

一、Rdd行动算子

1、【countByKey】统计存储在rdd中元组的key的个数,key是相同的就会进行计数+1。通过这个key 会生成一个Map Map中的key是原有中的key,value是原有key的个数;

2、【countByValue】统计在Rdd中存储元素的个数。会将rdd中每一个元组看作为一个value,若这个元组中元素是相同的,此时就会将生成Map中的value+1;

3、【filterByRange】对rdd中的元素过滤,并返回指定内容的数据。该函数作用于键值对RDD,对RDD中的元素进行过滤,返回键在指定范围中的元素;

4、【flatMapValues】主要是对存在元组中的value进行扁平化处理;

countByKey // 计算每个键出现的次数

val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))

rdd1.countByKey

rdd1.countByValue  // countByValue返回每个值的出现次数

filterByRange  // 【filterByRange】对rdd中的元素过滤,并返回指定范围的内容数据

val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))

val rdd2 = rdd1.filterByRange("b", "d")

rdd2.collect

flatMapValues

val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))

rdd3.flatMapValues(_.split(" "))

foldByKey

函数原型:

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

 作用:将RDD[K,V]根据K将V做折叠、合并处理,zeroValue作为初始参数,调用func得到V,

再根据Key按照func对V进行调用。

例子:

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2)))

rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at <console>:27

scala> rdd1.foldByKey(0)(_+_).collect

res3: Array[(String, Int)] = Array((A,2), (B,3))

说明: 将0应用到_+_上,Array(("A",0+0),("A",2+0)) 再进一步处理得到Array(("A",0+2))最终得到Array(("A",2))

foldByKey

函数原型:

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

 作用:将RDD[K,V]根据K将V做折叠、合并处理,zeroValue作为初始参数,调用func得到V,

再根据Key按照func对V进行调用。

例子:

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2)))

rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at <console>:27

scala> rdd1.foldByKey(0)(_+_).collect

res3: Array[(String, Int)] = Array((A,2), (B,3))

说明: 将0应用到_+_上,Array(("A",0+0),("A",2+0)) 再进一步处理得到Array(("A",0+2))最终得到Array(("A",2))

foldByKey

val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)

val rdd2 = rdd1.map(x => (x.length, x))

val rdd3 = rdd2.foldByKey("")(_+_)

val rdd = sc.textFile("hdfs://Master:9000/input/word.txt").flatMap(_.split(" ")).map((_, 1))

rdd.foldByKey(0)(_+_)

foreachPartition  // foreachPartition是spark-core的action算子,该算子源码中的注释是:Applies a function func to each parition of this RDD.(将函数func应用于此RDD的每个分区)

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)

rdd1.foreachPartition(x => println(x.reduce(_ + _)))

2023_Spark_实验十一:RDD高级算子操作,spark,大数据,分布式

keyBy

val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

val rdd2 = rdd1.keyBy(_.length)

rdd2.collect

keys values文章来源地址https://www.toymoban.com/news/detail-714894.html

val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

val rdd2 = rdd1.map(x => (x.length, x))

rdd2.keys.collect

rdd2.values.collect

到了这里,关于2023_Spark_实验十一:RDD高级算子操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Spark---RDD算子(单值类型转换算子)

    Spark---RDD算子(单值类型转换算子)

    RDD算子是用于对RDD进行转换(Transformation)或行动(Action)操作的方法或函数。通俗来讲,RDD算子就是RDD中的函数或者方法,根据其功能,RDD算子可以分为两大类: 转换算子(Transformation): 转换算子用于从一个RDD生成一个新的RDD,但是原始RDD保持不变。常见的转换算子包括

    2024年01月21日
    浏览(10)
  • Spark中Rdd算子和Action算子--学习笔记

    filter distinct groupBy groupByKey,sortBy,SortByKey rdd之间的连接 collect,take,count()类的聚合算子,saveAsTextFile, 统计算子,countByKey() countByKey().items() countByValue() , countByValue().items() 词频统计 缓存是将数据存储再内存或者磁盘上,缓存的特点是计算结束后缓存自动清空 为什么使用缓存? 提升

    2024年01月16日
    浏览(12)
  • Spark中RDD的Transformation算子

    Spark中RDD的Transformation算子

    map算子的功能为做映射,即将原来的RDD中对应的每一个元素,应用外部传入的函数进行运算,返回一个新的RDD flatMap算子的功能为扁平化映射,即将原来RDD中对应的每一个元素应用外部的运算逻辑进行运算,然后再将返回的数据进行压平,类似先map,然后再flatten的操作,最后

    2024年02月11日
    浏览(10)
  • Spark源码解析(一):RDD之Transfrom算子

    Spark源码解析(一):RDD之Transfrom算子

    RDD 代表的是分布式数据形态,因此,RDD 到 RDD 之间的转换,本质上是数据形态上的转换(Transformations) 在 RDD 的编程模型中,一共有两种算子,Transformations 类算子和 Actions 类算子。开发者需要使用 Transformations 类算子,定义并描述数据形态的转换过程,然后调用 Actions 类算子

    2024年01月20日
    浏览(9)
  • 2023_Spark_实验八:Scala高级特性实验

    2023_Spark_实验八:Scala高级特性实验

    1、什么是泛型类 和Java或者C++一样,类和特质可以带类型参数。在Scala中,使用方括号来定义类型 参数,如下所示: 2、什么是泛型函数 函数和方法也可以带类型参数。和泛型类一样,我们需要把类型参数放在方法名之 后。 注意:这里的ClassTag是必须的,表示运行时的一些信

    2024年02月08日
    浏览(7)
  • 2023_Spark_实验十四:SparkSQL入门操作

    2023_Spark_实验十四:SparkSQL入门操作

    1、将emp.csv、dept.csv文件上传到分布式环境,再用  hdfs  dfs -put dept.csv /input/ hdfs  dfs -put emp.csv /input/ 将本地文件put到hdfs文件系统的input目录下 2、或者调用本地文件也可以。区别:sc.textFile(\\\"file:///D:\\\\temp\\\\emp.csv\\\") StructType 是个case class,一般用于构建schema. 因为是case class,所以使

    2024年02月08日
    浏览(7)
  • 【Spark练习】RDD分区操作

    【Spark练习】RDD分区操作

    2.1 textFile 对于textFile而言,如果 没有在方法中指定分区数 ,则sc.defaultMinPartitions默认为 min(defaultParallelism,2) ,其中,defaultParallelism对应的就是spark.default.parallelism,如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片) rdd的分区数 = max(本地file的分片数, sc.default

    2024年02月02日
    浏览(9)
  • Spark RDD编程基本操作

    Spark RDD编程基本操作

    RDD是Spark的核心概念,它是一个只读的、可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,可在多次计算间重用。Spark用Scala语言实现了RDD的API,程序员可以通过调用API实现对RDD的各种操作,从而实现各种复杂的应用。 Spark采用textFile()方法来从文件系统中加

    2024年02月06日
    浏览(10)
  • 【Spark编程基础】实验三RDD 编程初级实践(附源代码)

    【Spark编程基础】实验三RDD 编程初级实践(附源代码)

    1、熟悉 Spark 的 RDD 基本操作及键值对操作; 2、熟悉使用 RDD 编程解决实际具体问题的方法 1、Scala 版本为 2.11.8。 2、操作系统:linux(推荐使用Ubuntu16.04)。 3、Jdk版本:1.7或以上版本。 请到本教程官网的“下载专区”的“数据集”中下载 chapter5-data1.txt,该数据集包含了某大

    2024年03月25日
    浏览(10)
  • Spark大数据处理讲课笔记---Spark RDD典型案例

    Spark大数据处理讲课笔记---Spark RDD典型案例

    利用RDD计算总分与平均分 利用RDD统计每日新增用户 利用RDD实现分组排行榜 针对成绩表,计算每个学生总分和平均分   读取成绩文件,生成lines;定义二元组成绩列表;遍历lines,填充二元组成绩列表;基于二元组成绩列表创建RDD;对rdd按键归约得到rdd1,计算总分;将rdd1映射

    2024年02月06日
    浏览(17)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包