五-中, Spark 算子吐血总结
5.1.4.3 RDD 转换算子(Transformation)
什么是算子?
在流处理、交互式查询中有个常用的概念是“算子
”,在英文中被成为“Operation
”,在数学上可以解释为一个函数空间到另一个函数空间上的映射O:X->X
,其实就是一个处理单元,往往是指一个函数,在使用算子时往往会有输入和输出,算子则完成相应数据的转化,比如:Group、Sort等都是算子。
从大方向来说, Spark算子(RDD方法)大致可以分为以下两类:
- Transformation 变换/转换算子 : 这种变换并不触发提交作业, 而是
完成作业中间过程处理
; Transformation 操作是延迟计算
的, 也就是说从一个RDD转换为另一个RDD的转换操作不是马上执行, 需要等到有Action操作(行动算子)的时候才会真正触发运算;
- Action 行动算子: 这类算子会
触发SparkContext 提交Job作业
, 并将数据输出到Spark系统;
从小方向来说, Spark算子大致分为三类:
Value数据类型
的Transformation算子, 这种变换并不触发提交作业, 针对处理的数据项是Value型的数据;
Key-Value数据类型
的Transformation算子, 这种变换并不触发提交作业, 针对处理的数据项是Key-Value型的数据对;
Action
算子, 这类算子会触发SparkContext提交Job作业;
1. Value类型
1.1 map
函数签名 |
函数说明 |
def map[U: ClassTag](f: T => U): RDD[U] |
将待处理的数据逐条进行映射转换, 这里的转换可以是类型的转换, 也可以是值的转换 |
|
其实就是Scala集合函数中的map((数据变量:数据类型) => {对每一条数据的映射操作}) |
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD1: RDD[Int] = dataRDD.map(num => {
num * 2
})
val dataRDD2: RDD[String] = dataRDD1.map(num => {
"" num
})
map的并行计算🌰:
// 1. rdd的计算一个分区内的数据是一个一个执行逻辑
// 只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。
// 分区内数据的执行是有序的。
// 2. 不同分区数据计算是无序的。
1.2 mapPartitions
函数签名 |
函数说明 |
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] |
将待处理的数据以分区为单位 发送到计算结点进行处理, 这里的处理是指可以进行任意的处理, 哪怕是过滤数据 |
|
mapPartition, 传递一个迭代器, 返回一个迭代器 |
|
什么, 你的不是迭代器怎么办? 用List包装, 再获取集合的迭代器即可 |
栗子: 获取每个数据分区的最大值
def main(args: Array[String]): Unit = {
//1. 新建配置文件对象
val conf: SparkConf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("memoryRDD")
//2. 新建sparkContext
val sc: SparkContext = new SparkContext(conf)
//3. 读取文件
val rdd= sc.makeRDD(List(1,2,3,4,5,6,7,8,9), 5)
//由上面讲过的内存中数据的分区规则
/**分区号: 数据
* 0: 1
* 1: 2,3
* 2: 4,5
* 3: 6,7
* 4: 8,9
*/
//mapPartitons()的入参和出参都是迭代器!!!!!
//mapOartitions()一次计算的是一个分区的数据
val mapRDD:RDD[Int] = rdd.mapPartitions(
//iter.max 求出每个分区的最大值, 是int
//但是这个函数要求出参是迭代器!
iter => List(iter.max).iterator
)
// mapPartitions : 可以以分区为单位进行数据转换操作
// 但是会将整个分区的数据加载到内存进行引用
// 如果处理完的数据是不会被释放掉,存在对象的引用。
// 在内存较小,数据量较大的场合下,容易出现内存溢出。
//4. collect 从内存中收集数据, 迭代输出
mapRDD.collect().foreach(println)
mapRDD.saveAsTextFile("output")
//5. 关闭资源
sc.stop()
}
Q: map 和 mapPartitions的区别 ?
角度 |
区别 |
1.数据处理角度 |
map算子是分区内一个数据一个数据的执行 , 类似于串行操作; 而 mapPartitions算子是以分区为单位进行批处理操作; |
2.功能角度 |
map算子主要目的是将数据源中的数据进行转换和改变, 但不会减少或增多数据; 而mapPatitiions算子需要传递一个迭代器,返回一个迭代器, 没有要求?元素个数保持不变, 所以可增加或减少数据; |
3. 性能的角度 |
map算子类似于串行操作, 性能较低; mapPartitions算子类似于批处理, 性能较高; 但是mapPartitions算子会长时间占用内存, 那么这样会导致内存可能不够用, 出现内存溢出错误, 内存有限时, 使用map而不是mapPartitions |
1.3 mapPartitionsWithIndex
函数签名 |
函数说明 |
def mapPartitionsWithIndex[U: ClassTag](f: (Index, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] |
将待处理的数据以分区为单位发送到计算节点进行处理 , 这里的处理可以使任意处理哪怕是过滤数据, 在处理时同时可以获取当前分区索引 |
|
入参是(分区索引, 迭代器), 出参是迭代器 |
举个🌰:
//就记住这一点就会用了
// mapWithPartitionsIndex, 入参是(index, iterator), 出参是 iterator
def main(args: Array[String]): Unit = {
//1. 新建配置文件对象
val conf: SparkConf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("memoryRDD")
//2. 新建sparkContext
val sc: SparkContext = new SparkContext(conf)
//3. 读取文件
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
val rddWithIndex: RDD[Int] = rdd.mapPartitionsWithIndex(
(index, iterator) => {
index match {
case 1 => iterator
case _ => Nil.iterator
}
}
)
//4. 存储结果到文件
rddWithIndex.collect().foreach(println)
rddWithIndex.saveAsTextFile("output")
//5. 关闭资源
sc.stop()
}
如何直接打印数据所在的分区?
val mpiRDD = rdd.mapPartitionsWithIndex(
(index, iter) => {
// 1, 2, 3, 4
//(0,1)(2,2),(4,3),(6,4)
iter.map(
num => {
(index, num)
}
)
}
)
1.4 flatMap
函数签名 |
函数说明 |
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] |
将处理的数据进行扁平化后再进行映射处理 , 扁平映射算子 |
|
入参是T, 出参要求是一个可迭代的集合 |
def main(args: Array[String]): Unit = {
//1. 新建配置文件对象
val conf: SparkConf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("memoryRDD")
//2. 新建sparkContext
val sc: SparkContext = new SparkContext(conf)
//3. 读取文件
val rdd = sc.makeRDD(List(List(1,2),3,List(4,5)))
val rddMap = rdd.flatMap(dat => {
dat match {
case i: Int => List(i)
case j: List[_] => j
}
})
//4. 存储结果到文件
rddMap.saveAsTextFile("output")
rddMap.collect().foreach(println)
//5. 关闭资源
sc.stop()
}
1.5 glom
函数签名 |
函数说明 |
def glom(): RDD[Array[T]] |
将同一个分区的数据直接转换为相同类型的内存数组进行处理, 分区不变 |
|
将同一个分区里的元素合并到一个array中 |
计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - glom
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
// 【1,2】,【3,4】
// 【2】,【4】
// 【6】
val glomRDD: RDD[Array[Int]] = rdd.glom()
val maxRDD: RDD[Int] = glomRDD.map(
array => {
array.max
}
)
println(maxRDD.collect().sum)
sc.stop()
}
}
1.6 groupBy
函数签名 |
函数说明 |
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] |
将数据根据指定的规则进行分组 , 分区默认不变, 但是数据会被打乱重新组合 , 这就是shuffle. 极限情况下, 数据可能被分在同一个分区中; |
|
groupBy(f:elem => 对 elem的操作) |
一个组的数据在一个分区中, 但是并不是说一个分区中只有一个组;
- List(“Hello”, “hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组。
sc.groupBy(_.charAt(0))
- 从服务器日志数据 apache.log 中获取每个时间段访问量。
//1. 配置文件对象
val conf = new SparkConf();
conf.setAppName("groupby")
conf.setMaster("local[*]")
//2. sparkcontext
val sc = new SparkContext(conf)
//3. 创建RDD
val rddString: RDD[String] = sc.textFile("datas/apache.log")
//4.
val hourRDD: RDD[(String, Int)] = rddString.map(
line => {
//取出时间
val strArr = line.split(" ")
val time = strArr(3)
//对time格式化
//指定格式化模式
val sdfOfDate = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
//格式化时间
val date: Date = sdfOfDate.parse(time)
//指定格式化模式
val sdfOfHour = new SimpleDateFormat("HH")
//格式化时间, 获取到了小时
val hour: String = sdfOfHour.format(date)
(hour, 1)
}
)
//5. 分组
val hourOfGroupedRDD: RDD[(String, Iterable[(String, Int)])] = hourRDD.groupBy(_._1)
//6. 汇总计算
val res: RDD[(String, Int)] = hourOfGroupedRDD.map {
case (hour: String, iter: Iterable[(String, Int)]) =>
(hour, iter.size)
}
res.collect().foreach(println)
- WordCount
1.7 filter
函数签名 |
函数说明 |
def filter(f: T => Boolean): RDD[T] |
按照规则筛选过滤; 处理后, 分区不变, 但是分区内的数据可能不均衡, 生产环境, 可能会出现数据倾斜; |
|
符合规则的数据保留,不符合规则的数据丢弃 |
简单例子:
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.filter(_%2 == 0)
从服务器日志数据 apache.log 中获取 2015 年 5 月 17 日的请求路径
//1. 配置文件对象
val conf = new SparkConf();
conf.setAppName("groupby")
conf.setMaster("local[*]")
//2. sparkcontext
val sc = new SparkContext(conf)
//3. 创建RDD
val rddString: RDD[String] = sc.textFile("datas/apache.log")
//4.
val timeAndPathRDD: RDD[(String, String)] = rddString.map(
line => {
//取出时间
val strArr = line.split(" ")
val timeAndPathTuple: Tuple2[String, String] = (strArr(3), strArr(6))
//对time格式化
//指定格式化模式
val sdfOfDate = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
//格式化时间
val date: Date = sdfOfDate.parse(timeAndPathTuple._1)
//指定格式化模式
val sdfOfTime = new SimpleDateFormat("dd/MM/yyyy")
//格式化时间, 获取到了小时
val time: String = sdfOfTime.format(date)
(time, timeAndPathTuple._2)
}
)
//过滤
val res = timeAndPathRDD.filter(_._1.equals("17/05/2015"))
res.collect().foreach(println)
1.8 sample
函数签名 |
函数说明 |
def sample(withReplacement: Boolean,fraction:Double,seed: Long = Utils.random.nextLong): RDD[T] |
根据指定的规则从数据集中抽取数据 |
withReplacement |
是否放回(抽奖) |
fraction |
概率 |
seed |
种子 |
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子
val dataRDD1 = dataRDD.sample(false, 0.5)
// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val dataRDD2 = dataRDD.sample(true, 2)
1.9 distinct
函数签名 |
函数说明 |
def distinct()(implicit ord: Ordering[T] = null): RDD[T] |
将数据集中重复的数据去重 |
def distinct(numPartitions: Int)(implicit ord:Ordering[T] = null): RDD[T] |
|
val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),1)
val dataRDD1 = dataRDD.distinct()
val dataRDD2 = dataRDD.distinct(2)
1.10 coalesce
函数签名 |
函数说明 |
def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T] |
根据数据量缩减分区 ,用于大数据集过滤后, 提高小数据集的执行效率 |
|
当spark程序中, 存在过多的小任务的时候, 可以通过coalesce方法, 收缩合并分区, 减少分区的个数, 减少任务调度成本 |
注意: coalesce
默认不会打乱分区中的数据. 缩减分区主要是单纯的进行分区间的合并, 为了避免可能的数据倾斜, 此方法的参数 shuffle = true
, 通过shuffle去平衡数据;
val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),6)
val dataRDD1 = dataRDD.coalesce(2)
Q: 如何扩大分区??
1.11 repartition
函数签名 |
函数说明 |
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] |
该操作内部其实执行的是coalesce, 参数shuffle的默认值为true |
|
无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。 |
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EVOB3uZv-1646825459606)(2022-03-08-16-45-38.png)]
coalesce 和 repartition 区别?
1.12 sortBy
函数签名 |
函数说明 |
def sortBy[K](f: (T) => K, ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] |
在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果 进行排序,默认为升序排列 |
|
排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程 |
2. 双Value类型
2.13 intersection
函数签名 |
函数说明 |
def intersection(other: RDD[T]): RDD[T] |
对源RDD和参数RDD求交集 后返回一个新的RDD |
|
|
2.14 union
函数签名 |
函数说明 |
def union(other: RDD[T]): RDD[T] |
对源RDD和参数RDD求并集 后返回一个新的RDD |
2.15 subtract
函数签名 |
函数说明 |
def subtract(other: RDD[T]): RDD[T] |
以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来 |
|
|
2.16 zip
函数签名 |
函数说明 |
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] |
将两个 RDD 中的元素,以键值对(拉链)的形式进行合并. |
|
键值对中的 Key 为第 1 个 RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素。 |
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子 - 双Value类型
// 交集,并集和差集要求两个数据源数据类型保持一致
// 拉链操作两个数据源的类型可以不一致
val rdd1 = sc.makeRDD(List(1,2,3,4))
val rdd2 = sc.makeRDD(List(3,4,5,6))
val rdd7 = sc.makeRDD(List("3","4","5","6"))
// 交集 : 【3,4】
val rdd3: RDD[Int] = rdd1.intersection(rdd2)
//val rdd8 = rdd1.intersection(rdd7)
println(rdd3.collect().mkString(","))
// 并集 : 【1,2,3,4,3,4,5,6】
val rdd4: RDD[Int] = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))
// 差集 : 【1,2】
val rdd5: RDD[Int] = rdd1.subtract(rdd2)
println(rdd5.collect().mkString(","))
// 拉链 : 【1-3,2-4,3-5,4-6】
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
val rdd8 = rdd1.zip(rdd7)
println(rdd6.collect().mkString(","))
sc.stop()
}
思考一个问题:如果两个 RDD 数据类型不一致怎么办?
思考一个问题:如果两个 RDD 数据分区不一致怎么办?
思考一个问题:如果两个 RDD 分区数据数量不一致怎么办?
- 交集,并集和差集要求两个数据源数据类型保持一致
- 拉链操作两个数据源的类型可以不一致
- [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eyLK105S-1646825459608)(2022-03-08-17-00-31.png)]
3. Key-Value 类型
3.17 partitionBy
函数签名 |
函数说明 |
def partitionBy(partitioner: Partitioner): RDD[(K, V)] |
将数据按照Partitioner重新进行分区; Spark 默认的分区器是 HashPartitioner |
|
|
3.18 reduceByKey
函数签名 |
函数说明 |
def reduceByKey(func: (V, V) => V): RDD[(K, V)] |
可以将数据按照相同的Key对Value进行聚合 |
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] |
|
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.reduceByKey(_ _)
val dataRDD3 = dataRDD1.reduceByKey(_ _, 2)
Q: redeceByKey 和 groupBykey的区别?
从shuffle 的角度
:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
从功能的角度
:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey
3.19 groupByKey
函数签名 |
函数说明 |
def groupByKey(): RDD[(K, Iterable[V])] |
将数据源的数据根据key对value进行分组 |
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] |
|
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] |
|
|
|
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.groupByKey()
val dataRDD3 = dataRDD1.groupByKey(2)
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))
3.20 aggregateByKey
函数签名 |
函数说明 |
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] |
将数据根据不同的规则进行分区内计算 和分区间计算 |
|
|
思考一个问题:分区内计算规则和分区间计算规则相同怎么办?
3.21 foldByKey
函数签名 |
函数说明 |
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] |
当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey |
3.22 combineByKey
函数签名 |
函数说明 |
def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] |
最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function) |
|
类似于arrregate(), combineByKey() 允许用户返回值的类型与输入不一致 |
3.23 sortByKey
函数签名 |
函数说明 |
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] |
在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序 |
的 |
|
3.24 join
函数签名 |
函数说明 |
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] |
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 |
(K,(V,W))的 RDD |
|
3.25 leftOuterJoin
函数签名 |
函数说明 |
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] |
类似于 SQL 语句的左外连接 |
3.26 cogroup
函数签名 |
函数说明 |
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] |
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD |
5.1.4.4 RDD 行动算子 (Action)
1. reduce
函数签名 |
函数说明 |
def reduce(f: (T, T) => T): T |
聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据 |
2. collect
函数签名 |
函数说明 |
def collect(): Array[T] |
在驱动程序中,以数组 Array 的形式返回数据集的所有元素 |
3. count
函数签名 |
函数说明 |
def count(): Long |
返回 RDD 中元素的个数 |
4. first
函数签名 |
函数说明 |
def first(): T |
返回 RDD 中的第一个元素 |
|
|
5. take
函数签名 |
函数说明 |
def take(num: Int): Array[T] |
返回一个由 RDD 的前 n 个元素组成的数组 |
|
|
6. takeOrdered
函数签名 |
函数说明 |
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] |
返回该 RDD 排序后的前 n 个元素组成的数组 |
|
|
7. aggregate
函数签名 |
函数说明 |
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U |
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合 |
8. fold
函数签名 |
函数说明 |
def fold(zeroValue: T)(op: (T, T) => T): T |
统计每种 key 的个数 |
9. countByKey
函数签名 |
函数说明 |
def countByKey(): Map[K, Long] |
折叠操作,aggregate 的简化版操作 |
10. save相关的算子
函数签名 |
函数说明 |
def saveAsTextFile(path: String): Unit |
|
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit |
将数据保存到不同格式的文件中 |
def saveAsObjectFile(path: String): Unit |
|
def saveAsSequenceFile(path: String,codec: Option[Class[_ <: CompressionCodec]] = None): Unit |
|
11. foreach
这篇好文章是转载于:学新通技术网