Spark优化和问题

这篇具有很好参考价值的文章主要介绍了Spark优化和问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

优化

spark sql 优化

  1. 在配置SparkSQL任务时指定executor核心数 建议为4
    (同一executor[进程]内内存共享,当数据倾斜时,使用相同核心数与内存量的两个任务,executor总量少的任务不容易OOM,因为单核心最大可用内存大.但是并非越大越好,因为单个exector最大core受服务器剩余core数量限制,过大的core数量可能导致资源分配不足)

  2. 设置spark.default.parallelism=600 每个stage的默认task数量
    (计算公式为num-executors * executor-cores 系统默认值分区为40,这是导致executor并行度上不去的罪魁祸首,之所以这样计算是为了尽量避免计算最慢的task决定整个stage的时间,将其设置为总核心的2-3倍,让运行快的task可以继续领取任务计算直至全部任务计算完毕)

  3. 开启spark.sql.auto.repartition=true 自动重新分区
    (每个stage[阶段]运行时分区并不尽相同,使用此配置可优化计算后分区数,避免分区数过大导致单个分区数据量过少,每个task运算分区数据时时间过短,从而导致task频繁调度消耗过多时间)

  4. 设置spark.sql.shuffle.partitions=400 提高shuffle并行度
    (shuffle read task的并行度)

  5. 设置spark.shuffle.service.enabled=true 提升shuffle效率 --!并未测试
    (Executor 进程除了运行task 也要进行写shuffle 数据,当Executor进程任务过重时,导致GC不能为其他Executor提供shuffle数据时将会影响效率.此服务开启时代替Executor来抓取shuffle数据)

//1.下列Hive参数对Spark同样起作用。
set hive.exec.dynamic.partition=true; // 是否允许动态生成分区
set hive.exec.dynamic.partition.mode=nonstrict; // 是否容忍指定分区全部动态生成
set hive.exec.max.dynamic.partitions = 100; // 动态生成的最多分区数

//2.运行行为
set spark.sql.autoBroadcastJoinThreshold; // 大表 JOIN 小表,小表做广播的阈值
set spark.dynamicAllocation.enabled; // 开启动态资源分配
set spark.dynamicAllocation.maxExecutors; //开启动态资源分配后,最多可分配的Executor数
set spark.dynamicAllocation.minExecutors; //开启动态资源分配后,最少可分配的Executor数
set spark.sql.shuffle.partitions; // 需要shuffle是mapper端写出的partition个数
set spark.sql.adaptive.enabled; // 是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize; //开启spark.sql.adaptive.enabled后,两个partition的和低于该阈值会合并到一个reducer
set spark.sql.adaptive.minNumPostShufflePartitions; // 开启spark.sql.adaptive.enabled后,最小的分区数
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize; //当几个stripe的大小大于该值时,会合并到一个task中处理

//3.executor能力
set spark.executor.memory; // executor用于缓存数据、代码执行的堆内存以及JVM运行时需要的内存
set spark.yarn.executor.memoryOverhead; //Spark运行还需要一些堆外内存,直接向系统申请,如数据传输时的netty等。
set spark.sql.windowExec.buffer.spill.threshold; //当用户的SQL中包含窗口函数时,并不会把一个窗口中的所有数据全部读进内存,而是维护一个缓存池,当池中的数据条数大于该参数表示的阈值时,spark将数据写到磁盘
set spark.executor.cores; //单个executor上可以同时运行的task数

问题:

driver拉去数据量过大:

报错信息:spark.driver.maxResultSize

问题原因:样本不均衡

解决办法:将spark.driver.maxResultSize设置更大一点

spark.driver.maxResultSize 4G
因为作业内存不足,导致GC,GC可能导致executor与AM通信超时,故AM认为executor挂了,会发停止的signal

报错信息:worker机器,ERROR SIGTERM handler CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

问题原因:设置了动态资源配置导致的

解决办法:

1. 增加心跳时长:spark.executor.heartbeatInterval=30

2. 关闭动态资源配置

