• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Spark优化篇数据倾斜解决

武飞扬头像
lucklilili
帮助1

数据倾斜是指我们在并行进行数据处理的时候,由于数据散列引起Spark的单个Partition的分布不均,导致大量的数据集中分布到一台或者几台计算节点上,导致处理速度远低于平均计算速度,从而拖延导致整个计算过程过慢,影响整个计算性能。

数据倾斜带来的问题

单个或者多个Task长尾执行,拖延整个任务运行时间,导致整体耗时过大。单个Task处理数据过多,很容易导致OOM。

数据倾斜的产生原因

数据倾斜一般是发生在 shuffle 类的算子、SQL函数导致,具体如以下:

类型 RDD SQL
去重 distinct distinct
聚合 groupByKey、reduceByKey、aggregateByKey group by
关联 join、left join、right join join、left join、right join

通过Spark web ui event timeline观察明显长尾任务:

学新通

学新通

数据倾斜大Key定位

RDD进行抽取:

  1.  
    val cscTopKey: Array[(Int, Row)] = sampleSKew(sparkSession,"default.tab_spark","id")
  2.  
    println(cscTopKey.mkString("\n"))
  3.  
     
  4.  
    def sampleSKew( sparkSession: SparkSession, tableName: String, keyColumn: String ): Array[(Int, Row)] = {
  5.  
    val df: DataFrame = sparkSession.sql("select " keyColumn " from " tableName)
  6.  
    val top10Key: Array[(Int, Row)] = df
  7.  
    .select(keyColumn).sample(withReplacement = false, 0.1).rdd
  8.  
    .map(k => (k, 1)).reduceByKey(_ _)
  9.  
    .map(k => (k._2, k._1)).sortByKey(ascending = false)
  10.  
    .take(10)
  11.  
    top10Key
  12.  
    }

SQL进行抽取:

  1.  
    SELECT
  2.  
    id,conut(1) as cn
  3.  
    FROM
  4.  
    default.tab_spark_test_3
  5.  
    GROUP BY id
  6.  
    ORDER BY cn DESC
  7.  
    LIMIT 100;
  1.  
    100000,2000012
  2.  
    100001,1600012
  3.  
    100002,1

单表数据倾斜优化

为了减少 shuffle 数据量以及 reduce 端的压力,通常 Spark SQL 在 map 端会做一个partial aggregate(通常叫做预聚合或者偏聚合),即在 shuffle 前将同一分区内所属同 key 的记录先进行一个预结算,再将结果进行 shuffle,发送到 reduce 端做一个汇总,类似 MR 的提前Combiner,所以执行计划中 HashAggregate 通常成对出现。 但是这种也会出现问题,如果key重复的量级特别大,Combiner也是解决不了本质问题。

解决方案:

Add Salt局部聚合 2、Remove Salt全局聚合

  1.  
    sparkSession.udf.register("random_prefix", ( value: Int, num: Int ) => randomPrefixUDF(value, num))
  2.  
    sparkSession.udf.register("remove_random_prefix", ( value: String ) => removeRandomPrefixUDF(value))
  3.  
     
  4.  
    //t1 增加前缀,t2按照加盐的key进行聚,t3去除加盐,聚合
  5.  
    val sql =
  6.  
    """
  7.  
    |select
  8.  
    | id,
  9.  
    | sum(sell) totalSell
  10.  
    |from
  11.  
    | (
  12.  
    | select
  13.  
    | remove_random_prefix(random_id) id,
  14.  
    | sell
  15.  
    | from
  16.  
    | (
  17.  
    | select
  18.  
    | random_id,
  19.  
    | sum(pic) sell
  20.  
    | from
  21.  
    | (
  22.  
    | select
  23.  
    | random_prefix(id, 6) random_id,
  24.  
    | pic
  25.  
    | from
  26.  
    | default.tab_spark_test_3
  27.  
    | ) t1
  28.  
    | group by random_id
  29.  
    | ) t2
  30.  
    | ) t3
  31.  
    |group by
  32.  
    | id
  33.  
    """.stripMargin
  34.  
     
  35.  
    def randomPrefixUDF( value: Int, num: Int ): String = {
  36.  
    new Random().nextInt(num).toString "_" value
  37.  
    }
  38.  
     
  39.  
    def removeRandomPrefixUDF( value: String ): String = {
  40.  
    value.toString.split("_")(1)
  41.  
    }
学新通

表关联数据倾斜优化

1、适用场景

适用于 join 时出现数据倾斜。

2、解决逻辑

1、将存在倾斜的表,根据抽样结果,拆分为倾斜 key(skew 表)和没有倾斜 key(common)的两个数据集;

2、将 skew 表的 key 全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集(old 表)整体与随机前缀集作笛卡尔乘积(即将数据量扩大 N 倍,得到 new 表)。

3、打散的 skew 表 join 扩容的 new 表

union common 表 join old 表

以下为打散大 key 和扩容小表的实现思路

1、打散大表:实际就是数据一进一出进行处理,对大 key 前拼上随机前缀实现打散;

