• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

spark进阶五DataFrame和DataSet使用

武飞扬头像
泛泛之素
帮助1

spark进阶(五):DataFrame和DataSet使用

DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。此外,多种数据都可以转化为DataFrame,例如Spark计算过程中生成的RDD、结构化数据文件、Hive中的表、外部数据库等。

在Spark中,一个DataFrame所代表的是一个元素类型为Row的Dataset,即DataFrame只是Dataset[Row]的一个类型别名。相对于RDD,Dataset提供了强类型支持,在RDD的每行数据加了类型约束。而且使用DatasetAPI同样会经过Spark SQL优化器的优化,从而提高程序执行效率。

DataFrame和R的数据结构以及python pandas DataFrame的数据结构和操作基本一致。

一、创建DataFrame、DataSet

  • 创建RDD
  • RDD转化为ROW
  • 通过ROW和元数据信息生成DataFrame
  • 然后通过DataFrame和对应的类转化为DataSet
  • 也就是说DataFrame是DataSet[Row],这里可以通过指定的类将其转化,DataSet[User]
  • 需要注意的事转化使用的类需要时内部类,然后就是类里的变量名要和元数据信息的列名保持对齐
/**
 * @author: ffzs
 * @Date: 2021/10/7 上午8:33
 */
object MovieLenDataSet {
  case class User(UserID:String, Gender:String, Age:String, Occupation:String, Zip_Code:String)
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder()
      .appName("MovieLenDataSet")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val dataPath = "/home/ffzs/data/ml-1m"
    val schema4users = StructType(
      "UserID::Gender::Age::Occupation::Zip_code"
        .split("::")
        .map(it => StructField(it, StringType, nullable = true))
    )

    val usersRdd = spark.sparkContext.textFile(f"$dataPath/users.dat")
    val usersRows = usersRdd.map(_.split("::"))
      .map(it => {
        it.map(_.trim)
      })
      .map(it => Row(it(0), it(1), it(2), it(3), it(4)))
    val usersDF: DataFrame = spark.createDataFrame(usersRows, schema4users)
    val usersDataSet = usersDF.as[User]
    usersDataSet.show(5)
  }
}
学新通

二、DataSet使用练习

1.最常见电影类型
  • 对电影类型进行split,然后再聚合计数
  • 然后再通过计数进行排序
println("最常见电影类型:")
moviesDataSet.select("Genres")
  .flatMap(_(0).toString.split("\\|"))
  .map(genre => (genre, 1))
  .groupBy("_1")
  .sum()
  .withColumnRenamed("_1", "genre")
  .withColumnRenamed("sum(_2)", "sum")
  .orderBy($"sum".desc)
  .show(5)

学新通

最多的居然是戏剧有点意外。

2.最常见的电影类型中最受欢迎的电影
  • 首先筛选出 有戏剧(Drama)类型的电影
  • 然后通过MovieID获取这些电影的观看的用户
  • 然后通过通过电影聚合获取电影的平均评分和评论数
  • 筛选出评论数大于10的电影
  • 最后排序输出
println("最常见的电影类型中最受欢迎的电影(观看人数大于10):")
val mostMovieGenre = "Drama"
moviesDataSet.filter(it => it.Genres.split("\\|").toSet.contains(mostMovieGenre))
  .join(ratingsDataSet.select("MovieID", "Rating"), usingColumn = "MovieID")
  .groupBy("MovieID", "Title")
  .agg("Rating"->"avg", "Rating"->"count")
  .filter($"count(Rating)">10)
  .orderBy($"avg(Rating)".desc, $"count(Rating)".desc)
  .show(3)

学新通

口碑最好的是:七武士

3.获取评论人数最多的电影
  • 直接通过MovieID聚合求出每个电影的评论数
  • 然后再对数量进行排序
println("获取评论数量最多电影:")
ratingsDataSet.groupBy("MovieID")
  .count()
  .orderBy($"count".desc)
  .show(2)

学新通

4.最多评论电影不同年龄段男女观看情况
    println("最多评论电影不同年龄段男女观看情况")
    ratingsDataSet.filter(_.MovieID.equals(mostReviewMovieID)).select("UserID")
      .join(usersDataSet.select("UserID", "Age", "Gender"), "UserID")
      .groupBy("Age")
      .pivot("Gender")
      .count()
      .orderBy($"Age")
      .show()

学新通

