HOME> 威尔士世界杯> 详解RDD基本概念、RDD五大属性

详解RDD基本概念、RDD五大属性

威尔士世界杯 2025-06-05 14:47:20

一、RDD是什么

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD是spark core的底层核心。

Dataset:

RDD 可以不保存具体数据, 只保留创建自己的必备信息, 例如依赖和计算函数;

RDD 也可以缓存起来, 相当于存储具体数据。

Distributed:

RDD 支持分区, 可以运行在集群中。

Resilient:

RDD 支持高效的容错;

RDD 中的数据即可以缓存在内存中, 也可以缓存在磁盘中, 也可以缓存在外部存储中。

1.RDD的特点:

弹性

容错的弹性:数据丢失可以自动恢复;存储的弹性:内存与磁盘的自动切换;计算的弹性:计算出错重试机制;分片的弹性:可根据需要重新分片。分布式:数据存储在集群不同节点上/计算分布式。数据集: RDD封装了计算逻辑,并不保存数据。数据抽象: RDD是一个抽象类,需要子类具体实现。不可变: RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑。可分区、并行计算。

二、RDD 为什么会出现

在 RDD 出现之前, 当时 MapReduce 是比较主流的, 而 MapReduce 如何执行迭代计算的任务呢?

多个 MapReduce 任务之间没有基于内存的数据共享方式, 只能通过磁盘来进行共享,这种方式明显比较低效。

RDD 如何解决迭代计算非常低效的问题呢?

在 Spark 中, 其实最终 Job3 从逻辑上的计算过程是: Job3 = (Job1.map).filter, 整个过程是共享内存的, 而不需要将中间结果存放在可靠的分布式文件系统中。

这种方式可以在保证容错的前提下, 提供更多的灵活, 更快的执行速度, RDD 在执行迭代型任务时候的表现可以通过下面代码体现:

// 线性回归

val points = sc.textFile(...)

.map(...)

.persist(...)

val w = randomValue

for (i <- 1 to 10000) {

val gradient = points.map(p => p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y)

.reduce(_ + _)

w -= gradient

}

在这个例子中, 进行了大致 10000 次迭代, 如果在 MapReduce 中实现, 可能需要运行很多 Job, 每个 Job 之间都要通过 HDFS 共享结果, 谁快谁慢一窥便知。

三、结合案例深入了解RDD

需求:

给定一个网站的访问记录, 俗称 Access log

计算其中出现的独立 IP, 以及其访问的次数

val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")

val sc = new SparkContext(config)

val result = sc.textFile("dataset/access_log_sample.txt")

.map(item => (item.split(" ")(0), 1))

.filter(item => StringUtils.isNotBlank(item._1))

.reduceByKey((curr, agg) => curr + agg)

.sortBy(item => item._2, false)

.take(10)

result.foreach(item => println(item))

针对这个小案例, 我们问出互相关联但是又方向不同的六个问题:

1.假设要针对整个网站的历史数据进行处理, 量有 1T, 如何处理?

放在集群中, 利用集群多台计算机来并行处理。

2.如何放在集群中运行?

简单来讲, 并行计算就是同时使用多个计算资源解决一个问题, 有如下四个要点

要解决的问题必须可以分解为多个可以并发计算的部分;

每个部分要可以在不同处理器上被同时执行;

需要一个共享内存的机制;

需要一个总体上的协作机制来进行调度。

3.如果放在集群中的话, 可能要对整个计算任务进行分解, 如何分解?

概述:

对于 HDFS 中的文件, 是分为不同的 Block 的;

在进行计算的时候, 就可以按照 Block 来划分, 每一个 Block 对应一个不同的计算单元。

扩展:

RDD 并没有真实的存放数据, 数据是从 HDFS 中读取的, 在计算的过程中读取即可;

RDD 至少是需要可以 分片 的, 因为HDFS中的文件就是分片的, RDD 分片的意义在于表示对源数据集每个分片的计算, RDD 可以分片也意味着 可以并行计算。

4.移动数据不如移动计算是一个基础的优化, 如何做到?

每一个计算单元需要记录其存储单元的位置, 尽量调度过去。

5.在集群中运行, 需要很多节点之间配合, 出错的概率也更高, 出错了怎么办?

RDD1 → RDD2 → RDD3 这个过程中, RDD2 出错了, 有两种办法可以解决:

缓存 RDD2 的数据, 直接恢复 RDD2, 类似 HDFS 的备份机制;

记录 RDD2 的依赖关系, 通过其父级的 RDD 来恢复 RDD2, 这种方式会少很多数据的交互和保存。

如何通过父级 RDD 来恢复?

记录 RDD2 的父亲是 RDD1;

记录 RDD2 的计算函数, 例如记录 RDD2 = RDD1.map(…​), map(…​) 就是计算函数;

当 RDD2 计算出错的时候, 可以通过父级 RDD 和计算函数来恢复 RDD2。

6.假如任务特别复杂, 流程特别长, 有很多 RDD 之间有依赖关系, 如何优化?

上面提到了可以使用依赖关系来进行容错, 但是如果依赖关系特别长的时候, 这种方式其实也比较低效, 这个时候就应该使用另外一种方式, 也就是记录数据集的状态。

