• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

五-, Spark 算子 (转化+行动算子共三十七个)

武飞扬头像
菜菜的大数据开发之路
帮助1

五-中, Spark 算子吐血总结

5.1.4.3 RDD 转换算子(Transformation)

什么是算子?

在流处理、交互式查询中有个常用的概念是“算子”,在英文中被成为“Operation”,在数学上可以解释为一个函数空间到另一个函数空间上的映射O:X->X,其实就是一个处理单元,往往是指一个函数,在使用算子时往往会有输入和输出,算子则完成相应数据的转化,比如:Group、Sort等都是算子。


大方向来说, Spark算子(RDD方法)大致可以分为以下两类:

  1. Transformation 变换/转换算子 : 这种变换并不触发提交作业, 而是完成作业中间过程处理; Transformation 操作是延迟计算的, 也就是说从一个RDD转换为另一个RDD的转换操作不是马上执行, 需要等到有Action操作(行动算子)的时候才会真正触发运算;
  2. Action 行动算子: 这类算子会触发SparkContext 提交Job作业, 并将数据输出到Spark系统;

小方向来说, Spark算子大致分为三类:

  1. Value数据类型的Transformation算子, 这种变换并不触发提交作业, 针对处理的数据项是Value型的数据;
  2. Key-Value数据类型的Transformation算子, 这种变换并不触发提交作业, 针对处理的数据项是Key-Value型的数据对;
  3. Action算子, 这类算子会触发SparkContext提交Job作业;
1. Value类型
1.1 map
函数签名 函数说明
def map[U: ClassTag](f: T => U): RDD[U] 将待处理的数据逐条进行映射转换, 这里的转换可以是类型的转换, 也可以是值的转换
  其实就是Scala集合函数中的map((数据变量:数据类型) => {对每一条数据的映射操作})
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD1: RDD[Int] = dataRDD.map(num => {
  num * 2
})
val dataRDD2: RDD[String] = dataRDD1.map(num => {
  ""   num
})

map的并行计算🌰:
// 1. rdd的计算一个分区内的数据是一个一个执行逻辑
// 只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。
// 分区内数据的执行是有序的。
// 2. 不同分区数据计算是无序的。

1.2 mapPartitions
函数签名 函数说明
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] 将待处理的数据以分区为单位发送到计算结点进行处理, 这里的处理是指可以进行任意的处理, 哪怕是过滤数据
  mapPartition, 传递一个迭代器, 返回一个迭代器
  什么, 你的不是迭代器怎么办? 用List包装, 再获取集合的迭代器即可

栗子: 获取每个数据分区的最大值

def main(args: Array[String]): Unit = {
    //1. 新建配置文件对象
    val conf: SparkConf = new SparkConf()

    conf.setMaster("local[*]")
    conf.setAppName("memoryRDD")
    //2. 新建sparkContext
    val sc: SparkContext = new SparkContext(conf)

    //3. 读取文件
    val rdd= sc.makeRDD(List(1,2,3,4,5,6,7,8,9), 5)

    //由上面讲过的内存中数据的分区规则
    /**分区号: 数据
     * 0: 1
     * 1: 2,3
     * 2: 4,5
     * 3: 6,7
     * 4: 8,9
     */
    //mapPartitons()的入参和出参都是迭代器!!!!!
    //mapOartitions()一次计算的是一个分区的数据
    val mapRDD:RDD[Int] = rdd.mapPartitions(
      //iter.max 求出每个分区的最大值, 是int
      //但是这个函数要求出参是迭代器!
      iter => List(iter.max).iterator
    )
      // mapPartitions : 可以以分区为单位进行数据转换操作
        //                 但是会将整个分区的数据加载到内存进行引用
        //                 如果处理完的数据是不会被释放掉,存在对象的引用。
        //                 在内存较小,数据量较大的场合下,容易出现内存溢出。
    //4. collect 从内存中收集数据, 迭代输出
    mapRDD.collect().foreach(println)

    mapRDD.saveAsTextFile("output")
    //5. 关闭资源
    sc.stop()
}
学新通

Q: map 和 mapPartitions的区别 ?

