🧠 1️⃣ RDD 基础概念
✨ 1.1 RDD 是什么?
RDD(Resilient Distributed Dataset,弹性分布式数据集)是 Spark 的核心抽象,它不仅仅是“分布式数组”,更是对并行计算与容错机制的组合封装:
-
✅ 不可变、只读:每次转换都会创建新 RDD(便于血缘恢复)。
-
✅ 分区化:RDD 被划分为多个 partition,每个 partition 在集群上并行处理(task)。
-
✅ 血缘(Lineage):记录 RDD 之间的转换链路,用于在节点或分区失败后重新计算丢失的数据(无需检查点也能恢复)。
-
✅ 惰性执行:Transformation(转换)是惰性记录计算 DAG;Action(行动)触发物理执行。
-
✅ 算子驱动:通过 Transformation 与 Action 完成数据流和计算。
🧩 1.2 RDD 的 5 个核心组成(源码角度)
-
Partitions(分区列表)
-
决定并行度;每个 partition 对应一个 Task。
-
rdd.getNumPartitions()可查看分区数。
-
-
Compute function(计算函数)
-
每个 partition 的处理逻辑(如何生成该 partition 的数据)。
-
-
Dependencies(依赖)
-
描述父 RDD 与子 RDD 的依赖关系(窄 / 宽)。
-
决定是否会触发 shuffle,从而影响 stage 划分。
-
-
Partitioner(分区器)
-
仅对 KV RDD 有意义(hash / range)。
-
决定相同 key 后续落到哪个 partition(影响 shuffle 结果)。
-
-
Preferred locations(首选位置)
-
例如 HDFS block 的位置信息,用于提升数据局部性(Data Locality)。
-
🏗 1.3 RDD 的创建方式
-
从外部数据源读取(HDFS、S3、Kafka、HBase):
sc.textFile(...)、sc.hadoopFile(...) -
sc.parallelize([...], numSlices):将内存集合并行化 -
从现有 RDD 转换:
map,flatMap,filter,join等 -
从 DataFrame 或 Dataset 转换:
df.rdd(类型信息丢失)
🔁 2️⃣ 依赖(Lineage)与算子模型
🟥 2.1 窄依赖 vs 🟦 宽依赖
窄依赖(Narrow Dependency)
-
每个子分区仅依赖于父 RDD 的少量分区(常是 1 个)。
-
不需要 shuffle,任务可以并行执行且容错时只需重算相关分区。
-
例:
map,filter,flatMap,mapPartitions
宽依赖(Wide Dependency)
-
子分区依赖于父 RDD 的多个分区(通常所有父分区)。
-
触发 shuffle:数据需要跨节点重新分区并写入磁盘或网络传输。
-
例:
reduceByKey,groupByKey,join,distinct,sortByKey
为什么重要? 宽依赖是 stage 划分的边界,也是影响性能的关键(shuffle 成本高)。
🔄 2.2 Combiner / map-side aggregation(本地聚合)
-
reduceByKey内部会在 map 端先做局部合并(combiner),减少网络传输。 -
groupByKey则不会做 map 端聚合(会传输更多数据),因此reduceByKey/aggregateByKey优于groupByKey。
⚙️ 3️⃣ Shuffle 机制(核心)
Shuffle 是 MapReduce 与 Spark 中最昂贵的操作之一。Spark 的 shuffle 流程(Map → Shuffle → Reduce)在实现细节上包含多次排序、spill、磁盘写读、网络传输与合并。
3.1 Map 端(写出阶段)
-
环形缓冲区(in-memory buffer):Map task 的输出先写入内存缓冲区(kv 对)。
-
达到阈值触发 spill(溢写):
-
缓冲区超过
spark.shuffle.memoryFraction(历史)或shuffle.spill阈值时,部分数据被 spill 到磁盘。 -
spill 过程:对 buffer 做按 partition 的排序,然后写出 spill 文件(分区内有序)。
-
-
多个 spill 文件合并(merge):当 map task 结束或达到条件时,合并多个 spill 文件为每个 partition 的输出文件(可做 map-side combine)。
3.2 Reduce 端(读入阶段)
-
Fetch(拉取):Reduce task 根据 partition id 向所有 map task 的 shuffle 服务发起 HTTP 请求拉取对应 partition 数据(并行拉取多个 map 输出片段)。
-
排序与归并:Reduce 将来自不同 map 的数据合并(归并排序),得到按 key 排序或分区内有序的数据,供 reduce function 处理。
-
内存 / 磁盘:如果数据过大,会分块写磁盘再归并(外排序)。
3.3 Shuffle 的实现模式(历史与现代)
-
Sort-based shuffle(默认):map 输出按 key 排序并写文件,reduce 端合并;支持 map-side combine、外排序、减少内存压力。
-
Hash-based shuffle(旧):将数据按 partition 哈希写文件;现代 Spark 优先用 sort-based。
3.4 调优点
-
减少 shuffle 数据量:使用
reduceByKey、使用过滤(filter)早期剪枝、map-side 聚合。 -
增加并行度(
spark.default.parallelism/numPartitions)。 -
调整
spark.shuffle.compress、spark.shuffle.file.buffer、spark.reducer.maxSizeInFlight。 -
使用 Kryo 序列化(更小、更快):
spark.serializer=org.apache.spark.serializer.KryoSerializer。 -
采用广播 join(broadcast join)或 map-side join(小表广播到所有 executors)。
🧪 4️⃣ Serialization(序列化)与内存管理
4.1 Java 序列化 vs Kryo
-
Java Serializer:易用但慢且占空间大(默认)。
-
Kryo Serializer:速度快、占用小,通常推荐(需要注册类以最佳化)。
配置示例:
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=com.example.MyRegistrator
4.2 Executor 内存结构(简化)
-
Spark(2.x+)使用统一内存模型:
Unified Memory将执行内存(Shuffle、Sort)与存储内存(cache)合并管理,但会保留策略边界。 -
重要参数:
spark.executor.memory、spark.executor.memoryOverhead、spark.memory.fraction、spark.memory.storageFraction。
📣 5️⃣ 广播变量(Broadcast)与累加器(Accumulator)
广播变量(Broadcast)
-
用于将小量只读数据(如字典、小表)广播到所有 executors,避免 shuffle。
-
API:
sc.broadcast(obj)。 -
用场景:小表 join(broadcast join)、模型参数分发等。
累加器(Accumulator)
-
用于在 executors 上做计数 / 求和,结果由 driver 汇总(类似 MapReduce 的计数器)。
-
API:
sc.longAccumulator("name")。 -
注意:累加器在 task 重试时可能被重复累加 → 需谨慎使用或进行幂等设计。
🔁 6️⃣ 容错机制(Lineage 与 Checkpoint)
6.1 Lineage 重算
-
Spark 不持久化中间结果时,遇到失败会通过血缘链重算丢失的 partition(代价是重复计算)。
-
优点:不需要写入 HDFS,节省 IO。
-
缺点:重算开销大(当依赖链很长时)。
6.2 Checkpoint(检查点)
-
将 RDD 的计算结果写入可靠存储(如 HDFS),截断 lineage,提升容错恢复速度。
-
API:
rdd.checkpoint()+ 配置sc.setCheckpointDir("hdfs://...")。 -
常用于迭代计算或 lineage 很长时(如 GraphX 算法)。
⚡ 7️⃣ DAG、Stage、Task 的细节与调度
7.1 Stage 划分规则(更细)
-
遇到 宽依赖(shuffle)时会将 DAG 划分为新的 Stage。
-
每个 Stage 包含一系列可以并行执行的 Task(基于 partition)。
-
Stage 之间通过 Shuffle 文件传递数据。
7.2 Task 调度与 Data Locality(数据局部性)
Spark 在调度时尽量满足局部性(优先在数据所在节点或机架执行):
-
LOCAL_PREF — 本地进程(最优)
-
NODE_LOCAL — 同节点不同进程
-
NO_PREF — 无偏好
-
RACK_LOCAL — 同机架
-
ANY — 任意节点(最差,可能跨机架网络传输)
如果长时间无法满足高局部性,Spark 会降级到更低层次的 locality 去避免任务饥饿。
7.3 Speculative Execution(推测执行)
-
当某些 task 过慢(straggler)时,Spark 可启动备份 task 在其他节点运行,完成后保留最快结果。
-
开关:
spark.speculation。 -
能改善尾延迟,但会浪费资源。
🧭 8️⃣ 常见性能调优(实用清单)
Shuffle 优化
-
减少 shuffle:
reduceByKey而不是groupByKey。 -
合理设置分区数:
rdd.reduceByKey(func, numPartitions=...)。 -
调整
spark.sql.shuffle.partitions(Spark SQL)或spark.default.parallelism。 -
开启 shuffle 压缩:
spark.shuffle.compress=true。
内存与序列化
-
使用 Kryo:
spark.serializer=KryoSerializer。 -
适当增加
spark.executor.memory与memoryOverhead。 -
合理设置
spark.memory.fraction与spark.memory.storageFraction。
I/O 与文件
-
输出合并小文件(避免大量小文件影响后续 jobs)。
-
使用列式格式(Parquet/ORC)配合列裁剪、Predicate Pushdown。
Join 优化
-
使用 broadcast join:
broadcast(small_df)或spark.conf.set("spark.sql.autoBroadcastJoinThreshold", ...)。 -
使用
salting或skew处理数据倾斜。 -
使用 Bucketing 或 Repartition + Sort for large joins。
数据倾斜
-
检测 skew key(high frequency key)。
-
对 heavy key 做特殊处理:
map-side aggregation + random prefix→ second stage remove prefix (二阶段聚合)。 -
自定义分区器或 Bloom filter 进行前置过滤。
✅ 9️⃣ 最佳实践与常见陷阱
最佳实践
-
避免 collect() 大数据回 driver(OOM 风险)。
-
早期过滤(filter)与列裁剪。
-
合理划分分区 & 数据倾斜检测。
-
用 broadcast join 处理小表。
-
使用列式存储 parity(Parquet/ORC)与合适压缩。
-
监控 Spark UI(Stage/Task/Shuffle)找瓶颈。
常见坑
-
使用
groupByKey导致网络与内存爆炸。 -
忽略序列化导致 GC 或长序列化时间。
-
多次重复计算没有 cache/persist。
-
broadcast 小表过大超出 executor 内存。
-
shuffle 过多导致磁盘、网络 I/O 成为瓶颈。
📘 10️⃣ 面试题与要点(速记)
-
RDD 是什么?与 DataFrame 有何区别?
-
RDD:低级、强类型、操作灵活;DataFrame:优化(Catalyst)、列式存储、性能更好。
-
-
什么是窄依赖和宽依赖?举例并说明对性能的影响。
-
Shuffle 的完整流程?map 端和 reduce 端分别做了什么?
-
如何避免 / 优化 Shuffle?
-
Kryo 序列化为什么比 Java 序列化好?如何使用?
-
为什么要 checkpoint?lineage 的优缺点是什么?
-
Spark 的内存管理模型有哪些关键参数?如何调整?
-
如何处理 Join 的数据倾斜?
🧾 11️⃣ 示例(完整流程演示并注释)
from pyspark import SparkContext
sc = SparkContext("local[4]", "Demo")
# 1. 读取
rdd = sc.textFile("hdfs:///data/logs/*") # partitions ≈ block count
# 2. 预处理 + 过滤(早剪枝)
words = rdd.flatMap(lambda line: line.split()).filter(lambda w: len(w) > 0)
# 3. map -> reduceByKey(map-side aggregate 减少网络)
pairs = words.map(lambda w: (w, 1))
counts = pairs.reduceByKey(lambda a, b: a + b, numPartitions=200)
# 4. cache(如果后面会多次复用 counts)
counts.persist(storageLevel=... )
# 5. collect(仅示例 - 小数据)
print(counts.take(10))
-
关键点:
flatMap(窄依赖)不会触发 shuffle;reduceByKey(宽依赖)触发 shuffle,但在 map 端做了 combine;numPartitions决定 reduce 并行度。
🔚 12️⃣ 总结(一句话)
Spark 的强大在于 RDD+DAG+ 内存计算 的组合:以 RDD 表示数据、用 DAG 描述计算、通过优化的执行与 shuffle 管理在分布式集群上高效完成大规模数据计算。理解 shuffle、序列化、内存与分区策略,是成为 Spark 调优与架构专家的关键。