2、扩容小表:实际就是将 DataFrame 中每一条数据,转成一个集合,并往这个集合里循环添加 10 条数据,最后使用 flatmap 压平此集合,达到扩容的效果。

  1.  
    /**
  2.  
    * 打散大表 扩容小表 解决数据倾斜
  3.  
    *
  4.  
    * @param sparkSession
  5.  
    */
  6.  
    def scatterBigAndExpansionSmall(sparkSession: SparkSession): Unit = {
  7.  
    import sparkSession.implicits._
  8.  
    val saleCourse = sparkSession.sql("select *from sparktuning.sale_course")
  9.  
    val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
  10.  
    .withColumnRenamed("discount", "pay_discount")
  11.  
    .withColumnRenamed("createtime", "pay_createtime")
  12.  
    val courseShoppingCart = sparkSession.sql("select * from sparktuning.course_shopping_cart")
  13.  
    .withColumnRenamed("discount", "cart_discount")
  14.  
    .withColumnRenamed("createtime", "cart_createtime")
  15.  
     
  16.  
    // TODO 1、拆分 倾斜的key
  17.  
    val commonCourseShoppingCart: Dataset[Row] = courseShoppingCart.filter(item => item.getAs[Long]("courseid") != 101 && item.getAs[Long]("courseid") != 103)
  18.  
    val skewCourseShoppingCart: Dataset[Row] = courseShoppingCart.filter(item => item.getAs[Long]("courseid") == 101 || item.getAs[Long]("courseid") == 103)
  19.  
     
  20.  
    //TODO 2、将倾斜的key打散 打散36
  21.  
    val newCourseShoppingCart = skewCourseShoppingCart.mapPartitions((partitions: Iterator[Row]) => {
  22.  
    partitions.map(item => {
  23.  
    val courseid = item.getAs[Long]("courseid")
  24.  
    val randInt = Random.nextInt(36)
  25.  
    CourseShoppingCart(courseid, item.getAs[String]("orderid"),
  26.  
    item.getAs[String]("coursename"), item.getAs[String]("cart_discount"),
  27.  
    item.getAs[String]("sellmoney"), item.getAs[String]("cart_createtime"),
  28.  
    item.getAs[String]("dt"), item.getAs[String]("dn"), randInt "_" courseid)
  29.  
    })
  30.  
    })
  31.  
    //TODO 3、小表进行扩容 扩大36
  32.  
    val newSaleCourse = saleCourse.flatMap(item => {
  33.  
    val list = new ArrayBuffer[SaleCourse]()
  34.  
    val courseid = item.getAs[Long]("courseid")
  35.  
    val coursename = item.getAs[String]("coursename")
  36.  
    val status = item.getAs[String]("status")
  37.  
    val pointlistid = item.getAs[Long]("pointlistid")
  38.  
    val majorid = item.getAs[Long]("majorid")
  39.  
    val chapterid = item.getAs[Long]("chapterid")
  40.  
    val chaptername = item.getAs[String]("chaptername")
  41.  
    val edusubjectid = item.getAs[Long]("edusubjectid")
  42.  
    val edusubjectname = item.getAs[String]("edusubjectname")
  43.  
    val teacherid = item.getAs[Long]("teacherid")
  44.  
    val teachername = item.getAs[String]("teachername")
  45.  
    val coursemanager = item.getAs[String]("coursemanager")
  46.  
    val money = item.getAs[String]("money")
  47.  
    val dt = item.getAs[String]("dt")
  48.  
    val dn = item.getAs[String]("dn")
  49.  
    for (i <- 0 until 36) {
  50.  
    list.append(SaleCourse(courseid, coursename, status, pointlistid, majorid, chapterid, chaptername, edusubjectid,
  51.  
    edusubjectname, teacherid, teachername, coursemanager, money, dt, dn, i "_" courseid))
  52.  
    }
  53.  
    list
  54.  
    })
  55.  
     
  56.  
    // TODO 4、倾斜的大key 与 扩容后的表 进行join
  57.  
    val df1: DataFrame = newSaleCourse
  58.  
    .join(newCourseShoppingCart.drop("courseid").drop("coursename"), Seq("rand_courseid", "dt", "dn"), "right")
  59.  
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
  60.  
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
  61.  
    , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
  62.  
    "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
  63.  
     
  64.  
     
  65.  
    // TODO 5、没有倾斜大key的部分 与 原来的表 进行join
  66.  
    val df2: DataFrame = saleCourse
  67.  
    .join(commonCourseShoppingCart.drop("coursename"), Seq("courseid", "dt", "dn"), "right")
  68.  
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
  69.  
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
  70.  
    , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
  71.  
    "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
  72.  
     
  73.  
    // TODO 6、将 倾斜key join后的结果 与 普通key join后的结果,uinon起来
  74.  
    df1
  75.  
    .union(df2)
  76.  
    .write.mode(SaveMode.Overwrite).insertInto("sparktuning.salecourse_detail")
  77.  
    }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggjjfj
系列文章
更多 icon
同类精品
更多 icon
继续加载