Java-Spark系列10-Spark性能调优概述

一.Spark 性能优化概述

首先笔者能力优先,使用Spark有一段时间,如下是笔者的工作经验的总结。

Spark任务运行图:

Spark的优化思路: 一般是从3个层面进行Spark程序的优化: 1) 运行环境优化 2) RDD算子优化 3) 参数微调

二.运行环境优化

2.1 数据本地性

我们知道HDFS的数据文件存储在不同的datanode,一般数据副本数量是3,因为Spark计算的数据量比较大,如果数据不在本节点,需要通过网络去其它的datanode读取数据。

所以此时我们可以通过提高数据本地性,减少网络传输,来达到性能优化的目的。 1) 计算和存储同节点(executor和HDFS的datanode、hbase的region server同节点) 2) executor数目合适: 如果100个数据界定,3个计算节点,就有97份网络传递,所以此种情况可以适当增加计算节点。 3) 适当增加数据副本数量

2.2 数据存储格式

推荐使用列式存储格式: parquet. parquet存在如下优先: 1) 相同数据类型的数据有很高压缩比 2) Hive主要支持ORC、也支持parquet

三.RDD算子优化

3.1 尽可能复用同一个RDD

每创建一个RDD都会带来性能的开销,尽可能的对同一个RDD做算子操作,而不要频繁创建新的 RDD。

3.2 对多次使用的RDD进行持久化

如果RDD的算子特别多,需要频繁多次操作同一个RDD,最好的办法是将该RDD进行持久化,

四.参数微调

1) num-executors 参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。

2) executor-cores 参数说明:该参数用于设置每个Executor进程的CPU core数量。

3) driver-memory 参数说明:该参数用于设置Driver进程的内存。

4) spark.default.parallelism 参数说明:该参数用于设置每个stage的默认task数量。

5) spark.storage.memoryFraction 参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。

6) spark.shuffle.memoryFraction 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。

资源参数参考示例:

./bin/spark-submit \ –master yarn-cluster \ –num-executors 100 \ –executor-memory 6G \ –executor-cores 4 \ –driver-memory 1G \ –conf spark.default.parallelism=1000 \ –conf spark.storage.memoryFraction=0.5 \ –conf spark.shuffle.memoryFraction=0.3 \

五.数据倾斜

绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。

数据倾斜图例:

解决数据倾斜一般有如下几种常用方法: 1) 使用Hive ETL预处理数据 先使用Hive进行预处理数据,也就是使用Hive先计算一层中间数据,Spark从中间层数据开始计算。

2) 过滤少数导致倾斜的key 如果发生导致倾斜的key非常少,可以将Spark任务拆分为包含 导致倾斜的key的任务和不包含key的任务。

3) sample采样倾斜key单独进行join 通过采样,提前预估会发生数据倾斜的key,然后将一个join拆分为两个join,其中一个不包含该key,一个只包含该key,最后将结果集进行union。

4) 调整并行度 调整Shuffle并行度,数据打散

5) 广播小数据集 适用于一个大表,一个小表 不用join连接操作,而改用Broadcast变量与map模拟join操作,完全规避shuffle操作 spark.sql: spark.sql.autoBroadcastJoinThreshold=104857600

6) 增加随机前缀 对发生倾斜的RDD增加随机前缀 对另外一个RDD等量扩容 如果少量的key发生倾斜,可以先过滤出一个单独的RDD,对另外一个RDD同理吹,join之后再合并

六. Spark常用的调优参数

6.1 在内存中缓存数据

Spark SQL可以通过调用Spark.catalog.cachetable (“tableName”)或DataFrame.cache()来使用内存中的columnar格式缓存表。然后Spark SQL将只扫描所需的列,并自动调优压缩以最小化内存使用和GC压力。你可以调用spark.catalog.uncacheTable(“tableName”)从内存中删除表。

内存缓存的配置可以在SparkSession上使用setConf方法或者使用SQL运行SET key=value命令来完成。

| 参数名| 默认值 | 参数说明 | 启始版本 | |-|-|-|-| | spark.sql.inMemoryColumnarStorage.compressed | true | 当设置为true时,Spark SQL会根据数据统计自动为每列选择压缩编解码器。 | 1.0.1 | | spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 控制柱状缓存的批大小。更大的批处理大小可以提高内存利用率和压缩,但在缓存数据时可能会带来OOMs风险。 | 1.1.1 |