在 Spark 中有两个手段可以做到:

缓存

Checkpoint

四、RDD的五大属性

/**

* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,

* partitioned collection of elements that can be operated on in parallel. This class contains the

* basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,

* [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value

* pairs, such as `groupByKey` and `join`;

* [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of

* Doubles; and

* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that

* can be saved as SequenceFiles.

* All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])

* through implicit.

*

* Internally, each RDD is characterized by five main properties:

*

* - A list of partitions

* - A function for computing each split

* - A list of dependencies on other RDDs

* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for

* an HDFS file)

*

* All of the scheduling and execution in Spark is done based on these methods, allowing each RDD

* to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for

* reading data from a new storage system) by overriding these functions. Please refer to the

* Spark paper

* for more details on RDD internals.

*/

从上面源码中,可以得到RDD的五大属性:

1.分区列表( a list of partitions)

Spark RDD是被分区的,每一个分区都会被一个计算任务(Task)处理,分区数决定了并行计算的数量,RDD的并行度默认从父RDD传给子RDD。默认情况下,一个HDFS上的数据分片就是一个 partiton,RDD分片数决定了并行计算的力度,可以在创建RDD时指定RDD分片个数(分区)。

如果不指定分区数量,当RDD从集合创建时,则默认分区数量为该程序所分配到的资源的CPU核数(每个Core可以承载2~4个 partition),如果是从HDFS文件创建,默认为文件的 Block数。

/**

* Implemented by subclasses to return the set of partitions in this RDD. This method will only

* be called once, so it is safe to implement a time-consuming computation in it.

*

* The partitions in this array must satisfy the following property:

* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`

*/

protected def getPartitions: Array[Partition]

2.每一个分区都有一个计算函数( a function for computing each split)

每个分区都会有计算函数, Spark的RDD的计算函数是以分片为基本单位的,每个RDD都会实现 compute函数,对具体的分片进行计算,不需要保存每次计算的结果。RDD中的分片是并行的,所以是分布式并行计算。

有一点非常重要,就是由于RDD有前后依赖关系,遇到宽依赖关系,如reduceByKey等这些操作时划分成 Stage, Stage内部的操作都是通过 Pipeline进行的,在具体处理数据时它会通过 Blockmanager来获取相关的数据,因为具体的 split要从外界读数据,也要把具体的计算结果写入外界,所以用了一个管理器,具体的 split都会映射成 BlockManager的Block,而具体的split会被函数处理,函数处理的具体形式是以任务的形式进行的。

/**

* :: DeveloperApi ::

* Implemented by subclasses to compute a given partition.

*/

@DeveloperApi

def compute(split: Partition, context: TaskContext): Iterator[T]

3.依赖关系( a list of dependencies on other RDDS)

由于RDD每次转换都会生成新的RDD,所以RDD会形成类似流水线一样的前后依赖关系,当然宽依赖就不类似于流水线了,宽依赖后面的RDD具体的数据分片会依赖前面所有的RDD的所有数据分片,这个时候数据分片就不进行内存中的 Pipeline,一般都是跨机器的,因为有前后的依赖关系,所以当有分区的数据丢失时, Spark会通过依赖关系进行重新计算,从而计算出丢失的数据,而不是对RDD所有的分区进行重新计算。

RDD之间的依赖有两种:窄依赖( Narrow Dependency)和宽依赖( Wide Dependency)。

窄依赖(Narrow Dependency)

窄依赖是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如map、filter、union等操作都会产生窄依赖;

对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,这种转换不会引起shuffle操作,速度快!

宽依赖(Wide Dependency)

宽依赖是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖;

这种转换会引起shuffle操作,速度慢!

Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每一个Reducer上去,这个过程就是shuffle。由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。

/**

* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only

* be called once, so it is safe to implement a time-consuming computation in it.

*/

protected def getDependencies: Seq[Dependency[_]] = deps

4.key- value数据类型的RDD分区器( a Partitioner for key- alue RDDS)

一个Partitioner,即RDD的分区函数(可选项),Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

当前Spark中实现了两种类型的分区函数,

基于哈希的HashPartitioner,(key.hashcode % 分区数= 分区号)。它是默认值基于范围的RangePartitioner。

什么会有Partitioner?

只有对于key-value的RDD(RDD[(String, Int)]),并且产生shuffle,才会有Partitioner;非key-value的RDD(RDD[String])的Parititioner的值是None。

Option类型:可以表示有值或者没有值,它有2个子类:

Some:表示封装了值None:表示没有值

/** Optionally overridden by subclasses to specify how they are partitioned. */

@transient val partitioner: Option[Partitioner] = None

5每个分区都有一个优先位置列表,即首选位置( a list of preferred locations to compute each split on)

存储每个切片优先(preferred location)位置的列表。 比如对于一个 HDFS 文件来说, 这个列表保存的就是每个 Partition 所在文件块的位置.。按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候, 会尽可能地将计算任务分配到其所要处理数据块的存储位置。

/**

* Optionally overridden by subclasses to specify placement preferences.

*/

protected def getPreferredLocations(split: Partition): Seq[String] = Nil

镐的解释
不灭战神无弹窗,不灭战神最新章节阅读,不灭战神txt全集下载