Spark(30):Spark性能调优之常规性能调优

这篇具有很好参考价值的文章主要介绍了Spark(30):Spark性能调优之常规性能调优。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

0. 相关文章链接

1. 最优资源配置

2. RDD优化

2.1. RDD复用

2.2. RDD持久化

2.3. RDD尽可能早的 filter 操作

3. 并行度调节

4. 广播大变量

5. Kryo序列化

6. 调节本地化等待时长


0. 相关文章链接

 Spark文章汇总 

1. 最优资源配置

        Spark 性能调优的第一步,就是为任务分配更多的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。 

资源的分配在使用脚本提交 Spark 任务时进行指定,标准的 Spark 任务提交脚本如下所示: 

bin/spark-submit \ 
--class com.spark.WordCount \ 
--master yarn 
--deploy-mode cluster 
--num-executors 80 \ 
--driver-memory 6g \ 
--executor-memory 6g \ 
--executor-cores 3 \ 
/usr/opt/modules/spark/jar/spark.jar \ 

可以进行分配的资源如表所示: 

名称  说明 
--num-executors  配置 Executor 的数量 
--driver-memory  配置 Driver 内存(影响不大) 
--executor-memory  配置每个 Executor 的内存大小 
--executor-cores  配置每个 Executor CPU core 数量 

调节原则:尽量将任务分配的资源调节到可以使用的资源的最大限度。

对于具体资源的分配,我们分别讨论 Spark 的两种 Cluster 运行模式: 

  • 第一种是 Spark Standalone 模式,你在提交任务前,一定知道或者可以从运维部门获取到你可以使用的资源情况,在编写 submit 脚本的时候,就根据可用的资源情况进行资源的分配,比如说集群有 15 台机器,每台机器为 8G 内存,2 个 CPU core,那么就指定 15 个 Executor,每个 Executor 分配 8G 内存,2 个 CPU core。 
  • 第二种是 Spark Yarn 模式,由于 Yarn 使用资源队列进行资源的分配和调度,在编写 submit 脚本的时候,就根据 Spark 作业要提交到的资源队列,进行资源的分配,比如资源队列有 400G 内存,100 个 CPU core,那么指定 50 个 Executor,每个 Executor 分配8G 内存,2 个 CPU core。 

对各项资源进行了调节后,得到的性能提升会有如下表现: 

名称  解析 
增加 Executor·个数   在资源允许的情况下,增加 Executor 的个数可以提高执行 task 的并行度。比如有 4 Executor,每个 Executor 2 CPU core,那么可以并行执行 8 task,如果将 Executor 的个数增加到 8 个(资源允许的情况下),那么可以并行执行 16 task,此时的并行能力提升了一倍。 
 增加每个 Executor 的 CPU core 个数  在资源允许的情况下,增加每个 Executor 的Cpu core 个数,可以提高执行 task 的并行度。比如有 4 个 Executor,每个 Executor 有 2 个 CPU core,那么可以并行执行 8 个 task,如果将每个 Executor 的 CPU core 个数增加到 4 个(资源允许的情况下),那么可以并行执行 16 个 task,此时的并行能力提升了一倍。 
增加每个 Executor 的内存量  在资源允许的情况下,增加每个 Executor 的内存量以后,对性能的提升有三点:
1. 可以缓存更多的数据(即对 RDD 进行 cache),写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘 IO;
2. 可以为 shuffle 操作提供更多内存,即有更多空间来存放 reduce 端拉取的数据,写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘 IO; 
3. 可以为 task 的执行提供更多内存,在 task 的执行过程中可能创建很多对象,内存较小时会引发频繁的 GC,增加内存后,可以避免频繁的 GC,提升整体性能。 

生产环境 Spark submit 脚本配置 :

bin/spark-submit \
--class com.spark.WordCount \
--master yarn\
--deploy-mode cluster\
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
--queue root.default \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.core.connection.ack.wait.timeout=300 \
/usr/local/spark/spark.jar

参数配置参考值:

--num-executors: 50~100
--driver-memory: 1G~5G
--executor-memory: 6G~10G
--executor-cores: 3
--master:实际生产环境一定使用 yarn