角度 区别
1.数据处理角度 map算子是分区内一个数据一个数据的执行, 类似于串行操作; 而 mapPartitions算子是以分区为单位进行批处理操作;
2.功能角度 map算子主要目的是将数据源中的数据进行转换和改变, 但不会减少或增多数据; 而mapPatitiions算子需要传递一个迭代器,返回一个迭代器, 没有要求?元素个数保持不变, 所以可增加或减少数据;
3. 性能的角度 map算子类似于串行操作, 性能较低; mapPartitions算子类似于批处理, 性能较高; 但是mapPartitions算子会长时间占用内存, 那么这样会导致内存可能不够用, 出现内存溢出错误, 内存有限时, 使用map而不是mapPartitions
1.3 mapPartitionsWithIndex
函数签名 函数说明
def mapPartitionsWithIndex[U: ClassTag](f: (Index, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] 待处理的数据以分区为单位发送到计算节点进行处理, 这里的处理可以使任意处理哪怕是过滤数据, 在处理时同时可以获取当前分区索引
  入参是(分区索引, 迭代器), 出参是迭代器

举个🌰:

//就记住这一点就会用了
// mapWithPartitionsIndex, 入参是(index, iterator), 出参是 iterator
def main(args: Array[String]): Unit = {
    //1. 新建配置文件对象
    val conf: SparkConf = new SparkConf()

    conf.setMaster("local[*]")
    conf.setAppName("memoryRDD")
    //2. 新建sparkContext
    val sc: SparkContext = new SparkContext(conf)

    //3. 读取文件
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)

    val rddWithIndex: RDD[Int] = rdd.mapPartitionsWithIndex(
      (index, iterator) => {
        index match {
          case 1 => iterator
          case _ => Nil.iterator
        }
      }
    )

    //4. 存储结果到文件
    rddWithIndex.collect().foreach(println)
    rddWithIndex.saveAsTextFile("output")

    //5. 关闭资源
    sc.stop()
  }
学新通

如何直接打印数据所在的分区?

val mpiRDD = rdd.mapPartitionsWithIndex(
            (index, iter) => {
                // 1,   2,    3,   4
                //(0,1)(2,2),(4,3),(6,4)
                iter.map(
                    num => {
                        (index, num)
                    }
                )
            }
        )
1.4 flatMap
函数签名 函数说明
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] 将处理的数据进行扁平化后再进行映射处理, 扁平映射算子
  入参是T, 出参要求是一个可迭代的集合
def main(args: Array[String]): Unit = {
    //1. 新建配置文件对象
    val conf: SparkConf = new SparkConf()

    conf.setMaster("local[*]")
    conf.setAppName("memoryRDD")
    //2. 新建sparkContext
    val sc: SparkContext = new SparkContext(conf)

    //3. 读取文件
    val rdd = sc.makeRDD(List(List(1,2),3,List(4,5)))

    val rddMap = rdd.flatMap(dat => {
      dat match {
        case i: Int => List(i)
        case j: List[_] => j
      }
    })

    //4. 存储结果到文件
    rddMap.saveAsTextFile("output")
    rddMap.collect().foreach(println)
    //5. 关闭资源
    sc.stop()
}
学新通
1.5 glom
函数签名 函数说明
def glom(): RDD[Array[T]] 同一个分区的数据直接转换为相同类型的内存数组进行处理, 分区不变
  将同一个分区里的元素合并到一个array中

计算所有分区最大值求和(分区内取最大值,分区间最大值求和)

def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - glom
        val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)

        // 【1,2】,【3,4】
        // 【2】,【4】
        // 【6】
        val glomRDD: RDD[Array[Int]] = rdd.glom()

        val maxRDD: RDD[Int] = glomRDD.map(
            array => {
                array.max
            }
        )
        println(maxRDD.collect().sum)
        sc.stop()
    }
}
学新通
1.6 groupBy
函数签名 函数说明
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] 将数据根据指定的规则进行分组, 分区默认不变, 但是数据会被打乱重新组合, 这就是shuffle. 极限情况下, 数据可能被分在同一个分区中;
  groupBy(f:elem => 对 elem的操作)

