Loading......

文章背景图

🧠 Spark 计算模型详解(RDD & 算子 & DAG)

2025-11-25
11
-
- 分钟

🧠 1️⃣ RDD 基础概念

✨ 1.1 RDD 是什么?

RDD(Resilient Distributed Dataset,弹性分布式数据集)是 Spark 的核心抽象,它不仅仅是“分布式数组”,更是对并行计算与容错机制的组合封装:

  • 不可变、只读:每次转换都会创建新 RDD(便于血缘恢复)。

  • 分区化:RDD 被划分为多个 partition,每个 partition 在集群上并行处理(task)。

  • 血缘(Lineage):记录 RDD 之间的转换链路,用于在节点或分区失败后重新计算丢失的数据(无需检查点也能恢复)。

  • 惰性执行:Transformation(转换)是惰性记录计算 DAG;Action(行动)触发物理执行。

  • 算子驱动:通过 Transformation 与 Action 完成数据流和计算。


🧩 1.2 RDD 的 5 个核心组成(源码角度)

  1. Partitions(分区列表)

    • 决定并行度;每个 partition 对应一个 Task。

    • rdd.getNumPartitions() 可查看分区数。

  2. Compute function(计算函数)

    • 每个 partition 的处理逻辑(如何生成该 partition 的数据)。

  3. Dependencies(依赖)

    • 描述父 RDD 与子 RDD 的依赖关系(窄 / 宽)。

    • 决定是否会触发 shuffle,从而影响 stage 划分。

  4. Partitioner(分区器)

    • 仅对 KV RDD 有意义(hash / range)。

    • 决定相同 key 后续落到哪个 partition(影响 shuffle 结果)。

  5. Preferred locations(首选位置)

    • 例如 HDFS block 的位置信息,用于提升数据局部性(Data Locality)。


🏗 1.3 RDD 的创建方式

  • 从外部数据源读取(HDFS、S3、Kafka、HBase):sc.textFile(...)sc.hadoopFile(...)

  • sc.parallelize([...], numSlices):将内存集合并行化

  • 从现有 RDD 转换:map, flatMap, filter, join

  • 从 DataFrame 或 Dataset 转换:df.rdd(类型信息丢失)


🔁 2️⃣ 依赖(Lineage)与算子模型

🟥 2.1 窄依赖 vs 🟦 宽依赖

窄依赖(Narrow Dependency)

  • 每个子分区仅依赖于父 RDD 的少量分区(常是 1 个)。

  • 不需要 shuffle,任务可以并行执行且容错时只需重算相关分区。

  • 例:map, filter, flatMap, mapPartitions

宽依赖(Wide Dependency)

  • 子分区依赖于父 RDD 的多个分区(通常所有父分区)。

  • 触发 shuffle:数据需要跨节点重新分区并写入磁盘或网络传输。

  • 例:reduceByKey, groupByKey, join, distinct, sortByKey

为什么重要? 宽依赖是 stage 划分的边界,也是影响性能的关键(shuffle 成本高)。


🔄 2.2 Combiner / map-side aggregation(本地聚合)

  • reduceByKey 内部会在 map 端先做局部合并(combiner),减少网络传输。

  • groupByKey 则不会做 map 端聚合(会传输更多数据),因此 reduceByKey / aggregateByKey 优于 groupByKey


⚙️ 3️⃣ Shuffle 机制(核心)

Shuffle 是 MapReduce 与 Spark 中最昂贵的操作之一。Spark 的 shuffle 流程(Map → Shuffle → Reduce)在实现细节上包含多次排序、spill、磁盘写读、网络传输与合并。

3.1 Map 端(写出阶段)

  1. 环形缓冲区(in-memory buffer):Map task 的输出先写入内存缓冲区(kv 对)。

  2. 达到阈值触发 spill(溢写)

    • 缓冲区超过 spark.shuffle.memoryFraction(历史)或 shuffle.spill阈值时,部分数据被 spill 到磁盘。

    • spill 过程:对 buffer 做按 partition 的排序,然后写出 spill 文件(分区内有序)。

  3. 多个 spill 文件合并(merge):当 map task 结束或达到条件时,合并多个 spill 文件为每个 partition 的输出文件(可做 map-side combine)。

