Spark 结果写入MySQL
Spark 将结果写入MySQL
创建Scala-Maven 工程
导入pom依赖(Spark所有可能用到的依赖)
<!-- Build settings for the Spark examples in this article. Every spark-*_${scala.binary.version}
     artifact must be built for the same Scala line as scala.version (2.12.x here). -->
<properties>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.1.2</spark.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<!-- No custom <repositories>/<pluginRepositories> are declared: the old scala-tools.org
     repository (http://scala-tools.org/repo-releases) is offline, and Maven 3.8.1+ blocks
     plain-HTTP repositories by default. Everything below resolves from Maven Central. -->
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- Keep the SLF4J binding on the same version as slf4j-api. A directly declared dependency
     wins Maven's nearest-wins version mediation, so an old pin here (previously 1.6.1) would
     override any newer binding that arrives transitively. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- MySQL JDBC driver for the DriverManager.getConnection("jdbc:mysql://...") calls in the code. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
</dependency>
<!-- NOTE(review): hive-jdbc is not referenced by the Scala code in this article; confirm it is needed. -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Spark SQL support. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Required when Spark is integrated with Hive (adds Hive support). -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- NOTE(review): org.specs:specs 1.2.5 is a very old test library that predates Scala 2.12 and
     is not used by the code shown; consider removing it or switching to a current test framework. -->
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
</dependencies>
spark写入mysql
定义工具类:
package cn.kfc.dem01
import java.sql.{Connection, DriverManager, PreparedStatement}
import scala.io.{BufferedSource, Source}
object MyUtils {

  /**
   * Converts a dotted IPv4 string such as "1.2.3.4" into its numeric value (0x01020304).
   *
   * For each octet, the running value is shifted left by 8 bits and the octet is OR-ed in.
   * The input is not validated: a non-numeric octet throws NumberFormatException, and octets
   * above 255 are not rejected.
   *
   * @param ip dotted IPv4 address
   * @return the address as a Long
   */
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, octet) => octet.toLong | (acc << 8))

  /**
   * Binary-searches `lines` for the range that contains `ip`.
   *
   * @param lines rules as (rangeStart, rangeEnd, label) with inclusive bounds. For the search
   *              to be correct, the ranges must be non-overlapping and sorted by start
   *              (assumption about the rule data — confirm).
   * @param ip    numeric IP, e.g. as produced by [[ip2Long]]
   * @return index of the range containing `ip`, or -1 if no range contains it
   */
  def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    var found = -1
    while (found == -1 && low <= high) {
      // low + (high - low) / 2 avoids the Int overflow that (low + high) could hit on huge arrays.
      val middle = low + (high - low) / 2
      val (rangeStart, rangeEnd, _) = lines(middle)
      if (ip < rangeStart) high = middle - 1
      else if (ip > rangeEnd) low = middle + 1
      else found = middle
    }
    found
  }

  /**
   * Inserts every (province, count) pair from one partition into the MySQL table `addr`.
   *
   * A single connection and prepared statement are reused for the whole iterator. Both are
   * closed in `finally` blocks, so they are released even if an insert fails. No connection is
   * opened for an empty iterator. Intended usage: `rdd.foreachPartition(it => MyUtils.dataMysql(it))`.
   *
   * NOTE(review): assumes `addr` has exactly two columns matching (String, Int) — confirm
   * against the schema. The default credentials are hard-coded; prefer passing them in.
   *
   * @param it       records to write
   * @param url      JDBC URL of the target database
   * @param user     database user
   * @param password database password
   */
  def dataMysql(
      it: Iterator[(String, Int)],
      url: String = "jdbc:mysql://192.168.6.160:3306/school?characterEncoding=UTF-8",
      user: String = "root",
      password: String = "12345678"): Unit = {
    if (it.hasNext) {
      val conn: Connection = DriverManager.getConnection(url, user, password)
      try {
        val pstm: PreparedStatement = conn.prepareStatement("insert into addr values(?,?)")
        try {
          it.foreach { case (province, count) =>
            pstm.setString(1, province)
            pstm.setInt(2, count)
            pstm.executeUpdate()
          }
        } finally pstm.close()
      } finally conn.close()
    }
  }
}
package cn.kfc.dem01
//写入Mysql
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object IpLoaction3 {

  /**
   * Counts access-log hits per province and writes the result to the console, HDFS, a local
   * directory and MySQL.
   *
   * The IP-range rules are read from HDFS, collected to the driver and broadcast to the
   * executors. Each log line's IP is then looked up with a binary search, and hits are summed
   * per province. Finally, three alternative ways of writing the result to MySQL are shown.
   *
   * NOTE(review): all three MySQL variants run, so every (province, count) row is inserted
   * into `addr` three times per run. Keep only the variant you need.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("IpLoaction2").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      // JDBC settings shared by the write-out variants. They are kept as local vals so the
      // Spark closures below capture plain Strings.
      val jdbcUrl = "jdbc:mysql://192.168.6.160:3306/school?characterEncoding=UTF-8"
      val jdbcUser = "root"
      val jdbcPassword = "12345678"
      val insertSql = "insert into addr values(?,?)"

      // Rule lines are '|'-separated: fields 2 and 3 are the numeric start/end of an IP range,
      // and field 6 is the province.
      val ipRulesRDD: RDD[(Long, Long, String)] =
        sc.textFile("hdfs://192.168.6.160:9820/user/root/point1", 4).map { line =>
          val splits = line.split("\\|")
          (splits(2).toLong, splits(3).toLong, splits(6))
        }

      // Gather the rules scattered across executors onto the driver (assumed small enough to
      // fit there), then broadcast them. Each executor then holds one read-only copy instead
      // of receiving the rules with every task. MyUtils.binarySearch presumably needs the
      // rule file to be sorted by range start — confirm.
      val rulesInDriver: Array[(Long, Long, String)] = ipRulesRDD.collect()
      val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rulesInDriver)

      // Maps one '|'-separated access-log line (IP in field 1) to (province, 1).
      val func = (line: String) => {
        val ip = line.split("\\|")(1)
        val ipNum = MyUtils.ip2Long(ip)
        // Only the broadcast handle travels with the task. Referencing rulesInDriver here would
        // serialize the whole array into every task and defeat the broadcast.
        val rulesInExecutor: Array[(Long, Long, String)] = broadcastRef.value
        val index = MyUtils.binarySearch(rulesInExecutor, ipNum)
        val province = if (index != -1) rulesInExecutor(index)._3 else "未知 "
        (province, 1)
      }

      // `reduced` feeds several actions below, so it is cached to avoid re-reading and
      // re-aggregating the logs for each action.
      val reduced: RDD[(String, Int)] = sc
        .textFile("hdfs://192.168.6.160:9820/user/root/point2", 4)
        .map(func)
        .reduceByKey(_ + _)
        .cache()

      // Console output.
      reduced.collect().foreach(println)
      println(reduced.collect.toBuffer)
      // saveAsTextFile refuses to overwrite an existing output directory; remove these paths
      // before re-running.
      reduced.saveAsTextFile("hdfs://192.168.6.160:9820/user/root/teop")
      reduced.saveAsTextFile("file:///D:\\zb\\project\\spark\\spark01\\table\\data03")
      println("========写入MySQL================")

      // Variant 1 (not recommended): the JDBC connection is opened inside the executor task
      // for every single record, so ten million records means ten million connections.
      reduced.foreach { case (province, count) =>
        val conn: Connection = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword)
        try {
          val pstm: PreparedStatement = conn.prepareStatement(insertSql)
          try {
            pstm.setString(1, province)
            pstm.setInt(2, count)
            pstm.executeUpdate()
          } finally pstm.close()
        } finally conn.close()
      }

      // Variant 2: one connection per partition. All records of the partition are written
      // before the connection is released, which saves resources.
      reduced.foreachPartition { it =>
        if (it.hasNext) {
          val conn: Connection = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword)
          try {
            val pstm: PreparedStatement = conn.prepareStatement(insertSql)
            try {
              it.foreach { case (province, count) =>
                pstm.setString(1, province)
                pstm.setInt(2, count)
                pstm.executeUpdate()
              }
            } finally pstm.close()
          } finally conn.close()
        }
      }

      // Variant 3: the same per-partition write, encapsulated in MyUtils.
      reduced.foreachPartition(it => MyUtils.dataMysql(it))
    } finally {
      sc.stop()
    }
  }
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgabghh
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01