6.2 其它配置项

还可以使用以下选项调优查询执行的性能。随着更多的优化被自动执行,这些选项可能会在未来的版本中被弃用。 | 参数名| 默认值 | 参数说明 | 启始版本 | |-|-|-|-| | spark.sql.files.maxPartitionBytes | 134217728 (128 MB) | 读取文件时装入单个分区的最大字节数。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。 | 2.0.0 | | spark.sql.files.openCostInBytes |4194304 (4 MB) | 打开一个文件的估计成本,由可以在同一时间扫描的字节数来衡量。当将多个文件放入一个分区时使用。最好是高估,那么带有小文件的分区将比带有大文件的分区更快(这是首先安排的)。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。| 2.0.0 | | spark.sql.files.minPartitionNum | Default Parallelism | 建议的(不是保证的)最小分割文件分区数。如果没有设置,默认值是 spark.default.parallelism 。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。 | 3.1.0 | | spark.sql.broadcastTimeout | 300 | broadcast join 等待时间的超时(秒) | 1.3.0 | | spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) |配置在执行联接时将广播到所有工作节点的表的最大字节大小。通过将此值设置为-1,可以禁用广播。注意:目前统计只支持运行ANALYZE TABLE COMPUTE statistics noscan命令的Hive Metastore表。 | 1.1.0 | | spark.sql.shuffle.partitions | 200 | 配置将数据变换为连接或聚合时要使用的分区数量。 | 1.1.0| | spark.sql.sources.parallelPartitionDiscovery.threshold | 32 | 配置阈值以启用作业输入路径的并行列出。如果输入路径数大于该阈值,Spark将通过Spark分布式作业列出文件。否则,它将退回到顺序列表。此配置仅在使用基于文件的数据源(如Parquet、ORC和JSON)时有效。 | 1.5.0 | | spark.sql.sources.parallelPartitionDiscovery.parallelism | 10000 | 配置作业输入路径的最大列出并行度。如果输入路径的数量大于这个值,它将被降低到使用这个值。与上面一样,此配置仅在使用基于文件的数据源(如Parquet、ORC和JSON)时有效。 | 2.1.1 |

6.3 SQL查询连接的hint

join策略提示BROADCAST、MERGE、SHUFFLE_HASH和SHUFFLE_REPLICATE_NL,在将指定的关系加入到另一个关系时,指示Spark对每个指定的关系使用暗示策略。例如,在表 t1 上使用BROADCAST提示时,广播加入(广播散列连接或广播嵌套循环联接取决于是否有等值连接键)与t1的构建方面将由火花即使大小的优先表t1的建议的统计配置spark.sql.autoBroadcastJoinThreshold之上。

当连接两端指定了不同的连接策略提示时,Spark会优先考虑BROADCAST提示而不是MERGE提示,优先考虑SHUFFLE_HASH提示而不是SHUFFLE_REPLICATE_NL提示。当双方都指定了BROADCAST提示或SHUFFLE_HASH提示时,Spark将根据连接类型和关系的大小选择构建端。

请注意,不能保证Spark会选择提示中指定的连接策略,因为特定的策略可能不支持所有的连接类型。

— We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hintSELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key

Coalesce hint允许Spark SQL用户控制输出文件的数量,就像Dataset API中的Coalesce、repartition和repartitionByRange一样,它们可以用于性能调优和减少输出文件的数量。COALESCE hint只有一个分区号作为参数。“REPARTITION”提示有一个分区号、列或它们都作为参数。“REPARTITION_BY_RANGE”提示必须有列名,分区号是可选的。

SELECT /*+ COALESCE(3) */ * FROM t SELECT /*+ REPARTITION(3) */ * FROM t SELECT /*+ REPARTITION(c) */ * FROM t SELECT /*+ REPARTITION(3, c) */ * FROM t SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t

6.4 自适应查询执行