2. RDD优化

2.1. RDD复用

在对 RDD 进行算子时,要避免相同的算子和计算逻辑之下对 RDD 进行重复的计算

Spark(30):Spark性能调优之常规性能调优,# Spark,spark,大数据,分布式,bigdata

对上图中的 RDD 计算架构进行修改,得到如下图所示的优化结果:

Spark(30):Spark性能调优之常规性能调优,# Spark,spark,大数据,分布式,bigdata

2.2. RDD持久化

        在 Spark 中,当多次对同一个 RDD 执行算子操作时,每一次都会对这个 RDD 以之前的父 RDD 重新计算一次,这种情况是必须要避免的,对同一个 RDD 的重复计算是对资源的极大浪费,因此,必须对多次使用的 RDD 进行持久化,通过持久化将公共 RDD 的数据缓存到内存/磁盘中,之后对于公共 RDD 的计算都会从内存/磁盘中直接获取 RDD 数据。 

对于 RDD 的持久化,有两点需要说明: 

  • RDD 的持久化是可以进行序列化的,当内存无法将 RDD 的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中。 
  • 如果对于数据的可靠性要求很高,并且内存充足,可以使用副本机制,对 RDD 数据进行持久化。当持久化启用了复本机制时,对于持久化的每个数据单元都存储一个副本,放在其他节点上面,由此实现数据的容错,一旦一个副本数据丢失,不需要重新计算,还可以使用另外一个副本。 

2.3. RDD尽可能早的 filter 操作

获取到初始 RDD 后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升 Spark 作业的运行效率。 

3. 并行度调节

        Spark 作业中的并行度指各个stage的task的数量。 如果并行度设置不合理而导致并行度过低,会导致资源的极大浪费,例如,20 个 Executor,每个 Executor 分配 3 个 CPU core,而 Spark 作业有 40 个 task,这样每个 Executor 分配到的 task 个数是 2 个,这就使得每个 Executor 有一个 CPU core 空闲,导致资源的浪费。 理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark 作业的性能和运行速度。 

        Spark 官方推荐,task 数量应该设置为 Spark 作业总 CPU core 数量的 2~3 倍。之所以没有推荐 task 数量与 CPU core 总数相等,是因为 task 的执行时间不同,有的 task 执行速度快而有的 task 执行速度慢,如果 task 数量与 CPU core 总数相等,那么执行快的 task 执行完成后,会出现 CPU core 空闲的情况。如果 task 数量设置为 CPU core 总数的 2~3 倍,那么一个 task 执行完毕后,CPU core 会立刻执行下一个 task,降低了资源的浪费,同时提升了 Spark 作业运行的效率。 

Spark 作业并行度的设置如下所示: 

val conf = new SparkConf()
conf.set("spark.default.parallelism", "500")

4. 广播大变量

        默认情况下,task 中的算子中如果使用了外部的变量,每个 task 都会获取一份变量的复本,这就造成了内存的极大消耗。一方面,如果后续对 RDD 进行持久化,可能就无法将 RDD 数据存入内存,只能写入磁盘,磁盘 IO 将会严重消耗性能;另一方面,task 在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的 GC,GC 会导致工作线程停止,进而导致 Spark 暂停工作一段时间,严重影响 Spark 性能。 

        假设当前任务配置了 20 个 Executor,指定 500 个 task,有一个 20M 的变量被所有 task 共用,此时会在 500 个 task 中产生 500 个副本,耗费集群 10G 的内存,如果使用了广播变量, 那么每个 Executor 保存一个副本,一共消耗 400M 内存,内存消耗减少了 5 倍。广播变量在每个 Executor 保存一个副本,此 Executor 的所有 task 共用此广播变量,这让变量产生的副本数量大大减少。 

        在初始阶段,广播变量只在 Driver 中有一份副本。task 在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的 Executor 对应的 BlockManager 中尝试获取变量,如果本地没有,BlockManager 就会从 Driver 或者其他节点的 BlockManager 上远程拉取变量的复本,并由本地的 BlockManager 进行管理;之后此 Executor 的所有 task 都会直接从本地的BlockManager 中获取变量。 

