
Spark: Writing Results to MySQL


Create a Scala-Maven project

Import the pom dependencies (all the dependencies Spark may need):

<properties>
    <scala.version>2.12.10</scala.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.1.2</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.30</version>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.48</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>3.1.2</version>
    </dependency>

    <!-- Spark SQL dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.1.2</version>
    </dependency>

    <!-- To integrate Spark with Hive, the Hive support dependency must be added -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.12</artifactId>
      <version>3.1.2</version>
    </dependency>

    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>

  </dependencies>
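Note that the pom above only declares dependencies; to compile the Scala sources with Maven, a Scala compiler plugin (for example net.alchim31.maven:scala-maven-plugin) also has to be configured in the <build> section, which is not shown here.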

Writing from Spark to MySQL

Define a utility class:

package cn.kfc.dem01

import java.sql.{Connection, DriverManager, PreparedStatement}

import scala.io.{BufferedSource, Source}


object MyUtils {

  // Convert an IP string to its Long (decimal) representation
  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length){
      ipNum =  fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }


  // Binary search over the sorted (startIp, endIp, province) rules
  def binarySearch(lines: Array[(Long, Long, String)], ip: Long) : Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))
        return middle
      if (ip < lines(middle)._1)
        high = middle - 1
      else {
        low = middle + 1
      }
    }
    -1
  }

  // Method called (via foreachPartition) to write one partition of results into MySQL
  def dataMysql(it: Iterator[(String, Int)]): Unit = {
    val conn: Connection = DriverManager.getConnection("jdbc:mysql://192.168.6.160:3306/school?characterEncoding=UTF-8", "root", "12345678")
    val pstm: PreparedStatement = conn.prepareStatement("insert into addr values(?,?)")
    it.foreach(tp=>{
      pstm.setString(1,tp._1)
      pstm.setInt(2,tp._2)
      pstm.executeUpdate()
    })
    if(pstm != null){
      pstm.close()
    }
    if(conn != null){
      conn.close()
    }
  }
}
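The two helpers can be sanity-checked locally before they are wired into the Spark job. Below is a minimal sketch; the sample IP and the (startIp, endIp, province) ranges are made up purely for illustration:

package cn.kfc.dem01

object MyUtilsDemo {
  def main(args: Array[String]): Unit = {
    // "1.2.3.4" -> 1*256^3 + 2*256^2 + 3*256 + 4 = 16909060
    val ipNum = MyUtils.ip2Long("1.2.3.4")
    // Hypothetical, sorted (startIp, endIp, province) rules
    val rules = Array(
      (0L, 16909059L, "ProvinceA"),
      (16909060L, 33818119L, "ProvinceB")
    )
    val index = MyUtils.binarySearch(rules, ipNum)
    val province = if (index != -1) rules(index)._3 else "未知"
    println(s"$ipNum -> $province") // prints: 16909060 -> ProvinceB
  }
}

The main job below reads the IP rules from HDFS, broadcasts them to the Executors, maps every access-log line to a (province, 1) pair, aggregates the counts, and then writes the result to MySQL in three different ways: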
package cn.kfc.dem01

// Write the results into MySQL
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object IpLoaction3 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("IpLoaction2").setMaster("local[*]")

    val sc = new SparkContext(conf)

    val ipRulesRDD: RDD[(Long, Long, String)] = sc.textFile("hdfs://192.168.6.160:9820/user/root/point1", 4).map(line => {
      val splits = line.split("\\|")
      val startNum = splits(2).toLong
      val endNum = splits(3).toLong
      val province = splits(6)
      (startNum, endNum, province)
    })
    // Collect the ip rules, currently scattered across the Executors, back to the Driver
    val rulesInDriver: Array[(Long, Long, String)] = ipRulesRDD.collect()

    // Broadcast the Driver-side data to the Executors by calling broadcast on the SparkContext.
    // The returned reference still lives on the Driver; once broadcast, the data cannot be
    // changed, so this is typically used for immutable rules.
    val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rulesInDriver)

    val func = (line: String) => {
      val splits = line.split("\\|")
      val ip = splits(1)
      // Convert the IP to its decimal (Long) form
      val ipNum = MyUtils.ip2Long(ip)
      // Use the broadcast variable's reference to get the broadcast rules on the Executor.
      // How does the Driver-side reference reach the Executor? Tasks are generated on the
      // Driver, and the broadcast reference is shipped to the Executors along with each Task.
      val rulesInExecutor: Array[(Long, Long, String)] = broadcastRef.value
      // Binary search the rules for this IP
      var province = "未知"
      val index = MyUtils.binarySearch(rulesInExecutor, ipNum)
      if (index != -1) {
        province = rulesInExecutor(index)._3
      }
      (province, 1)
    }

    // Create an RDD that reads the access log
    val reduced: RDD[(String, Int)] = sc.textFile("hdfs://192.168.6.160:9820/user/root/point2", 4)
      .map(func)          // shape each line into (province, 1)
      .reduceByKey(_ + _)
    // Print to the console
    reduced.collect().foreach(println)
    println(reduced.collect.toBuffer)
    // Write to HDFS
    reduced.saveAsTextFile("hdfs://192.168.6.160:9820/user/root/teop")
    // Write to a specified local path
    reduced.saveAsTextFile("file:///D:\\zb\\project\\spark\\spark01\\table\\data03")
    println("========Write to MySQL================")

    // Method 1 (not recommended)
    reduced.foreach(tp => {
      // Write the data into MySQL.
      // Where is the MySQL connection obtained? Inside the Task running on the Executor.
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://192.168.6.160:3306/school?characterEncoding=utf-8", "root", "12345678")
      // Problem when writing large amounts of data: a new JDBC connection is opened for
      // every single record, so 10 million records would mean 10 million connections.
      val pstm = conn.prepareStatement("insert into addr values(?,?)")
      pstm.setString(1, tp._1)
      pstm.setInt(2, tp._2)
      pstm.executeUpdate()
      pstm.close()
      conn.close()
    })

    // Method 2: take one partition at a time (one connection per partition; all of the
    // partition's rows are written before the JDBC connection is released, saving resources)
    reduced.foreachPartition(it => {
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://192.168.6.160:3306/school?characterEncoding=UTF-8", "root", "12345678")
      val pstm: PreparedStatement = conn.prepareStatement("insert into addr values(?,?)")
      // Take each record out of the partition and write it
      it.foreach(tp => {
        pstm.setString(1, tp._1)
        pstm.setInt(2, tp._2)
        pstm.executeUpdate()
      })
      pstm.close()
      conn.close()
    })

    // Method 3: use the helper method encapsulated in MyUtils
    reduced.foreachPartition(it => MyUtils.dataMysql(it))

    sc.stop()

  }
}
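For completeness, here is a minimal sketch of a batched variant of methods 2 and 3, assuming the same connection settings and the same addr table (a VARCHAR province column plus an INT count column): each partition's rows are queued with addBatch and sent as a single JDBC batch, which reduces the number of statements sent to MySQL.

package cn.kfc.dem01

import java.sql.{Connection, DriverManager, PreparedStatement}

object MyUtilsBatched {

  // Sketch only: one connection per partition, rows flushed as one JDBC batch
  def dataMysqlBatched(it: Iterator[(String, Int)]): Unit = {
    val conn: Connection = DriverManager.getConnection(
      "jdbc:mysql://192.168.6.160:3306/school?characterEncoding=UTF-8", "root", "12345678")
    conn.setAutoCommit(false)
    val pstm: PreparedStatement = conn.prepareStatement("insert into addr values(?,?)")
    try {
      it.foreach(tp => {
        pstm.setString(1, tp._1)
        pstm.setInt(2, tp._2)
        pstm.addBatch() // queue the row instead of executing it immediately
      })
      pstm.executeBatch() // execute all queued inserts for this partition
      conn.commit()
    } finally {
      pstm.close()
      conn.close()
    }
  }
}

It would be called exactly like method 3: reduced.foreachPartition(MyUtilsBatched.dataMysqlBatched).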