--conf spark.network.timeout=600 \
--conf spark.sql.shuffle.partitions=100 \
--conf hive.exec.dynamic.partition=true \
--conf spark.executor.heartbeatInterval=30 \
解决pandas转dataframe的问题

报错信息:TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

问题原因:pandas有为空的数据,空值为np.NaN,spark无法识别

解决办法: 需要进行空值处理和数量类型处理

# 空值处理
 fillna_value = {
     "time_id": 0,
     "hex_type_danger_score": "-",
 }
 print("空值处理")
 danger_score_to_platform_df = danger_score_to_platform_df.fillna(fillna_value)
 print(danger_score_to_platform_df.head())

print("数据类型处理")
astype_value = {
    "time_id": int,
    "city_id": int,
}
# 数据类型处理
for columns, dtype in astype_value.items():
    danger_score_to_platform_df[columns] = danger_score_to_platform_df[columns].astype(dtype)
解决array数据类型保持到csv保存的问题

报错信息:java.lang.UnsupportedOperationException: CSV data source does not support array<string> data type.

问题原因:spark中存在array相关的数据类,csv无法识别

解决方案:将array类型数据转化为string

from pyspark.sql.types import StringType

for dtype in result_df.dtypes:
   column = dtype[0]
   data_type = dtype[1]
   print(dtype[0], dtype[1])
   if "array" in str(data_type):
#     if "double" in str(data_type):
       result_df = result_df.withColumn(column, result_df[column].cast(StringType()))
解决spark依赖的UDF jar包问题

报错信息:u"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':"

问题原因:spark 提交参数中的jars包含org.apache.hadoop和org.apache.spark相关的jar包

解决方案:jar包中不加入hadoop和spark相关的包, maven中:加入<scope>provided</scope >, 还注意多个jar包时,需要逗号隔开,

如: --jars "viewfs:///user/hadoop-shangchao/user_upload/hex-udf-0.1.jar,viewfs:///user/hadoop-shangchao/user_upload/zhuyong05_SafeDispatchUDF-1.0.jar"

Spark优化和问题,spark,大数据,分布式

解决spark执行持久化过程报错:

报错信息: py4j.protocol.Py4JJavaError: An error occurred while calling o151.csv.

: org.apache.spark.SparkException: Job aborted.

问题原因:暂时未弄清楚

解决方案: spark-submit增加几个配置参数文章来源地址https://www.toymoban.com/news/detail-810747.html

解决spark执行持久化过程报错:

报错信息: py4j.protocol.Py4JJavaError: An error occurred while calling o151.csv.

: org.apache.spark.SparkException: Job aborted.

问题原因:暂时未弄清楚

解决方案: spark-submit增加几个配置参数

--conf spark.yarn.job.priority=1  \
--conf spark.dynamicAllocation.maxExecutors=100 \
--conf spark.network.timeout=600 \
--conf spark.driver.maxResultSize=4G \
--conf spark.sql.shuffle.partitions=100 \
--conf hive.exec.dynamic.partition=true \
--conf spark.executor.heartbeatInterval=120 \

到了这里,关于Spark优化和问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 大数据课程K2——Spark的RDD弹性分布式数据集

    大数据课程K2——Spark的RDD弹性分布式数据集

    文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州 ⚪ 了解Spark的RDD结构; ⚪ 掌握Spark的RDD操作方法; ⚪ 掌握Spark的RDD常用变换方法、常用执行方法; 初学Spark时,把RDD看做是一个集合类型(类似于Array或List),用于存储数据和操作数据,但RDD和普通集合的区别

    2024年02月12日
    浏览(50)
  • 大数据开源框架环境搭建(七)——Spark完全分布式集群的安装部署

    大数据开源框架环境搭建(七)——Spark完全分布式集群的安装部署

    前言:七八九用于Spark的编程实验 大数据开源框架之基于Spark的气象数据处理与分析_木子一个Lee的博客-CSDN博客_spark舆情分析 目录 实验环境: 实验步骤: 一、解压 二、配置环境变量:  三、修改配置文件  1.修改spark-env.sh配置文件: 2.修改配置文件slaves: 3.分发配置文件:

    2024年02月11日
    浏览(51)
  • 云计算与大数据第16章 分布式内存计算平台Spark习题

    1、Spark是Hadoop生态(  B  )组件的替代方案。 A. Hadoop     B. MapReduce        C. Yarn             D.HDFS 2、以下(  D  )不是Spark的主要组件。 A. Driver      B. SparkContext       C. ClusterManager D. ResourceManager 3、Spark中的Executor是(  A  )。 A.执行器      B.主节

    2024年02月14日
    浏览(227)
  • 分布式计算框架:Spark、Dask、Ray
