主要内容:

  • Spark数据倾斜的几种场景以及对应的解决方案

对 Spark/Hadoop 这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。此时 Spark 作业的性能会比期望差很多。对于数据倾斜,并无一个统一的一劳永逸的方法。更多的时候,是结合数据特点(数据集大小,倾斜 Key 的多少等)综合使用多种方法。

数据倾斜的现象

  • 绝大多数 Task 执行非常快,但是个别 Task 执行很慢。
  • 任务突然报 OOM 异常。

数据倾斜的原理

  • Shuffle 发生时,必须将各个节点上相同的 Key 拉取到同一个节点上,并交由一个 Task 进行处理,比如按照 Key 进行聚合或者 Join 等操作。
  • 如果此时某个 Key 对应的数据量特别大,就会发生数据倾斜。

数据倾斜的影响

  • 任务整体运行缓慢:同一个 Stage 的不同 Partition 可以并行处理,但是有依赖关系的不同 Stage 之间是串行处理。假设某个 Spark 作业分为 Stage0 和 Stage1,且 Stage1依赖 Stage0,则 Stage0 完全处理完之前是不会处理 Stage1。Stage0 可能包含 N 个 Task,这个 N 个 Task 可以并行处理。如果其中 N-1 个 Task 都在10秒内完成,另外一个 Task 却需要10分钟,那该 Stage0 花费的总时间至少是10分钟。即一个Stage所耗费的时间,主要由最慢的那个Task决定。

  • 任务运行失败:当发生数据倾斜时,部分任务处理的数据量过大,可能造成内存不足使得任务失败,并进而引进整个应用失败。

分析定位数据倾斜

步骤一:定位数据倾斜的代码位置

如果是某个 Task 执行慢的情况

  1. 首先,要判断出数据倾斜发生在第几个 Stage 中。通过 yarn-client 模式(直接查看客户端打印的 Log 日志)或者yarn-cluster 模式(查看 Spark Web UI) 。无论是哪种部署方式,都要通过 Spark Web UI 查看 Stage 中 各个 Task 分配的数据量运行时间
  2. 其次,根据 Stage 的划分原理,定位到产生数据倾斜的代码位置。介绍一个相对简单实用的推算方法:只要看到 Spark 代码中出现了一个 Shuffle 类算子或者是 Spark SQL 的 SQL 语句中出现了会导致 Shuffle 的语句(比如group by语句),那么就可以判定,那个地方为界限划分出了前后两个 Stage。数据倾斜只会发生在 Shuffle 过程中。一些常见的可能会触发 Shuffle 的算子:
  • distinct
  • groupByKey
  • reduceByKey
  • aggregateByKey
  • join
  • cogroup
  • repartition

如果是某个 Task 报 OOM:

直接查看 Log 日志的异常栈。一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有 Shuffle 类算子,此时很可能就是这个算子导致了数据倾斜。

步骤二:查看数据倾斜 Key 的分布情况

  • 查看使用表的 Key 的分布:分组统计 Key 的个数。
  • 查看 RDD 或 DataFrame 的分布:在代码中插入查看 Key 分布的代码(可抽样),统计 Key 个数。

了解数据倾斜 Key 的分布情况,为之后选择哪一种技术方案提供依据。

数据倾斜的解决方案

  • 使用 Hive ETL 预处理数据
    • 场景:Hive 表数据本身很不均匀,且业务需要频繁使用 Spark 对 Hive 表执行某个操作。
    • 思路:通过 Hive 进行预处理,在 Spark 中也就不会产生数据倾斜。
  • 过滤少数导致数据倾斜的 Key
    • 场景:导致数据倾斜的 Key 很少,且这部分 Key 对运行结果不会产生影响或影响很小。如空值和、异常值等特殊数据。
    • 思路:计算前直接过滤掉这些 Key。
  • 提高 Shuffle 并行度
    • 场景:Shuffle 并行度设置不当,导致大量不相同的 Key 被分配到同一个 Task 上处理,造成该 Task 所处理的数据远大于其它 Task。
    • 思路:通过调整 Shuffle 时的并行度,使得原本被分配到同一 Task 的不同 Key 发配到不同 Task 上处理,则可降低原 Task 所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。
  • 将 Reduce Join 转为 Map Join
    • 场景:Join 操作中的一个表足够小。
    • 思路:通过 Spark 的 Broadcast 广播机制,将 Reduce 侧 Join 转化为 Map 侧 Join,避免 Shuffle 从而完全消除Shuffle带来的数据倾斜。普通的 Join 是会走 Shuffle 过程的,而一旦 Shuffle,就相当于会将相同 Key 的数据拉取到一个 Shuffle Read Task 中再进行 Join,此时就是 Reduce join。但是如果一个 RDD 或表是比较小的,则可以采用广播小 RDD 或表的全量数据+ Map 算子来实现与 Join 同样的效果,也就是 Map Join,此时就不会发生 Shuffle 操作,也就不会发生数据倾斜。
  • 采样倾斜的 Key,并分拆 Join
    • 场景:Join 操作中的两个表数据量都比较大(T1 和 T2),且其中一个表(T1)的少数几个 Key 的数据量过大,而另一个表中(T2)的所有 Key 都分布比较均匀。
    • 思路:
      • 从 T1 筛选出数据量最大的几个 Key,单独形成一个表 T11,并给每个 Key 都打上 n 以内的随机数作为前缀。不会导致倾斜的大部分 Key 形成另外一个表 T12。
      • 从T2 筛选出那几个 Key,单独形成一个表 T21,将每条数据膨胀成 n 条数据,这 n 条数据都按顺序附加一个0~n的前缀。不会导致倾斜的大部分key也形成另外一个表 T22。
      • 将附加了随机前缀的表 T11 与另一个膨胀 n 倍的表 T21 进行 Join,此时就可以将原先相同的 Key 打散成 n 份,分散到多个 Task中去进行 Join了。
      • 另外两个普通的表,T12 和 T22 就照常 Join 即可。
      • 将两次 Join 的结果使用 union 算子合并起来即可,就是最终的 Join 结果。
  • 使用随机前缀和扩容表
    • 场景:如果在进行 Join 操作时,表中有大量的 Key 导致数据倾斜。
    • 思路:同上一种方案:将一个有倾斜 Key 的表的每条数据打上一个 n 以内的随机前缀,同时对另一个表进行扩容,最后将两个处理后的表进行 Join 即可。上一种方案是尽量只对少数倾斜 Key 对应的数据进行特殊处理,由于处理过程需要扩容,因此上一种方案扩容后对内存的占用并不大;而这一种方案是针对有大量倾斜 Key 的情况,没法将部分 Key 拆分出来进行单独处理,因此只能对整个表进行数据扩容,对内存资源要求很高。
  • 两阶段聚合(局部聚合+全局聚合)
    • 场景:对 RDD 执行 reduceByKey 等聚合类 Shuffle 算子或者在 Spark SQL 中使用 group by 语句进行分组聚合时,比较适用这种方案。仅仅适用于聚合类的 Shuffle 操作,适用范围相对较窄。如果是 Join 类的 Shuffle 操作,还得用其他的解决方案。
    • 思路:
      • 给每个 Key 都打上一个随机数。
      • 对打上随机数后的数据,执行 reduceByKey 等聚合操作,进行局部聚合。
      • 将各个 Key 的前缀给去掉,再次进行全局聚合操作。

相关阅读:

  1. Spark性能优化指南——高级篇
  2. Spark性能优化之道——解决Spark数据倾斜(Data Skew)的N种姿势