5.通过SQL进行操作
  • 通过将DataSet生成视图
  • 然后通过视图名进行SQL操作
ratingsDataSet.createTempView("rating")
spark.sql("select MovieID, count(1) cnt from rating group by MovieID limit 1").show()

 ------- -------- 
|MovieID|count(1)|
 ------- -------- 
|   2294|     645|
 ------- -------- 
6.存储
  • 将DataSet进行存储
  • mode有四种:
    • overwrite: 替换
    • Append:在之后进行追加
    • ErrorIfExists:如果存在则报错,默认方式
    • Ignore:存在的话不进行操作
    println("将DataSet数据存储打到HDFS中")
    ratingsDataSet.write
      .mode(SaveMode.Overwrite)
      .parquet("hdfs://localhost:9000/movieLen/rating")

写入成功:

学新通

完整代码:

/**
 * @author: ffzs
 * @Date: 2021/10/7 上午8:33
 */
object MovieLenDataSet {
  case class User(UserID:String, Gender:String, Age:String, Occupation:String, Zip_Code:String)
  case class Rating(UserID:String, MovieID:String, Rating:Double, Timestamp: String)
  case class Movie(MovieID:String, Title:String, Genres:String)
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder()
      .appName("MovieLenDataSet")
      .master("local[*]")
//      .master("spark://localhost:7077")
      .getOrCreate()
    import spark.implicits._

    val dataPath = "/home/ffzs/data/ml-1m"

    // 构造user的DataSet  通过Row转化为DataFrame 然后再转化为DataSet
    val schema4users = StructType(
      "UserID::Gender::Age::Occupation::Zip_code"
        .split("::")
        .map(it => StructField(it, StringType, nullable = true))
    )
    val usersRdd = spark.sparkContext.textFile(f"$dataPath/users.dat")
    val usersRows = usersRdd.map(_.split("::"))
      .map(it => {it.map(_.trim)})
      .map(it => Row(it(0), it(1), it(2), it(3), it(4)))
    val usersDF: DataFrame = spark.createDataFrame(usersRows, schema4users)
    val usersDataSet = usersDF.as[User].cache()

    // 构造movie的DataSet 直接读取文件生成DataFrame 然后转化为DataSet
    val moviesRows = spark.read.textFile(f"$dataPath/movies.dat")
    val moviesDataSet = moviesRows.map(row => {
      val values = row.split("::").map(_.trim)
      Movie(values(0), values(1), values(2))
    }).cache()

    // 构造rating的DataSet
    val ratingsRows = spark.read.textFile(f"$dataPath/ratings.dat")
    val ratingsDataSet = ratingsRows.map(row => {
      val values = row.split("::").map(_.trim)
      Rating(values(0), values(1), values(2).toDouble, values(3))
    }).cache()


    println("最常见的电影类型中最受欢迎的电影(观看人数大于10):")
    val mostMovieGenre = "Drama"
    moviesDataSet.filter(it => it.Genres.split("\\|").toSet.contains(mostMovieGenre))
      .join(ratingsDataSet.select("MovieID", "Rating"), usingColumn = "MovieID")
      .groupBy("MovieID", "Title")
      .agg("Rating"->"avg", "Rating"->"count")
      .filter($"count(Rating)">10)
      .orderBy($"avg(Rating)".desc, $"count(Rating)".desc)
      .show(3)

    println("获取评论数量最多电影ID:")
    val mostReviewMovieID = ratingsDataSet.groupBy("MovieID")
      .count()
      .orderBy($"count".desc)
      .first()(0)

    println("最多评论电影不同年龄段男女观看情况")
    ratingsDataSet.filter(_.MovieID.equals(mostReviewMovieID)).select("UserID")
      .join(usersDataSet.select("UserID", "Age", "Gender"), "UserID")
      .groupBy("Age")
      .pivot("Gender")
      .count()
      .orderBy($"Age")
      .show()

    println("通过SQL语句对dataset数据进行操作")
    ratingsDataSet.createTempView("rating")
    spark.sql("select MovieID, count(1) cnt from rating group by MovieID limit 1").show()

    println("将DataSet数据存储打到HDFS中")
    ratingsDataSet.write
      .mode(SaveMode.Overwrite)
      .parquet("hdfs://localhost:9000/movieLen/rating")
  }
}
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgkhkkg
系列文章
更多 icon
同类精品
更多 icon
继续加载