3.2 Reduce 端(读入阶段)

  1. Fetch(拉取):Reduce task 根据 partition id 向所有 map task 的 shuffle 服务发起 HTTP 请求拉取对应 partition 数据(并行拉取多个 map 输出片段)。

  2. 排序与归并:Reduce 将来自不同 map 的数据合并(归并排序),得到按 key 排序或分区内有序的数据,供 reduce function 处理。

  3. 内存 / 磁盘:如果数据过大,会分块写磁盘再归并(外排序)。

3.3 Shuffle 的实现模式(历史与现代)

  • Sort-based shuffle(默认):map 输出按 key 排序并写文件,reduce 端合并;支持 map-side combine、外排序、减少内存压力。

  • Hash-based shuffle(旧):将数据按 partition 哈希写文件;现代 Spark 优先用 sort-based。

3.4 调优点

  • 减少 shuffle 数据量:使用 reduceByKey、使用过滤(filter)早期剪枝、map-side 聚合。

  • 增加并行度(spark.default.parallelism / numPartitions)。

  • 调整 spark.shuffle.compressspark.shuffle.file.bufferspark.reducer.maxSizeInFlight

  • 使用 Kryo 序列化(更小、更快):spark.serializer=org.apache.spark.serializer.KryoSerializer

  • 采用广播 join(broadcast join)或 map-side join(小表广播到所有 executors)。


🧪 4️⃣ Serialization(序列化)与内存管理

4.1 Java 序列化 vs Kryo

  • Java Serializer:易用但慢且占空间大(默认)。

  • Kryo Serializer:速度快、占用小,通常推荐(需要注册类以最佳化)。
    配置示例:

spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=com.example.MyRegistrator

4.2 Executor 内存结构(简化)

  • Spark(2.x+)使用统一内存模型:Unified Memory 将执行内存(Shuffle、Sort)与存储内存(cache)合并管理,但会保留策略边界。

  • 重要参数:spark.executor.memoryspark.executor.memoryOverheadspark.memory.fractionspark.memory.storageFraction


📣 5️⃣ 广播变量(Broadcast)与累加器(Accumulator)

广播变量(Broadcast)

  • 用于将小量只读数据(如字典、小表)广播到所有 executors,避免 shuffle。

  • API:sc.broadcast(obj)

  • 用场景:小表 join(broadcast join)、模型参数分发等。

累加器(Accumulator)

  • 用于在 executors 上做计数 / 求和,结果由 driver 汇总(类似 MapReduce 的计数器)。

  • API:sc.longAccumulator("name")

  • 注意:累加器在 task 重试时可能被重复累加 → 需谨慎使用或进行幂等设计。


🔁 6️⃣ 容错机制(Lineage 与 Checkpoint)

6.1 Lineage 重算

  • Spark 不持久化中间结果时,遇到失败会通过血缘链重算丢失的 partition(代价是重复计算)。

  • 优点:不需要写入 HDFS,节省 IO。

  • 缺点:重算开销大(当依赖链很长时)。

6.2 Checkpoint(检查点)

  • 将 RDD 的计算结果写入可靠存储(如 HDFS),截断 lineage,提升容错恢复速度。

  • API:rdd.checkpoint() + 配置 sc.setCheckpointDir("hdfs://...")

  • 常用于迭代计算或 lineage 很长时(如 GraphX 算法)。


⚡ 7️⃣ DAG、Stage、Task 的细节与调度

7.1 Stage 划分规则(更细)

  • 遇到 宽依赖(shuffle)时会将 DAG 划分为新的 Stage。

  • 每个 Stage 包含一系列可以并行执行的 Task(基于 partition)。

  • Stage 之间通过 Shuffle 文件传递数据。

7.2 Task 调度与 Data Locality(数据局部性)

Spark 在调度时尽量满足局部性(优先在数据所在节点或机架执行):

  • LOCAL_PREF — 本地进程(最优)

  • NODE_LOCAL — 同节点不同进程

  • NO_PREF — 无偏好

  • RACK_LOCAL — 同机架

  • ANY — 任意节点(最差,可能跨机架网络传输)

如果长时间无法满足高局部性,Spark 会降级到更低层次的 locality 去避免任务饥饿。

7.3 Speculative Execution(推测执行)

  • 当某些 task 过慢(straggler)时,Spark 可启动备份 task 在其他节点运行,完成后保留最快结果。

  • 开关:spark.speculation

  • 能改善尾延迟,但会浪费资源。


