spark和hive,mysql交互
spark读取hive表的数据处理后存到mysql
●agg返回DF类型 括号里接收的是列 所以可以在括号中给列起别名
○直接写count返回的是df 无法给列起别名
●join 所要查询的数据放在leftjoin左边
●注意方法的返回值 确定返回类型是df还是其他类型
●当遇到联查列重复时,对应的df(列名)
●join的写法
○df1.join(df2,Seq(列名),"left")
○rdf1.join(df2,df1(列名)===df2(列名),"left")
-
package expandword
-
-
import org.apache.spark.sql.SparkSession
-
import org.apache.spark.sql.expressions.Window
-
import org.apache.spark.sql.functions.{col, date_format, desc, rank, sum}
-
-
object four {
-
def main(args: Array[String]): Unit = {
-
val spark = SparkSession
-
.builder()
-
.appName("rigion")
-
.enableHiveSupport()
-
.getOrCreate()
-
-
//使用hive的ods库
-
spark.sql("use shenyunhang")
-
-
val ord=spark.table("orders")
-
val cus=spark.table("customer")
-
val nat=spark.table("nation")
-
val reg=spark.table("region ")
-
-
val rs=ord
-
.join(cus,Seq("custkey"),joinType = "left")
-
.join(nat,cus.col("nationkey")===nat.col("nationkey"))
-
.join(reg,nat.col("regionkey")===reg.col("regionkey"))
-
.select(
-
nat("name").as("nname"),
-
reg("name").as("rname"),
-
ord("totalprice"),
-
(date_format(ord("orderdate"),"yyyyMM")).as("times")
-
)
-
.groupBy(col("nname"),col("rname"),col("times"))
-
.agg(
-
sum("totalprice").as("sum")
-
)
-
.orderBy("sum")
-
.select(
-
col("nname"),
-
col("rname"),
-
col("sum"),
-
rank() over(Window.orderBy(desc("sum")))
-
)
-
-
rs.show()
-
-
//落地
-
rs.coalesce(1).write
-
.format("jdbc")
-
.mode(SaveMode.Overwrite)
-
.option("driver","com.mysql.cj.jdbc.Driver")
-
.option("url","jdbc:mysql://59.111.104.241:9999/job023_group4")
-
.option("dbtable","ORDERS")
-
.option("user","job023_group4")
-
.option("password","job023@TL")
-
.save()
-
}
-
}
插入mysql的增量数据到hive、动态分区
需要注意的是在我们从MySQl拿到数据动态分区插入到Hive中时,是需要配置的。
●开启动态分区参数设置(还有其他配置,这里用这两个就可以)
○hive.exec.dynamic.partition=true
■开启动态分区功能(默认 true ,开启)
○hive.exec.dynamic.partition.mode=nonstrict
■设置为非严格模式(动态分区的模式,默认 strict ,表示必须指定至少一个分区为静态分区, nonstrict 模式表示允许所有的分区字段都可以使用动态分区。)
●在load完后,使用.where可以对加载出来的数据进行筛选
●在这里因为动态分区有格式要求,所以用hive中自带的date_format()方法,进行格式转换
●增量的概念就是每次插入的时候,插进去的数据hive中原来没有的数据,而不是overrite全部重新加载到里边,在这里用到的是在mysql中查到的数据创建一个视图,然后再hive中sql对这个视图和hive中数据的表进行left join查询,最后只取null的数据
-
package expandword
-
-
import org.apache.spark.sql.{SaveMode, SparkSession}
-
-
object MysqlToHive2 {
-
def main(args: Array[String]): Unit = {
-
-
val sparkSession = SparkSession
-
.builder()
-
.appName("reda mysql to hive limit")
-
// .master("local[*]")
-
.enableHiveSupport()
-
.config("hive.exec.dynamic.partition","true")
-
.config("hive.exec.dynamic.partition.mode","nonstrict")
-
.getOrCreate()
-
-
//加载mysql的数据
-
val df = sparkSession.read
-
.format("jdbc")
-
.option("driver","com.mysql.cj.jdbc.Driver")
-
.option("url","jdbc:mysql://59.111.104.241:9999/job023_group4")
-
.option("dbtable","ORDERS")
-
.option("user","job023_group4")
-
.option("password","job023@TL")
-
.load()
-
.where( "orderdate>='1997-12-1'")
-
.createOrReplaceTempView("mysql_orders")
-
-
//使用hive中的数据
-
sparkSession.sql("use shenyunhang")
-
-
sparkSession.sql(
-
"""
-
|insert into table orders1 partition(times)
-
|select t1.*,date_format(t1.orderdate,'yyyyMMdd') times
-
|from
-
| mysql_orders t1
-
|left join
-
| orders1 t2
-
|on
-
| t1.orderkey = t2.orderkey
-
|where t2.orderkey is null
-
|""".stripMargin)
-
}
-
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhggjheg
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13