5. Kryo序列化

        默认情况下,Spark 使用 Java 的序列化机制。Java 的序列化机制使用方便,不需要额外的配置,在算子中使用的变量实现 Serializable 接口即可,但是,Java 序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。 

        Kryo 序列化机制比 Java 序列化机制性能提高 10 倍左右,Spark 之所以没有默认使用Kryo 作为序列化类库,是因为它不支持所有对象的序列化,同时 Kryo 需要用户在使用前注册需要序列化的类型,不够方便,但从 Spark 2.0.0 版本开始,简单类型、简单类型数组、字符串类型的 Shuffling RDDs 已经默认使用 Kryo 序列化方式了。 

public class MyKryoRegistrator implements KryoRegistrator { 
  @Override 
  public void registerClasses(Kryo kryo) {     
    kryo.register(StartupReportLogs.class); 
  } 
} 

配置 Kryo 序列化方式的实例代码:

//创建 SparkConf 对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用 Kryo 序列化库,如果要使用 Java 序列化库,需要把该行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//在 Kryo 序列化库中注册自定义的类集合,如果要使用 Java 序列化库,需要把该行屏蔽掉
conf.set("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator");

6. 调节本地化等待时长

        Spark 作业运行过程中,Driver 会对每一个 stage 的 task 进行分配。根据 Spark 的 task 分配算法,Spark 希望 task 能够运行在它要计算的数据算在的节点(数据本地化思想),这样就可以避免数据的网络传输。通常来说,task 可能不会被分配到它处理的数据所在的节点,因为这些节点可用的资源可能已经用尽,此时,Spark 会等待一段时间,默认 3s,如果等待指定时间后仍然无法在指定节点运行,那么会自动降级,尝试将 task 分配到比较差的本地化级别所对应的节点上,比如将 task 分配到离它要计算的数据比较近的一个节点,然后进行计算,如果当前级别仍然不行,那么继续降级。 

        当 task 要处理的数据不在 task 所在节点上时,会发生数据的传输。task 会通过所在节点的BlockManager 获取数据,BlockManager 发现数据不在本地时,户通过网络传输组件从数据所在节点的 BlockManager 处获取数据。 

        网络传输数据的情况是我们不愿意看到的,大量的网络传输会严重影响性能,因此,我们希望通过调节本地化等待时长,如果在等待时长这段时间内,目标节点处理完成了一部分 task,那么当前的 task 将有机会得到执行,这样就能够改善 Spark 作业的整体性能。 

Spark 的本地化等级如表所示: 

名称  解析 
PROCESS_LOCAL  进程本地化,task 和数据在同一个 Executor 中,性能最好。 
NODE_LOCAL  节点本地化,task和数据在同一个节点中,但是task 和数据不在同一个 Executor 中,数据需要在进程间进行传输。 
RACK_LOCAL  机架本地化,task 和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输。 
NO_PREF  对于 task 来说,从哪里获取都一样,没有好坏之分。 
ANY  task 和数据可以在集群的任何地方,而且不在一个机架中,性能最差。 

        在 Spark 项目开发阶段,可以使用 client 模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的 task 数据本地化的级别,如果大部分都是PROCESS_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是 NODE_LOCAL、ANY,那么需要对本地化的等待时长进行调节,通过延长本地化等待时长,看看 task 的本地化级别有没有提升,并观察 Spark 作业的运行时间有没有缩短。 注意,过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得Spark 作业的运行时间反而增加了。 

Spark 本地化等待时长的设置如代码所示: 

val conf = new SparkConf() 
conf.set("spark.locality.wait", "6") 

注:其他Spark相关系列文章链接由此进 ->  Spark文章汇总 文章来源地址https://www.toymoban.com/news/detail-594885.html


