• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

大数据技术栈:SparkJSON字符串处理

武飞扬头像
请叫我小飞机
帮助1

目录

前言

实操

1 获取SparkSession并导入functions和types包

2 构造数据源

3 用map将字符串按"|"分割为"id"和"value"

4 使用get_json_object将"value"中的字符串拆分并重命名

 5 使用get_json_object将jsonobj中的dept继续拆分并重命名

 6 对stus中的字符串进行炸裂,拆分为3条数据

 7 使用get_json_object对school列中的数据进行拆分得到最终处理结果

 8 处理结果写入mysql数据库


前言

        Spark能够自动推断出Json数据集的“数据模式”(Schema),并将它加载为一个SchemaRDD实例。这种“自动”的行为是通过下述两种方法实现的:

        jsonFile:从一个文件目录中加载数据,这个目录中的文件的每一行均为一个JSON字符串(如果JSON字符串“跨行”,则可能导致解析错误)

        jsonRDD:从一个已经存在的RDD中加载数据,这个RDD中的每一个元素均为一个JSON字符串

        这里我们仅讨论jsonFile的场景,jsonRDD处理方法类似

        在本文中使用Scala Spark实现对以下JSON字符串的整理

1|{"dept":{"describe":"主要负责招生,教学,就业等一系列学校事务","name":"学术部"},"email":"gree@edu.cn","id":79,"name":"gree","stus":[{"grade":"accp","id":1212,"name":"zs","school":{"address":"安德门","leader":"老吴","name":"南京中博"}},{"grade":"bigdata","id":4321,"name":"ww","school":{"address":"安德门3","leader":"老吴3","name":"南京中博3"}},{"grade":"yjs","id":9086,"name":"zq","school":{"address":"安德门2","leader":"老吴2","name":"南京中博2"}}],"tel":"15850500365"}

实操

1 获取SparkSession并导入functions和types包

  1.  
    // 1 获取SparkSession并导入functions和types包
  2.  
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("jsonStuOpLog")
  3.  
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  4.  
    val sc: SparkContext = spark.sparkContext
  5.  
    import spark.implicits._
  6.  
    import org.apache.spark.sql.functions._
  7.  
    import org.apache.spark.sql.types._

2 构造数据源

  1.  
    // 2 构造数据源
  2.  
    val optionRDD: RDD[String] = sc.textFile("in/teacherinfo.txt")
  3.  
    optionRDD.foreach(println)

3 用map将字符串按"|"分割为"id"和"value"

  1.  
    // 3 用map将字符串按"|"分割为"id"和"value"
  2.  
    val option1: RDD[(String, String)] = optionRDD.map(x => {
  3.  
    val arr = x.split('|');
  4.  
    (arr(0), arr(1))
  5.  
    })
  6.  
    option1.foreach(println)
  7.  
     
  8.  
    val jsonStrDF: DataFrame = option1.toDF("id", "value")
  9.  
    jsonStrDF.printSchema()
  10.  
    jsonStrDF.show(false)

学新通

4 使用get_json_object将"value"中的字符串拆分并重命名

  1.  
    // 4 使用get_json_object将"value"中的字符串拆分并重命名
  2.  
    val jsonobj: DataFrame = jsonStrDF.select($"id",
  3.  
    get_json_object($"value", "$.dept").as("dept"),
  4.  
    get_json_object($"value", "$.email").as("email"),
  5.  
    get_json_object($"value", "$.id").as("Tid"),
  6.  
    get_json_object($"value", "$.name").as("Tname"),
  7.  
    get_json_object($"value", "$.stus").as("stus"),
  8.  
    get_json_object($"value", "$.tel").as("tel")
  9.  
    )
  10.  
     
  11.  
    jsonobj.printSchema()
  12.  
    jsonobj.show()

学新通

 5 使用get_json_object将jsonobj中的dept继续拆分并重命名

  1.  
    // 5 使用get_json_object将jsonobj中的dept继续拆分并重命名
  2.  
    val jsonobj1: DataFrame = jsonobj.select($"id"
  3.  
    , get_json_object($"dept", "$.describe").as("describe"),
  4.  
    get_json_object($"dept", "$.name").as("Dname"),
  5.  
    $"email", $"Tid", $"Tname", $"stus",$"tel")
  6.  
     
  7.  
    jsonobj1.printSchema()
  8.  
    jsonobj1.show()

学新通

 6 对stus中的字符串进行炸裂,拆分为3条数据

  1.  
    // 6 对stus中的字符串进行炸裂,拆分为3条数据
  2.  
    val fields: List[StructField] = StructField("grade",StringType)::
  3.  
    StructField("id",IntegerType)::
  4.  
    StructField("name",StringType)::
  5.  
    StructField("school",StringType)::Nil
  6.  
     
  7.  
    val jsonobj2: DataFrame = jsonobj1.select($"id", $"describe", $"Dname", $"email", $"Tid", $"Tname", from_json($"stus", ArrayType(StructType(fields))).as("stust"),$"tel")
  8.  
     
  9.  
    jsonobj2.show(false)
  10.  
     
  11.  
    val jsonobj3: DataFrame = jsonobj2.withColumn("stust", explode($"stust"))
  12.  
     
  13.  
    val jsonobj4: DataFrame = jsonobj3.withColumn("grade", $"stust.grade")
  14.  
    .withColumn("Sid", $"stust.id")
  15.  
    .withColumn("name", $"stust.name")
  16.  
    .withColumn("school", $"stust.school")
  17.  
    .drop("stust")
  18.  
     
  19.  
    jsonobj4.printSchema()
  20.  
    jsonobj4.show(false)
学新通

学新通

 7 使用get_json_object对school列中的数据进行拆分得到最终处理结果

  1.  
    // 7 使用get_json_object对school列中的数据进行拆分
  2.  
    val jsonobj5: DataFrame = jsonobj4.select($"id", $"describe", $"Dname", $"email", $"Tid", $"Tname", $"tel", $"grade", $"name"
  3.  
    , get_json_object($"school", "$.address").as("address")
  4.  
    , get_json_object($"school", "$.leader").as("leader")
  5.  
    , get_json_object($"school", "$.name").as("SCname"))
  6.  
     
  7.  
    jsonobj5.printSchema()
  8.  
    jsonobj5.show(false)

学新通

 8 处理结果写入mysql数据库

  1.  
    // 8 写入mysql数据库
  2.  
    jsonobj5.write.mode(SaveMode.Append).jdbc(JdbcUtils.url,"teacher",properties)

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgkbehb
系列文章
更多 icon
同类精品
更多 icon
继续加载