Spark RDD (with source code)

Spark RDD

Spark RDD (Resilient Distributed Dataset) is the core foundation of Spark; higher-level components such as Spark Streaming and Spark SQL are all built on top of it. A DataFrame in Spark SQL is ultimately translated into a series of RDD operations at execution time. Understanding RDDs is therefore essential: they embody Spark's core mechanisms for fault tolerance, partitioning, and computation.

I recommend reading the RDD paper first: Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. My earlier notes on that paper have been lost, so this post gives a rough summary based on the source code instead. Corrections and additions are welcome.

What an RDD Contains

An RDD actually consists of five parts:

Users can also define their own RDDs, but a custom RDD must implement these properties and methods (a minimal sketch of a custom RDD follows the list of five parts below).

1. Getting the partitions

/**
 * Implemented by subclasses to return the set of partitions in this RDD. This method will only
 * be called once, so it is safe to implement a time-consuming computation in it.
 *
 * The partitions in this array must satisfy the following property:
 *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
 */
protected def getPartitions: Array[Partition]

The Array[Partition] must satisfy `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`, i.e. each partition's index equals its position in the array.
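As a quick illustration (a sketch assuming a spark-shell session with an active SparkContext `sc`), the invariant can be checked on any existing RDD:

 val rdd = sc.parallelize(1 to 100, numSlices = 4)
 assert(rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index })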

2. Computing a given partition

/**
 * :: DeveloperApi ::
 * Implemented by subclasses to compute a given partition.
 */
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]

3. Getting the lineage (dependencies on parent RDDs)

/**
 * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
 * be called once, so it is safe to implement a time-consuming computation in it.
 */
protected def getDependencies: Seq[Dependency[_]] = deps

4. (Optional) Overriding the placement-preference method

/**
 * Optionally overridden by subclasses to specify placement preferences.
 */
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

5. (Optional) Overriding the partitioner

/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None
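To make the five parts concrete, here is a minimal sketch of a custom RDD (not taken from Spark itself; NumbersRDD and RangePartition are made-up names). It simply generates the integers [0, n), split across a given number of partitions:

 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD

 // Hypothetical partition class: each partition covers a sub-range [start, end).
 class RangePartition(val idx: Int, val start: Int, val end: Int) extends Partition {
   override def index: Int = idx
 }

 // A toy RDD that produces the numbers [0, n), split over numSlices partitions.
 // It has no parent RDD, so it passes Nil as its dependencies.
 class NumbersRDD(sc: SparkContext, n: Int, numSlices: Int)
     extends RDD[Int](sc, Nil) {

   // 1. getPartitions: each partition's index must equal its position in the array
   override protected def getPartitions: Array[Partition] =
     Array.tabulate[Partition](numSlices) { i =>
       new RangePartition(i, i * n / numSlices, (i + 1) * n / numSlices)
     }

   // 2. compute: how to produce the data of one given partition
   override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
     val p = split.asInstanceOf[RangePartition]
     (p.start until p.end).iterator
   }

   // 3. getDependencies: the inherited default returns the deps passed to the
   //    constructor (Nil here, i.e. no lineage to a parent RDD)
   // 4. getPreferredLocations: the inherited default returns Nil (no locality preference)
   // 5. partitioner: the inherited default is None
 }

Calling `new NumbersRDD(sc, 10, 3).collect()` should return `Array(0, 1, ..., 9)`; parts 3 to 5 simply rely on the defaults shown above.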

Common RDD Properties and Methods

In addition, an RDD carries a set of common properties (the SparkContext, the RDD id, the RDD name, and so on) and operation methods.
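For example (a sketch assuming an active SparkContext `sc`; the file path is just a placeholder):

 val rdd = sc.textFile("hdfs:///tmp/input.txt")
 rdd.setName("input")        // name shown in the web UI
 println(rdd.id)             // unique id assigned by the SparkContext
 println(rdd.name)           // "input"
 println(rdd.sparkContext)   // the SparkContext this RDD belongs to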

1. The persist and cache methods

Spark recommends using cache or persist to cache datasets that are reused, in order to improve execution efficiency, so persist and cache are common methods every RDD should provide.

The most basic variant is `persist(newLevel: StorageLevel, allowOverride: Boolean)`:

/**
 * Mark this RDD for persisting using the specified level.
 *
 * @param newLevel the target storage level
 * @param allowOverride whether to override any existing level with the new one
 * (the first parameter is the storage level to use; the second controls whether
 *  an already-set level may be overridden)
 */
 private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
   ...
 }

All the other persist variants eventually call this basic method.
persist(newLevel: StorageLevel):

 // actually calls persist(newLevel, allowOverride = false)
 def persist(newLevel: StorageLevel): this.type = {...}

persist() (called without arguments, the storage level defaults to MEMORY_ONLY):

 // actually calls persist(StorageLevel.MEMORY_ONLY)
 def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

The cache() function is essentially persist() with one more layer of wrapping:

 def cache(): this.type = persist()

Since there is caching, there is naturally also unpersist, which removes the cached data:

/**
 * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
 *
 * @param blocking Whether to block until all blocks are deleted.
 * @return This RDD.
 * (calls sc.unpersistRDD and marks this RDD's storageLevel as NONE)
 */
 def unpersist(blocking: Boolean = true): this.type = {
   logInfo("Removing RDD " + id + " from persistence list")
   sc.unpersistRDD(id, blocking)
   storageLevel = StorageLevel.NONE
   this
 }
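Putting these together, a typical usage pattern might look like this (a sketch; the input path and the filter conditions are placeholders):

 import org.apache.spark.storage.StorageLevel

 val lines = sc.textFile("hdfs:///tmp/app.log")
 val errors = lines.filter(_.contains("ERROR"))

 errors.cache()                                  // same as persist(StorageLevel.MEMORY_ONLY)
 errors.count()                                  // first action computes the RDD and caches its blocks
 errors.filter(_.contains("timeout")).count()    // reuses the cached blocks

 errors.unpersist()                              // removes the blocks; storageLevel becomes NONE
 errors.persist(StorageLevel.MEMORY_AND_DISK)    // a different level can now be chosen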