🧭 8️⃣ 常见性能调优(实用清单)

Shuffle 优化

  • 减少 shuffle:reduceByKey 而不是 groupByKey

  • 合理设置分区数:rdd.reduceByKey(func, numPartitions=...)

  • 调整 spark.sql.shuffle.partitions(Spark SQL)或 spark.default.parallelism

  • 开启 shuffle 压缩:spark.shuffle.compress=true

内存与序列化

  • 使用 Kryo:spark.serializer=KryoSerializer

  • 适当增加 spark.executor.memorymemoryOverhead

  • 合理设置 spark.memory.fractionspark.memory.storageFraction

I/O 与文件

  • 输出合并小文件(避免大量小文件影响后续 jobs)。

  • 使用列式格式(Parquet/ORC)配合列裁剪、Predicate Pushdown。

Join 优化

  • 使用 broadcast join:broadcast(small_df)spark.conf.set("spark.sql.autoBroadcastJoinThreshold", ...)

  • 使用 saltingskew 处理数据倾斜。

  • 使用 Bucketing 或 Repartition + Sort for large joins。

数据倾斜

  • 检测 skew key(high frequency key)。

  • 对 heavy key 做特殊处理:map-side aggregation + random prefix → second stage remove prefix (二阶段聚合)。

  • 自定义分区器或 Bloom filter 进行前置过滤。


✅ 9️⃣ 最佳实践与常见陷阱

最佳实践

  1. 避免 collect() 大数据回 driver(OOM 风险)。

  2. 早期过滤(filter)与列裁剪

  3. 合理划分分区 & 数据倾斜检测

  4. 用 broadcast join 处理小表

  5. 使用列式存储 parity(Parquet/ORC)与合适压缩

  6. 监控 Spark UI(Stage/Task/Shuffle)找瓶颈

常见坑

  • 使用 groupByKey 导致网络与内存爆炸。

  • 忽略序列化导致 GC 或长序列化时间。

  • 多次重复计算没有 cache/persist。

  • broadcast 小表过大超出 executor 内存。

  • shuffle 过多导致磁盘、网络 I/O 成为瓶颈。


📘 10️⃣ 面试题与要点(速记)

  1. RDD 是什么?与 DataFrame 有何区别?

    • RDD:低级、强类型、操作灵活;DataFrame:优化(Catalyst)、列式存储、性能更好。

  2. 什么是窄依赖和宽依赖?举例并说明对性能的影响。

  3. Shuffle 的完整流程?map 端和 reduce 端分别做了什么?

  4. 如何避免 / 优化 Shuffle?

  5. Kryo 序列化为什么比 Java 序列化好?如何使用?

  6. 为什么要 checkpoint?lineage 的优缺点是什么?

  7. Spark 的内存管理模型有哪些关键参数?如何调整?

  8. 如何处理 Join 的数据倾斜?


🧾 11️⃣ 示例(完整流程演示并注释)

from pyspark import SparkContext
sc = SparkContext("local[4]", "Demo")

# 1. 读取
rdd = sc.textFile("hdfs:///data/logs/*")   # partitions ≈ block count

# 2. 预处理 + 过滤(早剪枝)
words = rdd.flatMap(lambda line: line.split()).filter(lambda w: len(w) > 0)

# 3. map -> reduceByKey(map-side aggregate 减少网络)
pairs = words.map(lambda w: (w, 1))
counts = pairs.reduceByKey(lambda a, b: a + b, numPartitions=200)

# 4. cache(如果后面会多次复用 counts)
counts.persist(storageLevel=... )

# 5. collect(仅示例 - 小数据)
print(counts.take(10))
  • 关键点:flatMap(窄依赖)不会触发 shuffle;reduceByKey(宽依赖)触发 shuffle,但在 map 端做了 combine;numPartitions 决定 reduce 并行度。


🔚 12️⃣ 总结(一句话)

Spark 的强大在于 RDD+DAG+ 内存计算 的组合:以 RDD 表示数据、用 DAG 描述计算、通过优化的执行与 shuffle 管理在分布式集群上高效完成大规模数据计算。理解 shuffle、序列化、内存与分区策略,是成为 Spark 调优与架构专家的关键。

评论交流