鉴于 Spark 基于内存计算这一天性,以下集群资源可能会造成 Spark 程序的瓶颈:
CPU,带宽和内存。通常情况下,如果内存足够的情况下,瓶颈只可能出现在网络带
宽方面;但有时,你也需要做一些例如序列化优化来降低内存使用率。这份指导主要
集中于两方面:数据序列化,这是充分提升网络表现和降低内存消耗、内存优化的关
键;我们也会简要阐述一些小技巧。
数据序列化
序列化在任何分布式应用的运行中扮演了重要的角色。采用那些序列化慢的格式、或
者消费巨量字节时将会严重拖慢计算效率。通常情况下,调整数据的序列化方式是你
优化 Spark 程序时首先需要做的事。Spark 程序试图在简洁(循序你在代码中使用任
何 Java 的数据类型)和效率之间取得一种平衡。Spark 提供了两种序列化库。
Java
serialization:默认情况下,Spark 序列话一个对象时使用 Java 自带的
ObjectOutputStream 框架,对于任何实现了 java.io.Serializable 接口的类都
有效。有也可以通过继承 java.io.Externalizable 来自定义你的序列化过程。
Java serialization 是灵活的,但通常相当缓慢并且导致很多类的序列化格式很
臃肿。
·
Kryo
serialization:
Spark 也可以使用更快的序列化类库 Kryo
library
(version 2)来序列化对象。相比 Java serialization,Kryo 具有更快和更加紧凑
(通常提供 10 倍于 Java 序列化的效率)的优势。但对于所有可序列化的类型
不是全部都支持,因此为了更好的效率,你需要提前为你的程序注册这些类。
又可以通过设置初始化时的 SparkConf 和调用
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")来切换到
Kryo 模式。这项配置不仅会在工作节点进行数据混洗时用到 Kryo 序列化,而且在将
RDD 序列化到硬盘时也会使用到 kryo。Kryo 之所以没有成为默认设置是因为使用者
需要自行注册一些类,但是我们建议在一些网络密集型应用中尝试使用 kryo 序列化。
从 Spark2.0.0 开始,当对于简单类型,简单类型数组和字符串类型的 RDD 进行混洗
时,Spark 已经使用 Kryo 进行了内部整合。
Spark 已经自动对很多常用的核心 Scala 类(包含于 AllScalaRegistrar,位于
Twitter chill library)进行了 Kryo 序列化注册。
要使自定义类应用 Kryo 注册,你需要 registerKryoClasses 方法:
[java] view plain copy
1. SparkConf sparkConf = new SparkConf().setAppName("Kryo");
2. Class<?>[] classs={MyClass.class,YourClass.class};
3. sparkConf.registerKryoClasses(classs);
4. JavaSparkContext sc = new JavaSparkContext(sparkConf);
kryo 文档描述了进阶注册选项,例如添加自定义序列化编码。
如果你的对象很大,你可能需要增加 spark.kryoserializer.buffer。这个值需要足够
大,以至于能够容纳下被序列化的最大的对象。
如果你不注册你的自定义类,Kryo 仍然会执行下去,但它将会存储每个对象的类全
名,这真是一种浪费。
内存优化
在优化内存使用率时主要有三方面可考虑的因素:你创建的对象占用的内存量(你也
许想将整个数据据装进内存),访问这些数据的代价和累计回收的开销(当你的对象
在内存中具有较高轮换率时)
默认情况下,访问 Java 对象是快速的。但一不留神就会消耗 2 到 5 倍的空间用于存储
对象中的原始属性变量,这主要是出于以下原因:
·
每个不同的 Java 对象拥有一个“对象头”,这个“对象头”占用 16 字节,包
含指向所属类型的指针信息。对于一个包含很少数据的对象(例如一个 Int 属
性),这些“对象头”信息占用的内存空间可能比数据本身更大。
Java 的 String 对象包含将近 40 字节的开销用于描述这些原始字符串数据(因
为他们将其存储与一个字符数组并保存了额外的信息,例如字符串长度),同
时每个字符使用两个字节存储(UTF-16)。因此一个 10 个字符的 String 对
象,轻轻松松就能消耗 60 字节的空间。
·
常见的集合类,例如 HashMap 和 LinkedList,使用了链式结构,针对每个实
体(Map.Entry)都对应一个包装对象。这个对象不仅包含了“头信息”,而
且存储了指向下一个对象的指针。
原始基本数据类型的集合对象在存储每一个基本类型时还是用了包装类对象,
例如 java.lang.Integer
这一部分将首先简要概述一下 Spark 的内存管理,然后列举一些特殊的策略,来帮助
你在优化你的应用时采取更高效的方式。我们将着重描述如何确定对象的内存占用和
如何改变数据结构和序列化方式来降低内存占用。然后,我们会介绍如何优化 Spark
的缓存大小和 Java 垃圾回收。
内存管理概览
Spark 的内存使用大致可划分两类:执行和存储。执行存储指的是计算(shuffles,
join,sorts 和 aggregations)时用的内存,而存储内存指的是用于缓存和在集群内
部传播的数据。在 Spark 中,执行和存储共享统一的区域(M)。当执行模块没有占
用内存时,存储模块可以获取全部内存(统一区域),反之亦然。执行模块可以驱逐
存储模块,当且仅当全部内存使用落到某个设定的阈值时(R)。换句话说,R 从 M
划分出一个亚区,这个亚区的缓存不可被驱逐。出于实现的复杂性的考虑,存储模块
无法驱逐执行模块。
这个设计确保了一些吸引人的特性。1、如果应用不使用缓存的话,计算模块可以使用
整个内存空间进行计算,排除不必要的硬盘溢写。2、如果一个应用可以通过 R 预定一
个最低的存储空间用于缓存,那这些缓存对于驱逐是免疫的。3、这提供了可靠的开箱
即用的方式来应对不同的工作,即便你不是一个对内部存储分配了如指掌的专家。
尽管 Spark 提供了两个相关的配置项,但大部分用户并不需要去调整它们,因为配置
项的默认值已经足够应对大多数工作任务。
·
spark.memory.fraction:代表了上文中的 M,表示内存占用(JVM
heap
space-300MB)比率(默认值 0.6)。剩余的 40%的空间主要用来存储数据
结构、内部元数据并预防由稀疏、大记录引发 OOM。
spark.memory.storageFraction:表示上文提到的 R,表示从 M 中划分出 R
大小的一个区域(默认值 0.5),这个被划分出的区域中的缓存数据块对于计
算模块的驱逐是免疫的。
spark.memory.fraction 值的配置应当使得 JVM 中的堆内存与老代和永久代的空间相
协调。具体配置见下文 GC 优化调整细节。
判断内存消耗
判断一个数据集到底消耗多少内存的最佳方式是:将数据集加载到 RDD 并将其缓存下
来,然后去 Spark Web UI 查看“Storage”页面。这个页面将告诉你,你的 RDD 正
在申请多大的内存。
要预估某个指定对象的内存消耗时,请使用 SizeEstimator 的 estimate 方法,这是对
于哪些想试验一下如何通过改变数据类型来消减内存和判断某个广播变量将在每个执
行器申请多大内存的朋友来说是个好工具。
优化数据结构
降低内存消耗的首要方法就是避免使用添加额外开销的 Java 特征,例如基于指针的数
据结构和包装对象。具体小贴士如下:
1. 使用对象数组和原始类型来构造你的数据结构,而不是使用标准的 Java 和
Scala 集合类(例如 HashMap)。fastutil 库提供了针对原始类型的便捷的集
合类,这些类兼容 Java 标准库。
2. 避免使用包含过多小对象和指针的嵌套结构。
3. 考虑使用数字和枚举对象代替字符串作为键值。
4. 如果你使用的随机内存少于 32G,设置 JVM 的标志-
XX:+UseCompressedOops 来使引用只占用 4 字节而不是 8 字节。同学你可
以在 spark-env.sh 中添加这个配置项哦
序列化 RDD 存储
当你的对象太大以至于以上优化均被无视的情况下,有一个副更简单的药可以拯救你
的对象,那就是将它存储为序列化格式来降低内存使用。通过使用序列化级别来将
RDD 持久化,例如 MEMORY_ONLY_SER。随后,Spark 将 RDD 的每个分区存储成
一个个字节数组。这粒药丸只有一个副作用,那就是访问这些序列化的数据是需要多
耗费些时间,因为在读取前需要先反序列化这些数据。如果你觉得你的 Spark 程序需
要吃药的话,我们强烈建议你使用 Kryo 这一序列化格式来缓存你的数据,因为相比
Java 自带的序列化方式,Kryo 可以让你的对象更瘦(这就是抽脂和整容流行的原
因)。
垃圾回收优化
当你的程序存储的 RDD 需要频繁轮换时,JVM 垃圾回收可能会出现问题。(当对一
个 RDD 仅读取一次,然后在其上进行多次操作时并不会带来问题)当 Java 需要回收
老对象占用的空间时,它将扫描你所有的对象来找到其中不被使用的。需要指出的一
点是,垃圾回收的消耗和你的 Java 对象个数成正比,因此你所应用的数据结构拥有的
对象越少越好(例如使用 int 数组代替 LinkedList)。一个更好的方法是使用序列化
格式来持久化你的对象,如上所述:一旦序列化后,每个 RDD 将只对应一个对象(一
个字节数组)。所以当存在 GC 问题时,在尝试其他技巧前,你首先要做的是使用序
列化的缓存技术。
由于工作节点上任务工作内存和 RDD 缓存之间的冲突也会导致 GC 问题。我们将会讨
论如何分配空间去存储 RDD 缓存来缓解这个问题。
测算 GC 的影响
第一步是收集关于垃圾处理的频率和 GC 消耗时间的统计数据。这个可以通过添加如
下 Java 选项-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 来实
现。
[java] view plain copy
1. ./bin/spark-submit --name "My app" --master local[4] --
conf spark.eventLog.enabled=false
2.
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -
XX:+PrintGCTimeStamps" myApp.jar
下次 Spark 启动时,每次 GC 的操作日志将会打印出来。需要注意的是这些日志将出
现在你的集群工作节点上(在工作目录的标准输出文件里),而不是你的驱动程序
里。
GC 进阶优化
为了准备更深入的垃圾回收优化,我们先要理解一些关于 JVM 内存管理的基本知识:
·
Java Heap 空间被分成了两个区域 Young 和 Old。Young 代主要保存短生命
周期的对象,而 Old 代用于保存具有长生命周期的对象。
·
·
Young 代又被进一步划分为三个区[Eden,Survivor1,Survivor2]
简述一下内存碎片整理步骤:当 Eden 满了,一个小型的 GC 被触发,Eden
和 Survivor1 中幸存的仍被使用的对象被复制到 Survivor2。Survivor1 和
Survivor2 区域进行交换。当一个对象生存的时间足够长或者 Survivor2 满
了,它被转移到 Old 代。最终当 Old 空间快满时,一个全面的 GC 被召唤。
GC 优化的目的是使 Spark 保证只有长生存周期的 RDDs 才会被存储在 Old
代,并且 Young 代设计为满足存储短生命周期的对象。
这将帮你避免全面 GC 去收集 Spark 运行期间产生的临时对象。一些实用的小技巧如
下:
·
首先检查 GC 日志中是否有过于频繁的 GC。如果在一个任务完成前,全量 GC
被唤醒了多次,它意味着对于执行任务来说没有分配足够的内存。
如果有太多的小型垃圾收集但全量 GC 出现并不多,给 Eden 分配更多的内存
会很有帮助。你可以为每个任务设置为一个高于其所需内存的值。假设 Eden
代的内存需求量为 E,你将可以设置 Young 代的内存为-Xmn=4/3*E。(这一
设置同样也会导致 survivor 区同时扩张)
·
在 GC 打印的日志中,如果 OldGen 接近满时,可以通过降低
spark.memory.fraction 来减少用于缓存的空间。更好的方式是缓存更少的对
象而不是降低作业执行时间。一个可选的方案是减少 Young 代的规模。如果
你设置了“-Xmn”,可以降低-Xmn。如果没有设置,可以尝试改变 JVM 的
NewRatio 参数。很多 JVM 的 NewRation 默认值是 2,这意味着 Old 代申请
2/3 的堆空间。它的值应在足够大以至可以超过 spark.memory.fraction。
尝试使用 G1GC 垃圾收集选项:-XX:+UseG1GC。当 GC 存在瓶颈时,采用这
一选项在某些情况下可以提升性能。当执行器的堆空间比较大时,提升 G1
region size(-XX:G1HeapRegionSize)是一种重要的选择。
·
·
如果你的任务需要从 HDFS 系统读取数据,可以通过估计 HDFS 文件的大小来
预估任务所需的内存量。需要注意的是解压后的块大小是原大小的 2 到 3 倍。
因此我们需要设置 3 到 4 倍的工作空间用于作业执行,例如 HDFS 的块大小为
128MB,我们需要预估 Eden 的大小为 4*3*128MB。
·
监控在新变化和设置生效后,GC 的频率和耗费的事件。
我们的经验建议是,GC 优化的成效依赖与你的应用和可用内存的多少。网上也有许多
优化策略,但是需要更深的知识基础,例如通过控制全量 GC 发生的频率来降少总开
销。
通过设置 spark.executor.extraJavaOptions 可以实现对执行器中 GC 的优化调整。
其他的考虑
并行度
如果合理分配每个操作的并行度,将极大的发挥集群的优势。Spark 按照文件的大小
自动设置 map 任务数来处理每个文件(当然你可以通过设置 SparkContext.textFile
的可选参数来控制并行程度),并且对于分布式的 reduce 操作,例如 groupByKey
和 reduceByKey 他们使用父 RDD 的最大分区数来设置并行数。你也可以通过传递第
二个参数来控制并行级别(参考 spark.PairRDDFunctions 文档)或者设置启动时的
并行级别来改变默认值(spark.default.parallelism),我们建议你为每个 CPU 分配
2-3 个并行任务。
Reduce 任务的内存分配
有时,你会收到一个 OOM 的错误,因为你的 RDD 超出了内存的大小,这有可能是
因为某个任务的工作集太大造成的,例如 groupByKey 这个 reduce 任务。Spark 的
混洗操作(例如 sortByKey,groupByKey,reduceByKey,join 等)在每个子任务中构造
并维护了一个哈希表,来完成归类操作,通常情况下此表很大。最简单的修复方法是
提升并行程度,以便使每个任务的输入足够小。Spark 可以高效的支持短如 200ms 的
任务,因为它可以跨多任务重用执行器的 JVM,并且它的任务加载和启动开销非常
小,因此你可以放心的增加并行度,甚至可以设置比集群核心总数更多的并行任务
数。
广播大变量
使用 SparkContext 的广播功能极大的减少每个序列化人物的大小,并且降低集群中
任务加载的开销。如果你的任务使用任何来自驱动器的大对象(例如一个静态查找
表),应该考虑将这个大对象加载到广播变量中去。Spark 可以打印序列化的任务的
大小,因此你可以通过查看输出来判断你的作业是否太大了;通常情况下,一个大于
20KB 的对象是值得放进广播变量中来进行优化的。
数据存放位置
数据的存放位置对于 Spark 作业的执行效率具有重要影响。如果数据和操作它的代码
在一起的话,计算将是非常高效的。但是,如果代码和数据是分离的,一方需要移动
到另一方那里。显而易见的是移动代码比移动数据高效的多,因为代码的字节数源小
于数据。Spark 是基于这一常规原则来构建她的数据存放策略的。
数据本地化是使得数据和在它之上的操作距离更近。这里列举了一些存放数据的级
别,这些级别的划分是基于数据存储位置的,按照由近及远的顺序。
·
·
PROCESS_LOCAL:数据和运行的代码同时在 JVM 中。这是最好的存储方
式。
NODE_LOCAL:数据和代码在同一个节点上。实例在同一个节点的 HDFS
中,或者在相同节点的另一个执行器里。这是比 PROCESS_LOCAL 稍慢的级
别,因为数据需要在进程间传递。
·
·
NO_PREF:从任何地方访问数据都很快,没有位置偏好。
RACK_LOCAL:数据和代码存在与同一个机架的服务器中。数据在同一个机架
的不同服务中,因此需要依靠网络来传递这一数据,一般只需经过一个交换
器。
·
ANY:数据存储在同一网络环境,但不在同一机架上。
Spark 程序理想状态是调度所有的子任务都处于最佳的存储策略之下,但这通常只是
理想。
(海量Spark, hadoop资源共享请点击: )