Adaptive Query Execution (AQE)是Spark SQL中的一种优化技术,它利用运行时统计信息来选择最高效的查询执行计划。默认情况下AQE是禁用的。Spark SQL可以使用Spark.SQL.adaptive.enabled的伞配置来控制是否打开/关闭。从Spark 3.0开始,AQE中有三个主要特性,包括合并shuffle后分区、将排序合并连接转换为广播连接以及倾斜连接优化。

6.5 合并分区后重新组合

当spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled配置都为true时,该特性根据map输出统计信息来合并post shuffle分区。这个特性简化了运行查询时shuffle分区号的调优。您不需要设置合适的shuffle分区号来适合您的数据集。一旦您通过Spark .sql. adaptive.coalescepartitions . initialpartitionnum配置设置了足够大的初始shuffle分区数,Spark就可以在运行时选择适当的shuffle分区号。

| 参数名| 默认值 | 参数说明 | 启始版本 | |-|-|-|-| | spark.sql.adaptive.coalescePartitions.enabled | true | 当true和Spark .sql. adaptive_enabled为true时,Spark会根据目标大小(由Spark .sql. adaptive_advisorypartitionsizeinbytes指定)合并连续的shuffle分区,以避免过多的小任务。 | 3.0.0 | | spark.sql.adaptive.coalescePartitions.minPartitionNum | Default Parallelism | 合并后的最小洗牌分区数。如果不设置,则默认为Spark集群的默认并行度。此配置仅在spark.sql.http://adaptive.netenabled和spark.sql.http://adaptive.netcoalescepartitions .enabled同时启用时有效。 | 3.0.0 | | spark.sql.adaptive.coalescePartitions.initialPartitionNum | 200 | 合并前的初始shuffle分区数。默认情况下它等于spark.sql.shuffle.partitions。此配置仅在spark.sql.http://adaptive.net enabled和spark.sql. http://adaptive.netcoalescepartitions .enabled同时启用时有效。 | 3.0.0 | | spark.sql.adaptive.advisoryPartitionSizeInBytes | 64 MB| 自适应优化期间shuffle分区的建议大小(当spark.sql. adaptive_enabled为true时)。当Spark对小shuffle分区或斜shuffle分区进行合并时生效。 | 3.0.0 |

6.6 将排序合并联接转换为广播联接

当任何连接侧的运行时统计数据小于广播散列连接阈值时,AQE将排序合并连接转换为广播散列连接。这不是一样有效规划一个广播散列连接首先,但这总比继续做分类合并加入,我们可以节省连接双方的排序,并在本地读取洗牌文件节省网络流量(如果spark.sql.adaptive.localShuffleReader.enabled被设置为true)

6.7 优化倾斜连接

数据倾斜会严重降低连接查询的性能。该特性通过将倾斜任务拆分(如果需要的话还可以复制)为大小大致相同的任务,动态处理排序-合并连接中的倾斜任务。当spark.sql.adaptive.enabled和spark.sql.adaptive.skewJoin.enabled配置同时启用时生效。

| 参数名| 默认值 | 参数说明 | 启始版本 | |-|-|-|-| | spark.sql.adaptive.skewJoin.enabled | true | 当true和Spark .sql.adaptive.enabled为true时,Spark通过拆分(并在需要时复制)倾斜分区来动态处理排序-合并连接中的倾斜。 | 3.0.0 | |spark.sql.adaptive.skewJoin.skewedPartitionFactor | 10 | 如果一个分区的大小大于这个因子乘以中值分区大小,并且大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,则认为该分区是倾斜的。 | 3.0.0| |spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | 如果分区的字节大小大于这个阈值,并且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor乘以分区中值大小,则认为该分区是倾斜的。理想情况下,该配置应该设置为大于spark.sql.adaptive.advisoryPartitionSizeInBytes。 |3.0.0 |

参考:

http://spark.apache.org/docs/latest/rdd-programming-guide.htmlhttps://tech.meituan.com/2016/04/29/spark-tuning-basic.htmlhttps://blog.csdn.net/meihao5/article/details/81084876http://spark.apache.org/docs/latest/sql-performance-tuning.html

    THE END
    喜欢就支持一下吧
    点赞8 分享
    评论 抢沙发
    头像
    欢迎您留下宝贵的见解!
    提交
    头像

    昵称

    取消
    昵称表情代码图片

      暂无评论内容