• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

spark和hive,mysql交互

武飞扬头像
Eternal_Date
帮助1

spark读取hive表的数据处理后存到mysql

●agg返回DF类型 括号里接收的是列 所以可以在括号中给列起别名
○直接写count返回的是df 无法给列起别名
●join 所要查询的数据放在leftjoin左边
●注意方法的返回值 确定返回类型是df还是其他类型
●当遇到联查列重复时,对应的df(列名)
●join的写法
○df1.join(df2,Seq(列名),"left")
○rdf1.join(df2,df1(列名)===df2(列名),"left")

  1.  
    package expandword
  2.  
     
  3.  
    import org.apache.spark.sql.SparkSession
  4.  
    import org.apache.spark.sql.expressions.Window
  5.  
    import org.apache.spark.sql.functions.{col, date_format, desc, rank, sum}
  6.  
     
  7.  
    object four {
  8.  
    def main(args: Array[String]): Unit = {
  9.  
    val spark = SparkSession
  10.  
    .builder()
  11.  
    .appName("rigion")
  12.  
    .enableHiveSupport()
  13.  
    .getOrCreate()
  14.  
     
  15.  
    //使用hive的ods库
  16.  
    spark.sql("use shenyunhang")
  17.  
     
  18.  
    val ord=spark.table("orders")
  19.  
    val cus=spark.table("customer")
  20.  
    val nat=spark.table("nation")
  21.  
    val reg=spark.table("region ")
  22.  
     
  23.  
    val rs=ord
  24.  
    .join(cus,Seq("custkey"),joinType = "left")
  25.  
    .join(nat,cus.col("nationkey")===nat.col("nationkey"))
  26.  
    .join(reg,nat.col("regionkey")===reg.col("regionkey"))
  27.  
    .select(
  28.  
    nat("name").as("nname"),
  29.  
    reg("name").as("rname"),
  30.  
    ord("totalprice"),
  31.  
    (date_format(ord("orderdate"),"yyyyMM")).as("times")
  32.  
    )
  33.  
    .groupBy(col("nname"),col("rname"),col("times"))
  34.  
    .agg(
  35.  
    sum("totalprice").as("sum")
  36.  
    )
  37.  
    .orderBy("sum")
  38.  
    .select(
  39.  
    col("nname"),
  40.  
    col("rname"),
  41.  
    col("sum"),
  42.  
    rank() over(Window.orderBy(desc("sum")))
  43.  
    )
  44.  
     
  45.  
    rs.show()
  46.  
     
  47.  
    //落地
  48.  
    rs.coalesce(1).write
  49.  
    .format("jdbc")
  50.  
    .mode(SaveMode.Overwrite)
  51.  
    .option("driver","com.mysql.cj.jdbc.Driver")
  52.  
    .option("url","jdbc:mysql://59.111.104.241:9999/job023_group4")
  53.  
    .option("dbtable","ORDERS")
  54.  
    .option("user","job023_group4")
  55.  
    .option("password","job023@TL")
  56.  
    .save()
  57.  
    }
  58.  
    }
学新通

插入mysql的增量数据到hive、动态分区

需要注意的是在我们从MySQl拿到数据动态分区插入到Hive中时,是需要配置的。
●开启动态分区参数设置(还有其他配置,这里用这两个就可以)
○hive.exec.dynamic.partition=true
■开启动态分区功能(默认 true ,开启)
○hive.exec.dynamic.partition.mode=nonstrict
■设置为非严格模式(动态分区的模式,默认 strict ,表示必须指定至少一个分区为静态分区, nonstrict 模式表示允许所有的分区字段都可以使用动态分区。)
●在load完后,使用.where可以对加载出来的数据进行筛选
●在这里因为动态分区有格式要求,所以用hive中自带的date_format()方法,进行格式转换
●增量的概念就是每次插入的时候,插进去的数据hive中原来没有的数据,而不是overrite全部重新加载到里边,在这里用到的是在mysql中查到的数据创建一个视图,然后再hive中sql对这个视图和hive中数据的表进行left join查询,最后只取null的数据

  1.  
    package expandword
  2.  
     
  3.  
    import org.apache.spark.sql.{SaveMode, SparkSession}
  4.  
     
  5.  
    object MysqlToHive2 {
  6.  
    def main(args: Array[String]): Unit = {
  7.  
     
  8.  
    val sparkSession = SparkSession
  9.  
    .builder()
  10.  
    .appName("reda mysql to hive limit")
  11.  
    // .master("local[*]")
  12.  
    .enableHiveSupport()
  13.  
    .config("hive.exec.dynamic.partition","true")
  14.  
    .config("hive.exec.dynamic.partition.mode","nonstrict")
  15.  
    .getOrCreate()
  16.  
     
  17.  
    //加载mysql的数据
  18.  
    val df = sparkSession.read
  19.  
    .format("jdbc")
  20.  
    .option("driver","com.mysql.cj.jdbc.Driver")
  21.  
    .option("url","jdbc:mysql://59.111.104.241:9999/job023_group4")
  22.  
    .option("dbtable","ORDERS")
  23.  
    .option("user","job023_group4")
  24.  
    .option("password","job023@TL")
  25.  
    .load()
  26.  
    .where( "orderdate>='1997-12-1'")
  27.  
    .createOrReplaceTempView("mysql_orders")
  28.  
     
  29.  
    //使用hive中的数据
  30.  
    sparkSession.sql("use shenyunhang")
  31.  
     
  32.  
    sparkSession.sql(
  33.  
    """
  34.  
    |insert into table orders1 partition(times)
  35.  
    |select t1.*,date_format(t1.orderdate,'yyyyMMdd') times
  36.  
    |from
  37.  
    | mysql_orders t1
  38.  
    |left join
  39.  
    | orders1 t2
  40.  
    |on
  41.  
    | t1.orderkey = t2.orderkey
  42.  
    |where t2.orderkey is null
  43.  
    |""".stripMargin)
  44.  
    }
  45.  
    }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggjheg
系列文章
更多 icon
同类精品
更多 icon
继续加载