spark进阶五DataFrame和DataSet使用
spark进阶(五):DataFrame和DataSet使用
DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。此外,多种数据都可以转化为DataFrame,例如Spark计算过程中生成的RDD、结构化数据文件、Hive中的表、外部数据库等。
在Spark中,一个DataFrame所代表的是一个元素类型为Row的Dataset,即DataFrame只是Dataset[Row]的一个类型别名。相对于RDD,Dataset提供了强类型支持,在RDD的每行数据加了类型约束。而且使用DatasetAPI同样会经过Spark SQL优化器的优化,从而提高程序执行效率。
DataFrame和R的数据结构以及python pandas DataFrame的数据结构和操作基本一致。
一、创建DataFrame、DataSet
- 创建RDD
- RDD转化为ROW
- 通过ROW和元数据信息生成DataFrame
- 然后通过DataFrame和对应的类转化为DataSet
- 也就是说DataFrame是DataSet[Row],这里可以通过指定的类将其转化,DataSet[User]
- 需要注意的事转化使用的类需要时内部类,然后就是类里的变量名要和元数据信息的列名保持对齐
/**
* @author: ffzs
* @Date: 2021/10/7 上午8:33
*/
object MovieLenDataSet {
case class User(UserID:String, Gender:String, Age:String, Occupation:String, Zip_Code:String)
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession.builder()
.appName("MovieLenDataSet")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val dataPath = "/home/ffzs/data/ml-1m"
val schema4users = StructType(
"UserID::Gender::Age::Occupation::Zip_code"
.split("::")
.map(it => StructField(it, StringType, nullable = true))
)
val usersRdd = spark.sparkContext.textFile(f"$dataPath/users.dat")
val usersRows = usersRdd.map(_.split("::"))
.map(it => {
it.map(_.trim)
})
.map(it => Row(it(0), it(1), it(2), it(3), it(4)))
val usersDF: DataFrame = spark.createDataFrame(usersRows, schema4users)
val usersDataSet = usersDF.as[User]
usersDataSet.show(5)
}
}
二、DataSet使用练习
1.最常见电影类型
- 对电影类型进行split,然后再聚合计数
- 然后再通过计数进行排序
println("最常见电影类型:")
moviesDataSet.select("Genres")
.flatMap(_(0).toString.split("\\|"))
.map(genre => (genre, 1))
.groupBy("_1")
.sum()
.withColumnRenamed("_1", "genre")
.withColumnRenamed("sum(_2)", "sum")
.orderBy($"sum".desc)
.show(5)
最多的居然是戏剧有点意外。
2.最常见的电影类型中最受欢迎的电影
- 首先筛选出 有戏剧(Drama)类型的电影
- 然后通过MovieID获取这些电影的观看的用户
- 然后通过通过电影聚合获取电影的平均评分和评论数
- 筛选出评论数大于10的电影
- 最后排序输出
println("最常见的电影类型中最受欢迎的电影(观看人数大于10):")
val mostMovieGenre = "Drama"
moviesDataSet.filter(it => it.Genres.split("\\|").toSet.contains(mostMovieGenre))
.join(ratingsDataSet.select("MovieID", "Rating"), usingColumn = "MovieID")
.groupBy("MovieID", "Title")
.agg("Rating"->"avg", "Rating"->"count")
.filter($"count(Rating)">10)
.orderBy($"avg(Rating)".desc, $"count(Rating)".desc)
.show(3)
口碑最好的是:七武士
3.获取评论人数最多的电影
- 直接通过MovieID聚合求出每个电影的评论数
- 然后再对数量进行排序
println("获取评论数量最多电影:")
ratingsDataSet.groupBy("MovieID")
.count()
.orderBy($"count".desc)
.show(2)
4.最多评论电影不同年龄段男女观看情况
println("最多评论电影不同年龄段男女观看情况")
ratingsDataSet.filter(_.MovieID.equals(mostReviewMovieID)).select("UserID")
.join(usersDataSet.select("UserID", "Age", "Gender"), "UserID")
.groupBy("Age")
.pivot("Gender")
.count()
.orderBy($"Age")
.show()
5.通过SQL进行操作
- 通过将DataSet生成视图
- 然后通过视图名进行SQL操作
ratingsDataSet.createTempView("rating")
spark.sql("select MovieID, count(1) cnt from rating group by MovieID limit 1").show()
------- --------
|MovieID|count(1)|
------- --------
| 2294| 645|
------- --------
6.存储
- 将DataSet进行存储
- mode有四种:
- overwrite: 替换
- Append:在之后进行追加
- ErrorIfExists:如果存在则报错,默认方式
- Ignore:存在的话不进行操作
println("将DataSet数据存储打到HDFS中")
ratingsDataSet.write
.mode(SaveMode.Overwrite)
.parquet("hdfs://localhost:9000/movieLen/rating")
写入成功:
完整代码:
/**
* @author: ffzs
* @Date: 2021/10/7 上午8:33
*/
object MovieLenDataSet {
case class User(UserID:String, Gender:String, Age:String, Occupation:String, Zip_Code:String)
case class Rating(UserID:String, MovieID:String, Rating:Double, Timestamp: String)
case class Movie(MovieID:String, Title:String, Genres:String)
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession.builder()
.appName("MovieLenDataSet")
.master("local[*]")
// .master("spark://localhost:7077")
.getOrCreate()
import spark.implicits._
val dataPath = "/home/ffzs/data/ml-1m"
// 构造user的DataSet 通过Row转化为DataFrame 然后再转化为DataSet
val schema4users = StructType(
"UserID::Gender::Age::Occupation::Zip_code"
.split("::")
.map(it => StructField(it, StringType, nullable = true))
)
val usersRdd = spark.sparkContext.textFile(f"$dataPath/users.dat")
val usersRows = usersRdd.map(_.split("::"))
.map(it => {it.map(_.trim)})
.map(it => Row(it(0), it(1), it(2), it(3), it(4)))
val usersDF: DataFrame = spark.createDataFrame(usersRows, schema4users)
val usersDataSet = usersDF.as[User].cache()
// 构造movie的DataSet 直接读取文件生成DataFrame 然后转化为DataSet
val moviesRows = spark.read.textFile(f"$dataPath/movies.dat")
val moviesDataSet = moviesRows.map(row => {
val values = row.split("::").map(_.trim)
Movie(values(0), values(1), values(2))
}).cache()
// 构造rating的DataSet
val ratingsRows = spark.read.textFile(f"$dataPath/ratings.dat")
val ratingsDataSet = ratingsRows.map(row => {
val values = row.split("::").map(_.trim)
Rating(values(0), values(1), values(2).toDouble, values(3))
}).cache()
println("最常见的电影类型中最受欢迎的电影(观看人数大于10):")
val mostMovieGenre = "Drama"
moviesDataSet.filter(it => it.Genres.split("\\|").toSet.contains(mostMovieGenre))
.join(ratingsDataSet.select("MovieID", "Rating"), usingColumn = "MovieID")
.groupBy("MovieID", "Title")
.agg("Rating"->"avg", "Rating"->"count")
.filter($"count(Rating)">10)
.orderBy($"avg(Rating)".desc, $"count(Rating)".desc)
.show(3)
println("获取评论数量最多电影ID:")
val mostReviewMovieID = ratingsDataSet.groupBy("MovieID")
.count()
.orderBy($"count".desc)
.first()(0)
println("最多评论电影不同年龄段男女观看情况")
ratingsDataSet.filter(_.MovieID.equals(mostReviewMovieID)).select("UserID")
.join(usersDataSet.select("UserID", "Age", "Gender"), "UserID")
.groupBy("Age")
.pivot("Gender")
.count()
.orderBy($"Age")
.show()
println("通过SQL语句对dataset数据进行操作")
ratingsDataSet.createTempView("rating")
spark.sql("select MovieID, count(1) cnt from rating group by MovieID limit 1").show()
println("将DataSet数据存储打到HDFS中")
ratingsDataSet.write
.mode(SaveMode.Overwrite)
.parquet("hdfs://localhost:9000/movieLen/rating")
}
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgkhkkg
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13