分布式计算哪家强:Spark、Dask、Ray

    分布式计算框架:Spark、Dask、Ray 分布式计算哪家强:Spark、Dask、Ray

    目录 什么是分布式计算 分布式计算哪家强:Spark、Dask、Ray 2 选择正确的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式计算是一种计算方法,和集中式计算是相对的。 随着计算技术的发展, 有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成

    2024年02月11日
    浏览(54)
  • 数据存储和分布式计算的实际应用:如何使用Spark和Flink进行数据处理和分析

    作为一名人工智能专家,程序员和软件架构师,我经常涉及到数据处理和分析。在当前大数据和云计算的时代,分布式计算已经成为了一个重要的技术方向。Spark和Flink是当前比较流行的分布式计算框架,它们提供了强大的分布式计算和数据分析功能,为数据处理和分析提供了

    2024年02月16日
    浏览(48)
  • Spark单机伪分布式环境搭建、完全分布式环境搭建、Spark-on-yarn模式搭建

    Spark单机伪分布式环境搭建、完全分布式环境搭建、Spark-on-yarn模式搭建

    搭建Spark需要先配置好scala环境。三种Spark环境搭建互不关联,都是从零开始搭建。 如果将文章中的配置文件修改内容复制粘贴的话,所有配置文件添加的内容后面的注释记得删除,可能会报错。保险一点删除最好。 上传安装包解压并重命名 rz上传 如果没有安装rz可以使用命

    2024年02月06日
    浏览(54)
  • 【Spark分布式内存计算框架——Spark 基础环境】1. Spark框架概述

    【Spark分布式内存计算框架——Spark 基础环境】1. Spark框架概述

    第一章 说明 整个Spark 框架分为如下7个部分,总的来说分为Spark 基础环境、Spark 离线分析和Spark实时分析三个大的方面,如下图所示: 第一方面、Spark 基础环境 主要讲述Spark框架安装部署及开发运行,如何在本地模式和集群模式运行,使用spark-shell及IDEA开发应用程序,测试及

    2024年02月11日
    浏览(46)
  • spark分布式解压工具

    ​ spark解压缩工具,目前支持tar、gz、zip、bz2、7z压缩格式,默认解压到当前路下,也支持自定义的解压输出路径。另外支持多种提交模式,进行解压任务,可通过自定义配置文件,作为spark任务的资源设定 2.1 使用hadoop的FileSystem类,对tos文件的进行读取、查找、写入等操作

    2024年02月02日
    浏览(49)
  • Spark分布式内存计算框架

    Spark分布式内存计算框架

    目录 一、Spark简介 (一)定义 (二)Spark和MapReduce区别 (三)Spark历史 (四)Spark特点 二、Spark生态系统 三、Spark运行架构 (一)基本概念 (二)架构设计 (三)Spark运行基本流程 四、Spark编程模型 (一)核心数据结构RDD (二)RDD上的操作 (三)RDD的特性 (四)RDD 的持

    2024年02月04日
    浏览(56)
  • 分布式计算MapReduce | Spark实验

    分布式计算MapReduce | Spark实验

    题目1 输入文件为学生成绩信息,包含了必修课与选修课成绩,格式如下: 班级1, 姓名1, 科目1, 必修, 成绩1 br (注: br 为换行符) 班级2, 姓名2, 科目1, 必修, 成绩2 br 班级1, 姓名1, 科目2, 选修, 成绩3 br ………., ………, ………, ………, ……… br 编写两个Hadoop平台上的MapRed

    2024年02月08日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包