一个组的数据在一个分区中, 但是并不是说一个分区中只有一个组;

  1. List(“Hello”, “hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组。
sc.groupBy(_.charAt(0))
  1. 从服务器日志数据 apache.log 中获取每个时间段访问量。
//1. 配置文件对象
    val conf = new SparkConf();
    conf.setAppName("groupby")
    conf.setMaster("local[*]")
    //2. sparkcontext
    val sc = new SparkContext(conf)

    //3. 创建RDD
    val rddString: RDD[String] = sc.textFile("datas/apache.log")

    //4.
    val hourRDD: RDD[(String, Int)] = rddString.map(
      line => {
        //取出时间
        val strArr = line.split(" ")
        val time = strArr(3)

        //对time格式化
        //指定格式化模式
        val sdfOfDate = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        //格式化时间
        val date: Date = sdfOfDate.parse(time)

        //指定格式化模式
        val sdfOfHour = new SimpleDateFormat("HH")
        //格式化时间, 获取到了小时
        val hour: String = sdfOfHour.format(date)
        (hour, 1)
      }
    )

    //5. 分组
    val hourOfGroupedRDD: RDD[(String, Iterable[(String, Int)])] = hourRDD.groupBy(_._1)

    //6. 汇总计算
    val res: RDD[(String, Int)] = hourOfGroupedRDD.map {
      case (hour: String, iter: Iterable[(String, Int)]) =>
        (hour, iter.size)
    }
    res.collect().foreach(println)
学新通

学新通

  1. WordCount
1.7 filter
函数签名 函数说明
def filter(f: T => Boolean): RDD[T] 按照规则筛选过滤; 处理后, 分区不变, 但是分区内的数据可能不均衡, 生产环境, 可能会出现数据倾斜;
  符合规则的数据保留,不符合规则的数据丢弃

简单例子:

val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.filter(_%2 == 0)

从服务器日志数据 apache.log 中获取 2015 年 5 月 17 日的请求路径

//1. 配置文件对象
    val conf = new SparkConf();
    conf.setAppName("groupby")
    conf.setMaster("local[*]")
    //2. sparkcontext
    val sc = new SparkContext(conf)

    //3. 创建RDD
    val rddString: RDD[String] = sc.textFile("datas/apache.log")

    //4.
    val timeAndPathRDD: RDD[(String, String)] = rddString.map(
      line => {
        //取出时间
        val strArr = line.split(" ")
        val timeAndPathTuple: Tuple2[String, String] = (strArr(3), strArr(6))

        //对time格式化
        //指定格式化模式
        val sdfOfDate = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        //格式化时间
        val date: Date = sdfOfDate.parse(timeAndPathTuple._1)

        //指定格式化模式
        val sdfOfTime = new SimpleDateFormat("dd/MM/yyyy")
        //格式化时间, 获取到了小时
        val time: String = sdfOfTime.format(date)
        (time, timeAndPathTuple._2)
      }
    )
    //过滤
    val res = timeAndPathRDD.filter(_._1.equals("17/05/2015"))
    res.collect().foreach(println)
学新通
1.8 sample
函数签名 函数说明
def sample(withReplacement: Boolean,fraction:Double,seed: Long = Utils.random.nextLong): RDD[T] 根据指定的规则从数据集中抽取数据
withReplacement 是否放回(抽奖)
fraction 概率
seed 种子

学新通

val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
    // 抽取数据不放回(伯努利算法)
    // 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
    // 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
    // 第一个参数:抽取的数据是否放回,false:不放回
    // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
    // 第三个参数:随机数种子
    val dataRDD1 = dataRDD.sample(false, 0.5)
    // 抽取数据放回(泊松算法)
    // 第一个参数:抽取的数据是否放回,true:放回;false:不放回
    // 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
    // 第三个参数:随机数种子
    val dataRDD2 = dataRDD.sample(true, 2)
1.9 distinct
函数签名 函数说明
def distinct()(implicit ord: Ordering[T] = null): RDD[T] 将数据集中重复的数据去重
def distinct(numPartitions: Int)(implicit ord:Ordering[T] = null): RDD[T]  
val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),1)
val dataRDD1 = dataRDD.distinct()
val dataRDD2 = dataRDD.distinct(2)

学新通

1.10 coalesce
函数签名 函数说明
def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T] 根据数据量缩减分区,用于大数据集过滤后, 提高小数据集的执行效率
  当spark程序中, 存在过多的小任务的时候, 可以通过coalesce方法, 收缩合并分区, 减少分区的个数, 减少任务调度成本

注意: coalesce 默认不会打乱分区中的数据. 缩减分区主要是单纯的进行分区间的合并, 为了避免可能的数据倾斜, 此方法的参数 shuffle = true, 通过shuffle去平衡数据;

val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2),6)
val dataRDD1 = dataRDD.coalesce(2)

Q: 如何扩大分区??

学新通

  • 其实, 直接用下面的这个方法就可以了.
1.11 repartition
函数签名 函数说明
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] 该操作内部其实执行的是coalesce, 参数shuffle的默认值为true
  无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EVOB3uZv-1646825459606)(2022-03-08-16-45-38.png)]

coalesce 和 repartition 区别?
学新通

1.12 sortBy
函数签名 函数说明
def sortBy[K](f: (T) => K, ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列
  排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程

学新通

2. 双Value类型
2.13 intersection
函数签名 函数说明
def intersection(other: RDD[T]): RDD[T] 对源RDD和参数RDD求交集后返回一个新的RDD
   
2.14 union
函数签名 函数说明
def union(other: RDD[T]): RDD[T] 源RDD参数RDD并集后返回一个新的RDD
2.15 subtract
函数签名 函数说明
def subtract(other: RDD[T]): RDD[T] 以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来
   
2.16 zip
函数签名 函数说明
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] 将两个 RDD 中的元素,以键值对(拉链)的形式进行合并.
  键值对中的 Key 为第 1 个 RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素。
 def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - 双Value类型

    // 交集,并集和差集要求两个数据源数据类型保持一致
    // 拉链操作两个数据源的类型可以不一致

    val rdd1 = sc.makeRDD(List(1,2,3,4))
    val rdd2 = sc.makeRDD(List(3,4,5,6))
    val rdd7 = sc.makeRDD(List("3","4","5","6"))

    // 交集 : 【3,4】
    val rdd3: RDD[Int] = rdd1.intersection(rdd2)
    //val rdd8 = rdd1.intersection(rdd7)
    println(rdd3.collect().mkString(","))

    // 并集 : 【1,2,3,4,3,4,5,6】
    val rdd4: RDD[Int] = rdd1.union(rdd2)
    println(rdd4.collect().mkString(","))

    // 差集 : 【1,2】
    val rdd5: RDD[Int] = rdd1.subtract(rdd2)
    println(rdd5.collect().mkString(","))

    // 拉链 : 【1-3,2-4,3-5,4-6】
    val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
    val rdd8 = rdd1.zip(rdd7)
    println(rdd6.collect().mkString(","))

    sc.stop()
  }
