Main topics:

  • The three basic implementation approaches of a join
  • The join strategies used in Spark

In data analytics, the join is one of the most common data processing operations. As a unified big-data processing engine, Spark supports a wide range of join scenarios.

1. Join Implementation Approaches

Nested Loop Join (NLJ): Of the two tables being joined, the larger one serves as the outer (driving) table and the smaller one as the inner (base) table. NLJ implements the join with two nested for loops that scan the data and test the join condition: the outer loop iterates over every row of the outer table, and for each of those rows the inner loop scans every row of the inner table, so each outer row triggers a full pass over the inner table. With M rows in the outer table and N rows in the inner table, the time complexity of NLJ is O(M*N).
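A minimal Scala sketch of the nested loop, operating on two in-memory sequences of (key, value) rows; the row type and function name are illustrative and not part of Spark:

  // Nested loop join: for every row of the outer table, scan the whole inner table.
  def nestedLoopJoin(
      outer: Seq[(Int, String)],   // driving table (usually the larger one)
      inner: Seq[(Int, String)]    // base table (usually the smaller one)
  ): Seq[(Int, String, String)] =
    for {
      (outerKey, outerVal) <- outer          // outer loop: each row of the outer table
      (innerKey, innerVal) <- inner          // inner loop: full scan of the inner table
      if outerKey == innerKey                // test the join condition
    } yield (outerKey, outerVal, innerVal)   // emit the concatenated row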

Sort Merge Join (SMJ): The idea behind SMJ is to sort first, then merge. The two tables are each sorted by their join key, and then two independent cursors walk through the sorted tables to merge them. Initially both cursors point at the first row of their respective tables, and the join keys at the two cursor positions are compared. There are three cases:

  • The outer join key equals the inner join key: the condition is satisfied, so the two rows are concatenated and emitted, and the outer cursor advances to the next row.
  • The outer join key is smaller than the inner join key: the condition is not satisfied, and the outer cursor advances to the next row.
  • The outer join key is larger than the inner join key: the condition is not satisfied, and the inner cursor advances to the next row.

SMJ is demanding about sorting: its linear time complexity relies on both tables already being sorted by the join key, and sorting is itself an expensive operation on large data sets. Because the inner table is sorted in ascending order by join key and every scan starts from the current cursor position, each row is visited at most once, so the merge phase of SMJ runs in O(M+N).
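The cursor movement described above can be sketched in plain Scala as follows, assuming both inputs are already sorted in ascending order by key and that keys are unique (duplicate keys would need additional handling); names and types are illustrative:

  // Sort merge join over two pre-sorted sequences, driven by two cursors.
  def sortMergeJoin(
      outer: Seq[(Int, String)],   // sorted ascending by key
      inner: Seq[(Int, String)]    // sorted ascending by key
  ): Seq[(Int, String, String)] = {
    val result = scala.collection.mutable.ArrayBuffer.empty[(Int, String, String)]
    var i = 0  // cursor into the outer table
    var j = 0  // cursor into the inner table
    while (i < outer.length && j < inner.length) {
      val (outerKey, outerVal) = outer(i)
      val (innerKey, innerVal) = inner(j)
      if (outerKey == innerKey) {            // match: emit, then advance the outer cursor
        result += ((outerKey, outerVal, innerVal))
        i += 1
      } else if (outerKey < innerKey) {      // outer key smaller: advance the outer cursor
        i += 1
      } else {                               // outer key larger: advance the inner cursor
        j += 1
      }
    }
    result.toSeq
  }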

Hash Join (HJ): HJ is designed to cut the cost of probing the inner table down to O(1) with the help of a hash map. Concretely, a hash table is built from the inner table using a given hash function; the keys of the hash table are the hash values obtained by applying that function to the join keys. Then, for each row of the outer table, the same hash function is applied on the fly to its join key, and the resulting value is used to probe the hash table. If the probe fails, the outer row has no matching row in the inner table; if it succeeds, the two rows are concatenated and emitted. The time complexity of HJ is O(M), plus O(N) to build the hash table.
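A minimal sketch of the build-then-probe structure, again over in-memory sequences with illustrative names, assuming unique keys on the build (inner) side:

  // Hash join: build a hash table on the inner table once, then probe it per outer row.
  def hashJoin(
      outer: Seq[(Int, String)],
      inner: Seq[(Int, String)]
  ): Seq[(Int, String, String)] = {
    val hashTable: Map[Int, String] = inner.toMap      // build phase: O(N)
    outer.flatMap { case (key, outerVal) =>            // probe phase: O(M), O(1) per lookup
      hashTable.get(key).map(innerVal => (key, outerVal, innerVal))
    }
  }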

2. Join Strategies in Spark - JoinSelection

During Spark's physical planning phase, the JoinSelection class chooses the final join strategy based on the following factors:

  • join strategy hints (see the sketch after this list)
  • the sizes of the joining tables
  • whether the join is an equi-join
  • whether the join keys are sortable
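As a concrete example of the hint mechanism, the sketch below supplies the Spark 3.0 join strategy hints through the DataFrame API. The SparkSession setup and the orders/users tables are illustrative placeholders; the later sketches in this section reuse them:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().master("local[*]").appName("join-strategies").getOrCreate()

  // Two illustrative tables: a large "orders" side and a small "users" side.
  val orders = spark.range(0, 1000000).selectExpr("id % 1000 AS user_id", "id AS amount")
  val users  = spark.range(0, 1000).selectExpr("id AS user_id", "concat('name_', cast(id AS string)) AS name")

  // Strategy hints; Spark falls back to its own choice when a hinted strategy
  // is not applicable to the join type or keys.
  orders.join(users.hint("broadcast"),            Seq("user_id")).explain()  // broadcast hash join
  orders.join(users.hint("merge"),                Seq("user_id")).explain()  // sort merge join
  orders.join(users.hint("shuffle_hash"),         Seq("user_id")).explain()  // shuffle hash join
  orders.join(users.hint("shuffle_replicate_nl"), Seq("user_id")).explain()  // cartesian product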

Spark (as of Apache Spark 3.0) supports five join strategies in total:

2.1 Broadcast Hash Join (BHJ)

How it works:

  1. The small table is collected from the executors to the driver;
  2. The driver calls sparkContext.broadcast to broadcast the table data to the executors that take part in the computation;
  3. Each executor loads the broadcast table into memory as a hash table and joins its share of the large table against it.

Conditions and characteristics:

  • Only supported for equi-joins; the join keys do not need to be sortable.
  • Supported for all join types except full outer joins.
  • Broadcast hash join is usually more efficient than the other join mechanisms. It is, however, a network-intensive operation (the broadcast side is transferred redundantly), and the data must also be cached on the driver, so a "small" table that is actually fairly large can cause OOM.
  • The broadcast side must be smaller than spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB (10485760 bytes); a configuration sketch follows below.
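A small configuration sketch, reusing the illustrative spark session and orders/users tables from the hint example above:

  import org.apache.spark.sql.functions.broadcast

  // Raise the auto-broadcast threshold from the 10 MB default to 100 MB;
  // setting it to -1 disables size-based automatic broadcasting entirely.
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)

  // Or mark the small side explicitly, independent of the threshold.
  orders.join(broadcast(users), Seq("user_id")).explain()
  // For this equi-join the physical plan is expected to show BroadcastHashJoin.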

2.2 Shuffle Hash Join (SHJ)

How it works:

  1. Both tables are repartitioned (shuffled) on the join key, using the same partitioner and number of partitions, so that rows with the same join key end up in the same partition and can be joined within that partition;
  2. Within each partition, the small table's data is loaded into memory to build a hash map, and a local hash join is performed.

Conditions and characteristics:

  • Only supported for equi-joins; the join keys do not need to be sortable.
  • Supported for all join types except full outer joins.
  • Building the hash map on the small side is a memory-intensive operation; if the build side is large, it can cause OOM.
  • Configuration: set spark.sql.join.preferSortMergeJoin (default true) to false, as sketched below.
  • The large table must be at least three times larger than the small table.
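A sketch of steering JoinSelection toward shuffle hash join, again with the illustrative tables from above; whether SHJ is actually chosen still depends on the size conditions listed here:

  // Turning off the sort-merge preference lets JoinSelection consider SHJ
  // when one side is small enough to hash but too big to broadcast.
  spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
  orders.join(users, Seq("user_id")).explain()   // may show ShuffledHashJoin

  // In Spark 3.0 the strategy can also be requested directly with a hint:
  orders.join(users.hint("shuffle_hash"), Seq("user_id")).explain()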

2.3 Shuffle Sort Merge Join (SMJ)

How it works:

  1. Both large tables are repartitioned (shuffled) on the join key;
  2. The data within each partition is sorted, and the corresponding partitions of the two tables are then joined, without loading either table fully into memory;
  3. The sorted partitions are merged from the start: when the keys on both sides match, the rows are emitted; otherwise the side with the smaller key advances.

Conditions and characteristics:

  • Only supported for equi-joins.
  • Supported for all join types.
  • The join keys must be sortable.
  • Configuration: spark.sql.join.preferSortMergeJoin (default true) is set to true.
  • No size limit on the tables: the previous two strategies constrain the size of the small side, whereas this strategy has no such restriction; see the sketch below.
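A sketch that rules out broadcasting, so that the default choice for an equi-join between the illustrative tables becomes sort merge join:

  // Disable automatic broadcasting and keep the (default) sort-merge preference.
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
  spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")

  orders.join(users, Seq("user_id")).explain()
  // The plan is expected to contain SortMergeJoin, with Exchange (shuffle)
  // and Sort operators on both sides.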

2.4 Cartesian Product Join (CPJ)

How it works:

  1. If the two tables being joined have no join key (no ON condition), Spark produces a cartesian product join; the result contains the product of the two tables' row counts.

Conditions and characteristics:

  • Only supported for inner-like joins.
  • Supports both equi- and non-equi-join conditions.
  • Requires the parameter spark.sql.crossJoin.enabled to be set to true, as sketched below.
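A sketch of producing a cartesian product with the illustrative tables from above (note that spark.sql.crossJoin.enabled may already default to true in recent Spark versions):

  spark.conf.set("spark.sql.crossJoin.enabled", "true")

  // Explicit cross join; a join without an ON clause behaves the same way.
  orders.crossJoin(users).explain()   // expected to show CartesianProduct
  // The result has orders.count() * users.count() rows.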

2.5 Broadcast Nested Loop Join (BNLJ)

How it works:

  1. As with broadcast hash join, the small table is broadcast first, but no hash table is built from it; instead, the broadcast table is scanned with a for loop for every row of the other side.

Conditions and characteristics:

  • Supports both equi- and non-equi-joins (see the sketch below).
  • Supports all join types; the implementation is mainly optimized for:
    • broadcasting the left side in a right outer join;
    • broadcasting the right side in a left outer (as well as left semi, left anti or existence) join;
    • broadcasting either side in an inner-like join.
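A sketch of a non-equi (range) join in which the small side is broadcast; the tables and column names are made up for illustration. Because the condition is not an equi-join, the hash- and sort-based strategies do not apply and Spark typically falls back to broadcast nested loop join:

  import org.apache.spark.sql.functions.broadcast

  // A large stream of events and a tiny table of ranges (illustrative).
  val events = spark.range(0, 1000000).selectExpr("id AS ts")
  val ranges = spark.range(0, 10).selectExpr("id * 100000 AS start_ts", "(id + 1) * 100000 AS end_ts")

  events.join(
    broadcast(ranges),
    events("ts") >= ranges("start_ts") && events("ts") < ranges("end_ts"),
    "inner"
  ).explain()   // expected to show BroadcastNestedLoopJoin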

The SparkStrategies.scala file describes the join strategies as follows:

  /**
   * Select the proper physical plan for join based on join strategy hints, the availability of
   * equi-join keys and the sizes of joining relations. Below are the existing join strategies,
   * their characteristics and their limitations.
   *
   * - Broadcast hash join (BHJ):
   *     Only supported for equi-joins, while the join keys do not need to be sortable.
   *     Supported for all join types except full outer joins.
   *     BHJ usually performs faster than the other join algorithms when the broadcast side is
   *     small. However, broadcasting tables is a network-intensive operation and it could cause
   *     OOM or perform badly in some cases, especially when the build/broadcast side is big.
   *
   * - Shuffle hash join:
   *     Only supported for equi-joins, while the join keys do not need to be sortable.
   *     Supported for all join types.
   *     Building hash map from table is a memory-intensive operation and it could cause OOM
   *     when the build side is big.
   *
   * - Shuffle sort merge join (SMJ):
   *     Only supported for equi-joins and the join keys have to be sortable.
   *     Supported for all join types.
   *
   * - Broadcast nested loop join (BNLJ):
   *     Supports both equi-joins and non-equi-joins.
   *     Supports all the join types, but the implementation is optimized for:
   *       1) broadcasting the left side in a right outer join;
   *       2) broadcasting the right side in a left outer, left semi, left anti or existence join;
   *       3) broadcasting either side in an inner-like join.
   *     For other cases, we need to scan the data multiple times, which can be rather slow.
   *
   * - Shuffle-and-replicate nested loop join (a.k.a. cartesian product join):
   *     Supports both equi-joins and non-equi-joins.
   *     Supports only inner like joins.
   */

For equi-joins:

      // If it is an equi-join, we first look at the join hints w.r.t. the following order:
      //   1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides
      //      have the broadcast hints, choose the smaller side (based on stats) to broadcast.
      //   2. sort merge hint: pick sort merge join if join keys are sortable.
      //   3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both
      //      sides have the shuffle hash hints, choose the smaller side (based on stats) as the
      //      build side.
      //   4. shuffle replicate NL hint: pick cartesian product if join type is inner like.
      //
      // If there is no hint or the hints are not applicable, we follow these rules one by one:
      //   1. Pick broadcast hash join if one side is small enough to broadcast, and the join type
      //      is supported. If both sides are small, choose the smaller side (based on stats)
      //      to broadcast.
      //   2. Pick shuffle hash join if one side is small enough to build local hash map, and is
      //      much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false.
      //   3. Pick sort merge join if the join keys are sortable.
      //   4. Pick cartesian product if join type is inner like.
      //   5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
      //      other choice.

For non-equi-joins:

      // If it is not an equi-join, we first look at the join hints w.r.t. the following order:
      //   1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast
      //      hints, choose the smaller side (based on stats) to broadcast for inner and full joins,
      //      choose the left side for right join, and choose right side for left join.
      //   2. shuffle replicate NL hint: pick cartesian product if join type is inner like.
      //
      // If there is no hint or the hints are not applicable, we follow these rules one by one:
      //   1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left
      //      side is broadcast-able and it's left join, or only right side is broadcast-able and
      //      it's right join, we skip this rule. If both sides are small, broadcasts the smaller
      //      side for inner and full joins, broadcasts the left side for right join, and broadcasts
      //      right side for left join.
      //   2. Pick cartesian product if join type is inner like.
      //   3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
      //      other choice. It broadcasts the smaller side for inner and full joins, broadcasts the
      //      left side for right join, and broadcasts right side for left join.
