• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

SPAK Value RDD :常见操作

武飞扬头像
YIFZZZ
帮助1

两种transformations

窄依赖

RDD 数据处理过程中会用到计算和一些方法,在分布式环境中,窄依赖得到的数据不会换到其他的Partition。[functions and these compute data that live on a single partition meaning there will not be any data movement between partitions]
常用的方法有: map(), mapPartition(), flatMap(), filter(), union().

宽依赖

RDD 数据处理中,data 在很多 partition 上,在RDD 转换过程中,会发生 data 转移 partition的情况 [data that live on many partitions meaning there will be data movements between partitions to execute wider transformations], 这个过程中会发生 Shuffer, 会影响数据处理速度,需要按照业务需求来判定使用宽窄依赖。

Transforms

map , flatMap, mapPartitions 窄依赖常用函数

给定映射函数 f

  val conf = new SparkConf().setMaster("local").setAppName("Test")
  val sc = new SparkContext(conf)
  val rdd1 = sc.parallelize(1 to 100)
  val rdd2 = sc.range(1, 100, 1, 10)   //这里会每10个进行一个分区	
  • map: map(f) 以元素为粒度对 RDD 做数据转换 【元素】=> 【元素】,
    • RDD.map( f)
    	rdd1.map(_   101).collect.foreach(f => println(f))
    
  • mapPartitions : 以数据分区为粒度,使用映射函数 f 对 RDD 进行数据转换 【元素】=> 【元素】,
    • f 作用的对象是不同的区(block), 因此 f的类型必须是 Iterator < T > => Iterator < U >
    	   rdd2.mapPartitions( f => {
    	      for (i <- f)
    	        yield i   202
    	    }).foreach(f => println(f)) 
    	    // 因为需要	Iterator   可以直接写成
            rdd2.mapPartitions{iter => iter.map(_   202) }.collect.foreach(f => println(f))
            //  如果用到字符串操作,使用 s"$",如下
            rdd1.mapPartitionsWithIndex{(idx, iter) => 
            Iterator(s"$idx:${iter.toArray.mkString("-")}")
             }.collect
    
  • flatMap: 【元素】=> 【集合】
    • f 函数应该返回一个 Seq 而不是单个元素
    	rdd2.flatMap( f = l => {
    					Seq(l   101) 
    					}).collect.foreach(f => println(f))
    
  • union(otherRDD)
  • cartesian(otherRDD):笛卡尔积

map 与 mapPartitions 的区别

  • map:每次处理一条数据
  • mapPartitions:每次处理一个分区的数据,分区的数据处理完成后,数据才能释放,资源不足时容易导致OOM
  • 最佳实践:当内存资源充足时,建议使用mapPartitions,以提高处理效率

宽依赖常用

  • sample(withReplacement, fraction, seed):采样算子。以指定的随机种子(seed)随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样, 不管什么方式抽样,都不影响数组的元素变化
 val sample = rdd1.sample(false, 0.2)
  • groupBy(func):按照传入函数的返回值进行分组
 groupEven = rdd1.groupBy(_ % 2 == 0)
  • distinct([numTasks])):对RDD元素去重后,返回一个新的RDD。可传入numTasks参数改变RDD分区数
  • coalesce(numPartitions):缩减分区数,无shuffle
  • repartition(numPartitions):增加或减少分区数,有shuffle
val distRDD = sc.parallelize(Array(1,1,1,1,1,1,2,2,2,2,2,2,2,2,3,3,34)).distinct
val rdd4 = rdd2.coalesce(5)
// rdd2.getNumPartitions = 10
// rdd4.getNumPartitions =  5
val rdd5 = rdd2.repartition(5)
// rdd4.getNumPartitions =  15
  • sortBy(func, [ascending], [numTasks]):使用 func 对数据进行处理,对处理后的结果进行排序
 sortRDD= rdd1.sortBy(x=>x)
  • rdd1.intersection(rdd2) rdd1 与 rdd2 的交集
  • rdd1.subtract(rdd2) rdd1有 rdd2 没有的
    学新通

Action比较简单,

详见 https://spark.apache.org/docs/2.4.5/rdd-programming-guide.html#actions

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgkhggj
系列文章
更多 icon
同类精品
更多 icon
继续加载