学新通

思考一个问题:如果两个 RDD 数据类型不一致怎么办?
思考一个问题:如果两个 RDD 数据分区不一致怎么办?
思考一个问题:如果两个 RDD 分区数据数量不一致怎么办?

  1. 交集,并集和差集要求两个数据源数据类型保持一致
  2. 拉链操作两个数据源的类型可以不一致
  3. [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eyLK105S-1646825459608)(2022-03-08-17-00-31.png)]
3. Key-Value 类型
3.17 partitionBy
函数签名 函数说明
def partitionBy(partitioner: Partitioner): RDD[(K, V)] 将数据按照Partitioner重新进行分区; Spark 默认的分区器是 HashPartitioner
   

学新通

  • 思考一个问题:如果重分区的分区器和当前 RDD 的分区器一样怎么办?

    • 产生的结果都是一样的
  • 思考一个问题:Spark 还有其他分区器吗?
    学新通

    • RangePartitioner 一般在排序中使用
      思考一个问题:如果想按照自己的方法进行数据分区怎么办?
  • 自己写一个分区器(待补充)
    思考一个问题:哪那么多问题?

3.18 reduceByKey
函数签名 函数说明
def reduceByKey(func: (V, V) => V): RDD[(K, V)] 可以将数据按照相同的Key对Value进行聚合
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]  
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.reduceByKey(_ _)
val dataRDD3 = dataRDD1.reduceByKey(_ _, 2)

Q: redeceByKey 和 groupBykey的区别?

从shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey

3.19 groupByKey
函数签名 函数说明
def groupByKey(): RDD[(K, Iterable[V])] 将数据源的数据根据key对value进行分组
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]  
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]  
   
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.groupByKey()
val dataRDD3 = dataRDD1.groupByKey(2)
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))
3.20 aggregateByKey
函数签名 函数说明
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] 将数据根据不同的规则进行分区内计算分区间计算
   

思考一个问题:分区内计算规则和分区间计算规则相同怎么办?

3.21 foldByKey
函数签名 函数说明
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] 当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey
3.22 combineByKey
函数签名 函数说明
def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] 最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)
  类似于arrregate(), combineByKey() 允许用户返回值的类型与输入不一致
3.23 sortByKey
函数签名 函数说明
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] 在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序
 
3.24 join
函数签名 函数说明
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的
(K,(V,W))的 RDD  
3.25 leftOuterJoin
函数签名 函数说明
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] 类似于 SQL 语句的左外连接
3.26 cogroup
函数签名 函数说明
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD

5.1.4.4 RDD 行动算子 (Action)

1. reduce
函数签名 函数说明
def reduce(f: (T, T) => T): T 聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
2. collect
函数签名 函数说明
def collect(): Array[T] 在驱动程序中,以数组 Array 的形式返回数据集的所有元素
3. count
函数签名 函数说明
def count(): Long 返回 RDD 中元素的个数
4. first
函数签名 函数说明
def first(): T 返回 RDD 中的第一个元素
   
5. take
函数签名 函数说明
def take(num: Int): Array[T] 返回一个由 RDD 的前 n 个元素组成的数组
   
6. takeOrdered
函数签名 函数说明
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] 返回该 RDD 排序后的前 n 个元素组成的数组
   
7. aggregate
函数签名 函数说明
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U 分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
8. fold
函数签名 函数说明
def fold(zeroValue: T)(op: (T, T) => T): T 统计每种 key 的个数
9. countByKey
函数签名 函数说明
def countByKey(): Map[K, Long] 折叠操作,aggregate 的简化版操作
10. save相关的算子
函数签名 函数说明
def saveAsTextFile(path: String): Unit  
def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit 将数据保存到不同格式的文件中
def saveAsObjectFile(path: String): Unit  
def saveAsSequenceFile(path: String,codec: Option[Class[_ <: CompressionCodec]] = None): Unit  
11. foreach
函数签名 函数说明
   

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgcjghk
系列文章
更多 icon
同类精品
更多 icon
继续加载