到了这里,关于Spark(30):Spark性能调优之常规性能调优的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 分布式计算中的大数据处理:Hadoop与Spark的性能优化

    大数据处理是现代计算机科学的一个重要领域,它涉及到处理海量数据的技术和方法。随着互联网的发展,数据的规模不断增长,传统的计算方法已经无法满足需求。因此,分布式计算技术逐渐成为了主流。 Hadoop和Spark是目前最为流行的分布式计算框架之一,它们都提供了高

    2024年01月23日
    浏览(47)
  • 如何对 Spark 进行全方位性能调优?

    如何对 Spark 进行全方位性能调优?

    日志收集 如果作业执行报错或者速度异常,通常需要查看 Spark 作业日志,Spark 日志通常是排错的唯一根据,更是作业调优的好帮手。查看日志的时候,需要注意的是 Spark 作业是一个分布式执行的过程,所以日志也是分布式的,联想到 Spark 的架构,Spark 的日志也分为两个级别

    2024年02月21日
    浏览(16)
  • 【spark床头书系列】SparkSQL性能调优官网权威资料

    SparkSQL性能调优官网权威资料点击这里也可看全文 对于某些工作负载,可以通过将数据缓存在内存中或打开一些实验选项来提高性能。 Spark SQL可以 使用内存中的列式格式缓存表格 ,通过调用 spark.catalog.cacheTable(\\\"tableName\\\") 或 dataFrame.cache() 方法。然后,Spark SQL将只扫描所需的列

    2024年01月20日
    浏览(28)
  • Linux性能调优之sar详解

    sar是一个采集,报告和存储计算机负载信息的工具。 有的时候,我们要通过对系统的cpu负载等性能数值的查看,来判排查系统产生某种故障(经常死机或者运行速度突然变慢)的原因。但是,简单的top,uptime,w等命令只可以查看当前的负载,而无法查看过去的某一时间段的cpu的

    2024年02月01日
    浏览(12)
  • Linux 性能调优之网络优化

    Linux 性能调优之网络优化

    考试整理相关笔记 分享一些 Linux 中网络内核参数调优的笔记 理解不足小伙伴帮忙指正 对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是

    2024年02月07日
    浏览(9)
  • spark 数据序列化和内存调优(翻译)

    由于大多数Spark计算的内存性质,Spark程序可能会被集群中的任何资源瓶颈:CPU、网络带宽或内存。大多数情况下,如果数据能放在内存,瓶颈是网络带宽,但有时,您还需要进行一些调整,例如以序列化形式存储RDD,以减少内存使用。本指南将涵盖两个主要主题:数据序列化

    2024年03月11日
    浏览(14)
  • 性能调优之JMH必知必会3:编写正确的微基准测试用例

      性能调优之JMH必知必会1:什么是JMH 性能调优之JMH必知必会2:JMH的基本用法 性能调优之JMH必知必会4:JMH的高级用法 性能调优之JMH必知必会5:JMH的Profiler       在前面两篇文章中分别介绍了什么是JMH、JMH的基本法。现在来介绍JMH正确的微基准测试用例如何编写。【 单位

    2023年04月08日
    浏览(12)
  • spark 的group by ,join数据倾斜调优

    spark任务中最常见的耗时原因就是数据分布不均匀,从而导致有些task运行时间很长,长尾效应导致的整个job运行耗时很长 首先我们要定位数据倾斜,我们可以通过在spark ui界面中查看某个stage下的task的耗时,如果发现某些task耗时很长,对应要处理的数据很多,证明有数据倾斜

    2024年02月21日
    浏览(12)
  • Spark大数据分析与实战笔记(第三章 Spark RDD 弹性分布式数据集-02)

    Spark大数据分析与实战笔记(第三章 Spark RDD 弹性分布式数据集-02)

    人生很长,不必慌张。你未长大,我要担当。 传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式要进行大量的磁盘IO操作。Spark中的RDD可以很好的解决这一缺点。 RDD是Spark提供的最重要的抽象概念

    2024年02月22日
    浏览(186)
  • Spark弹性分布式数据集

    Spark弹性分布式数据集

    1. Spark RDD是什么 RDD(Resilient Distributed Dataset,弹性分布式数据集)是一个不可变的分布式对象集合,是Spark中最基本的数据抽象。在代码中RDD是一个抽象类,代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。 每个RDD都被分为多个分区,这些分区运行在集群中

    2024年02月13日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包