大数据技术栈:SparkJSON字符串处理
1 获取SparkSession并导入functions和types包
4 使用get_json_object将"value"中的字符串拆分并重命名
5 使用get_json_object将jsonobj中的dept继续拆分并重命名
7 使用get_json_object对school列中的数据进行拆分得到最终处理结果
前言
Spark能够自动推断出Json数据集的“数据模式”(Schema),并将它加载为一个SchemaRDD实例。这种“自动”的行为是通过下述两种方法实现的:
jsonFile:从一个文件目录中加载数据,这个目录中的文件的每一行均为一个JSON字符串(如果JSON字符串“跨行”,则可能导致解析错误)
jsonRDD:从一个已经存在的RDD中加载数据,这个RDD中的每一个元素均为一个JSON字符串
这里我们仅讨论jsonFile的场景,jsonRDD处理方法类似
在本文中使用Scala Spark实现对以下JSON字符串的整理
1|{"dept":{"describe":"主要负责招生,教学,就业等一系列学校事务","name":"学术部"},"email":"gree@edu.cn","id":79,"name":"gree","stus":[{"grade":"accp","id":1212,"name":"zs","school":{"address":"安德门","leader":"老吴","name":"南京中博"}},{"grade":"bigdata","id":4321,"name":"ww","school":{"address":"安德门3","leader":"老吴3","name":"南京中博3"}},{"grade":"yjs","id":9086,"name":"zq","school":{"address":"安德门2","leader":"老吴2","name":"南京中博2"}}],"tel":"15850500365"}
实操
1 获取SparkSession并导入functions和types包
-
// 1 获取SparkSession并导入functions和types包
-
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("jsonStuOpLog")
-
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
-
val sc: SparkContext = spark.sparkContext
-
import spark.implicits._
-
import org.apache.spark.sql.functions._
-
import org.apache.spark.sql.types._
2 构造数据源
-
// 2 构造数据源
-
val optionRDD: RDD[String] = sc.textFile("in/teacherinfo.txt")
-
optionRDD.foreach(println)
3 用map将字符串按"|"分割为"id"和"value"
-
// 3 用map将字符串按"|"分割为"id"和"value"
-
val option1: RDD[(String, String)] = optionRDD.map(x => {
-
val arr = x.split('|');
-
(arr(0), arr(1))
-
})
-
option1.foreach(println)
-
-
val jsonStrDF: DataFrame = option1.toDF("id", "value")
-
jsonStrDF.printSchema()
-
jsonStrDF.show(false)
4 使用get_json_object将"value"中的字符串拆分并重命名
-
// 4 使用get_json_object将"value"中的字符串拆分并重命名
-
val jsonobj: DataFrame = jsonStrDF.select($"id",
-
get_json_object($"value", "$.dept").as("dept"),
-
get_json_object($"value", "$.email").as("email"),
-
get_json_object($"value", "$.id").as("Tid"),
-
get_json_object($"value", "$.name").as("Tname"),
-
get_json_object($"value", "$.stus").as("stus"),
-
get_json_object($"value", "$.tel").as("tel")
-
)
-
-
jsonobj.printSchema()
-
jsonobj.show()
5 使用get_json_object将jsonobj中的dept继续拆分并重命名
-
// 5 使用get_json_object将jsonobj中的dept继续拆分并重命名
-
val jsonobj1: DataFrame = jsonobj.select($"id"
-
, get_json_object($"dept", "$.describe").as("describe"),
-
get_json_object($"dept", "$.name").as("Dname"),
-
$"email", $"Tid", $"Tname", $"stus",$"tel")
-
-
jsonobj1.printSchema()
-
jsonobj1.show()
6 对stus中的字符串进行炸裂,拆分为3条数据
-
// 6 对stus中的字符串进行炸裂,拆分为3条数据
-
val fields: List[StructField] = StructField("grade",StringType)::
-
StructField("id",IntegerType)::
-
StructField("name",StringType)::
-
StructField("school",StringType)::Nil
-
-
val jsonobj2: DataFrame = jsonobj1.select($"id", $"describe", $"Dname", $"email", $"Tid", $"Tname", from_json($"stus", ArrayType(StructType(fields))).as("stust"),$"tel")
-
-
jsonobj2.show(false)
-
-
val jsonobj3: DataFrame = jsonobj2.withColumn("stust", explode($"stust"))
-
-
val jsonobj4: DataFrame = jsonobj3.withColumn("grade", $"stust.grade")
-
.withColumn("Sid", $"stust.id")
-
.withColumn("name", $"stust.name")
-
.withColumn("school", $"stust.school")
-
.drop("stust")
-
-
jsonobj4.printSchema()
-
jsonobj4.show(false)
7 使用get_json_object对school列中的数据进行拆分得到最终处理结果
-
// 7 使用get_json_object对school列中的数据进行拆分
-
val jsonobj5: DataFrame = jsonobj4.select($"id", $"describe", $"Dname", $"email", $"Tid", $"Tname", $"tel", $"grade", $"name"
-
, get_json_object($"school", "$.address").as("address")
-
, get_json_object($"school", "$.leader").as("leader")
-
, get_json_object($"school", "$.name").as("SCname"))
-
-
jsonobj5.printSchema()
-
jsonobj5.show(false)
8 处理结果写入mysql数据库
-
// 8 写入mysql数据库
-
jsonobj5.write.mode(SaveMode.Append).jdbc(JdbcUtils.url,"teacher",properties)
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgkbehb
系列文章
更多
同类精品
更多
-
用Scala编程,在Spark RDD下, 实现 WordCount 的8种方式 (1/3)
-
spark执行优化——依赖上传到HDFS二(-conf spark.yarn.dist.jars或者--jars 的使用)
-
大数据ClickHouse十九Flink 写入 ClickHouse API
-
Flink项目4 双流connect和amp;intervalJoin项目
-
ErrorError while executing topic command : Replication factor: 1 larger than available brokers: 0
-
Ubuntu上安装 Spark3.3和Scala2.13的过程
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13