• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

spark-sql字段级血缘关系实现

武飞扬头像
Chocolate?
帮助3

1.背景:

血缘关系非常重要,因为有了字段间的血缘关系,便可以知道数据的来源去处,以及字段之间的转换关系,这样对数据的质量,治理有很大的帮助。

Spark SQL 相对于 Hive 来说通常情况下效率会比较高,对于运行时间、资源的使用上面等都会有较大的收益。所以考虑将采用MapReduce引擎执行的sql进行迭代,以spark引擎执行。但同时也需要实现字段血缘的功能。hive血缘关系实现较为简单,攻略也比较多,这spark血缘关系攻略较少,这里提供一种解析思路。

2.需求:

在使用spark引擎执行sql时,将表与表,字段与字段的血缘信息解析出来,可视化展示。

3.思路:

使用QueryExecutionListener对spark进行监听,读取出sparkplan(物理计划),解析其中包含的血缘关系,将血缘关系导入neo4j,spring-boot写接口,前端请求返回表的血缘关系。

4.实现:

QueryExecutionListener:监听和用于分析spark-sql执行过程中的的一些指标

The interface of query execution listener that can be used to analyze execution metrics.

  1.  
    trait QueryExecutionListener {
  2.  
    @DeveloperApi
  3.  
    def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit
  4.  
     
  5.  
    @DeveloperApi
  6.  
    def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit
  7.  
    }
  1.  
    class SparkListenerTest extends QueryExecutionListener{
  2.  
     
  3.  
    override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
  4.  
     
  5.  
    }
  6.  
     
  7.  
    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
  8.  
    val sparkPlanJson: String = qe.sparkPlan.prettyJson
  9.  
     
  10.  
    }
  11.  
    }

了解一下整个sql在spark中的解析过程

sql —(ANTLR4) —> AST —(Spark AstBuilder) —> Unresolved LogicalPlan —  (Catalog) —>  Resolved LogicalPlan  — (Optimizer) —> Optimized LogicalPlan — (SparkPlanner) —> PhysicalPlan(SparkPlan) —(prepareForExecution) —> ExecutionPlan(PhysicalPlan)

学新通

QueryExecution可以获取到的信息如下:

学新通

logicalPlan: 逻辑计划并不知道如何执行,比如不知道表的类型(hive还是hbase),如何获取数据(jdbc还是读取hdfs),数据分布是什么样的(bucketed还是hashDistrobuted),这时就需要将逻辑计划树转换成物理计划树,获取真实的物理属性

sparkPlan:就是刚刚提到的物理计划树,这里我们监听获取sparkplan的Json信息

这段JSON获取目标表的所有信息

学新通

 table和database

学新通

 第一个Project包含了目标表的信息和血缘信息(下面说),其中projectList就是表中的字段信息,第一个Project就是insert into xxx 的 xxx(目标表target)

学新通

spark会给每个字段打上一个唯一自增的id(logicalPlan打上的),我们将信息存放在map里(id -> database.table.column)

这里很重要,因为就是使用id将字段和字段之间关联的

学新通

LogicalRelation包含了源头表的所有信息,output中包含了字段信息,catalogTable中包含了表名和库名(多个源头表就会有多个LogicalRelation)

学新通

字段信息和id 学新通

表名和库名

学新通

上面说的第一个Project非常重要,因为第一个Project不仅仅包含了目标表的字段信息,还包含了这些字段来自哪个字段,去刚刚存放的map里匹配,将结果放入新map(sourcecolumn -> targetcolumn)

学新通

代码:

  1.  
    class SparkListenerTest extends QueryExecutionListener{
  2.  
     
  3.  
    override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
  4.  
     
  5.  
    }
  6.  
     
  7.  
    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
  8.  
    // println(qe.analyzed.prettyJson)
  9.  
    // println("-------------------------------")
  10.  
    // println(qe.logical.prettyJson)
  11.  
    // println("-------------------------------")
  12.  
    // println(qe.optimizedPlan.prettyJson)
  13.  
    // println("-------------------------------")
  14.  
    // println(qe.sparkPlan.prettyJson)
  15.  
    // println("-------------------------------")
  16.  
    // println(qe.executedPlan.prettyJson)
  17.  
     
  18.  
    val sparkPlanJson: String = qe.sparkPlan.prettyJson
  19.  
    //获取数据写入的相关的json信息,其中可以将血缘关系解析出来
  20.  
    val dataWritingCommandExec = JSON.parseArray(sparkPlanJson).get(0)
  21.  
     
  22.  
    //id与字段的对应关系
  23.  
    val allTableInfo = new util.HashMap[String, String]()
  24.  
    //目标表
  25.  
    var targetDatabaseTable = ""
  26.  
    //源头表
  27.  
    val sourceDatabaseTables = new util.ArrayList[String]()
  28.  
    //目标表与源头表中字段与字段的对应关系
  29.  
    val allColumnRelation = new util.HashMap[String, String]()
  30.  
    //判断一下是否为insert语句,不是的话不需要解析
  31.  
    if(JSON.parseObject(dataWritingCommandExec.toString).getString("class").contains("DataWritingCommandExec")) {
  32.  
    val cmd = JSON.parseObject(dataWritingCommandExec.toString).getJSONArray("cmd").toArray()
  33.  
    for (c <- cmd) {
  34.  
    //这里只写了目标表为hive表的情况 获取目标表
  35.  
    if (JSON.parseObject(c.toString).getString("class").contains("InsertIntoHiveTable")) {
  36.  
    //table的所有信息
  37.  
    val tableInfo = JSON.parseObject(c.toString).getString("table")
  38.  
    val identifier = JSON.parseObject(tableInfo).getString("identifier")
  39.  
    //table name
  40.  
    val table = JSON.parseObject(identifier).getString("table")
  41.  
    //database
  42.  
    val database = JSON.parseObject(identifier).getString("database")
  43.  
    targetDatabaseTable = database "." table //目标表
  44.  
    // println(targetDatabaseTable)
  45.  
    }
  46.  
    //获取源头表
  47.  
    if (JSON.parseObject(c.toString).getString("class").contains("LogicalRelation")) {
  48.  
    //catalog
  49.  
    val catalogTable = JSON.parseObject(c.toString).getString("catalogTable")
  50.  
    val identifier = JSON.parseObject(catalogTable).getString("identifier")
  51.  
    //table name
  52.  
    val table = JSON.parseObject(identifier).getString("table")
  53.  
    //database
  54.  
    val database = JSON.parseObject(identifier).getString("database")
  55.  
    //将源头表加入list中
  56.  
    sourceDatabaseTables.add(database "." table)
  57.  
    val output = JSON.parseObject(c.toString).getJSONArray("output").toArray()
  58.  
    //获取字段信息
  59.  
    for (o <- output) {
  60.  
    val detail = JSON.parseObject(JSON.parseArray(o.toString).get(0).toString)
  61.  
    //clomun name
  62.  
    val column = (database "." table) "." detail.getString("name")
  63.  
    //唯一id信息
  64.  
    val columnId = JSON.parseObject(detail.getString("exprId")).getString("id")
  65.  
    allTableInfo.put(columnId, column)
  66.  
    }
  67.  
    }
  68.  
    }
  69.  
    // println("allTableInfo:" allTableInfo)
  70.  
     
  71.  
    //相当于java的 break 因为我们只需要第一个Project的信息,只有第一个Project的中才包含血缘信息;
  72.  
    var loop = new Breaks;
  73.  
    loop.breakable {
  74.  
    for (c <- cmd) {
  75.  
    if (JSON.parseObject(c.toString).getString("class").contains("logical.Project")) {
  76.  
    val projectList = JSON.parseObject(c.toString).getJSONArray("projectList").toArray()
  77.  
    for (p <- projectList) {
  78.  
    val project = JSON.parseArray(p.toString)
  79.  
    val length = project.size()
  80.  
    //目标表和源头表字段名字不一样(roleId -> role_id)
  81.  
    if (length > 1) {
  82.  
    //获取目标表的字段名
  83.  
    val targetColumn = targetDatabaseTable "." JSON.parseObject(project.get(0).toString).get("name")
  84.  
    // println(targetColumn)
  85.  
    for (p <- project.toArray()) {
  86.  
    if (JSON.parseObject(p.toString).getString("class").contains("AttributeReference")) {
  87.  
    //获取与目标表字段对应的源头表字段的字段id,通过字段id获取源头表字段
  88.  
    val sourceColumn = allTableInfo.get(JSON.parseObject(JSON.parseObject(p.toString).getString("exprId")).getString("id"))
  89.  
    if (!allColumnRelation.containsKey(sourceColumn)) {
  90.  
    //放入字段关系map中
  91.  
    allColumnRelation.put(sourceColumn, targetColumn)
  92.  
    }
  93.  
    }
  94.  
    }
  95.  
    //目标表和源头表字段名字一样(role_id -> role_id)
  96.  
    } else {
  97.  
    val targetColumn = targetDatabaseTable "." JSON.parseObject(project.get(0).toString).get("name")
  98.  
    val sourceColumn = allTableInfo.get(JSON.parseObject(JSON.parseObject(project.get(0).toString).getString("exprId")).getString("id"))
  99.  
    allColumnRelation.put(sourceColumn, targetColumn)
  100.  
    }
  101.  
    }
  102.  
    loop.break()
  103.  
    }
  104.  
    }
  105.  
    }
  106.  
    // println("allColumnRelation:" allColumnRelation)
  107.  
     
  108.  
    // println("targetDatabaseTable:" targetDatabaseTable)
  109.  
     
  110.  
    // println("sourceDatabaseTables" sourceDatabaseTables)
  111.  
     
  112.  
    }
  113.  
     
  114.  
    //下面就是 neo4j的语句拼接了 将血缘信息导入到图数据库
  115.  
    if(!targetDatabaseTable.equals("")) {
  116.  
    val session: Session = SparkListenerTest.driver.session
  117.  
     
  118.  
    //创建目标表
  119.  
    if (session.run(s"match (t:Table {name:'${targetDatabaseTable}'}) return t").list().size() == 0) {
  120.  
    session.run(s"CREATE (n:Table {name:'${targetDatabaseTable}'}) RETURN n")
  121.  
    println(s"CREATE (n:Table {name:'${targetDatabaseTable}'}) RETURN n")
  122.  
    // println(s"CREATE (n:Table {name:'${TargetTable}'}) RETURN n")
  123.  
    }
  124.  
     
  125.  
    //创建源头表并创建目标表表与源头表的关系
  126.  
    for(sourceDatabaseTable <- sourceDatabaseTables) {
  127.  
    if (session.run(s"match (t:Table {name:'${sourceDatabaseTable}'}) return t").list().size() == 0) {
  128.  
    session.run(s"CREATE (n:Table {name:'${sourceDatabaseTable}'}) RETURN n")
  129.  
    println(s"CREATE (n:Table {name:'${sourceDatabaseTable}'}) RETURN n")
  130.  
    }
  131.  
    session.run(s"MATCH (a:Table {name:'${sourceDatabaseTable}'}),(b:Table {name:'${targetDatabaseTable}'}) MERGE (a)-[:Derived]->(b)")
  132.  
    println(s"MATCH (a:Table {name:'${sourceDatabaseTable}'}),(b:Table {name:'${targetDatabaseTable}'}) MERGE (a)-[:Derived]->(b)")
  133.  
    }
  134.  
     
  135.  
    //判断是否有目标表的列节点,如果没有则创建
  136.  
    for(targetColumn <- allColumnRelation.values()) {
  137.  
    if (session.run(s"match (c:Column {name:'${targetColumn}'}) return c").list().size() == 0) {
  138.  
    session.run(s"CREATE (n:Column {name:'${targetColumn}'}) RETURN n")
  139.  
    // println(s"CREATE (n:Column {name:'${TargetColumn}'}) RETURN n")
  140.  
    }
  141.  
    val targeTable = targetColumn.split("\\.")(0) "." targetColumn.split("\\.")(1)
  142.  
    session.run(s"MATCH (a:Column {name:'${targetColumn}'}),(b:Table {name:'${targeTable}'}) MERGE (a)-[:Belongs]->(b)")
  143.  
    println(s"MATCH (a:Column {name:'${targetColumn}'}),(b:Table {name:'${targeTable}'}) MERGE (a)-[:Belongs]->(b)")
  144.  
    }
  145.  
     
  146.  
    //判断是否有源头表的列节点,如果没有则创建并创建列关系
  147.  
    for(sourceColumn <- allColumnRelation.keySet()) {
  148.  
    if (session.run(s"match (c:Column {name:'${sourceColumn}'}) return c").list().size() == 0) {
  149.  
    session.run(s"CREATE (n:Column {name:'${sourceColumn}'}) RETURN n")
  150.  
    // println(s"CREATE (n:Column {name:'${TargetColumn}'}) RETURN n")
  151.  
    }
  152.  
    val sourceTable = sourceColumn.split("\\.")(0) "." sourceColumn.split("\\.")(1)
  153.  
    session.run(s"MATCH (a:Column {name:'${sourceColumn}'}),(b:Table {name:'${sourceTable}'}) MERGE (a)-[:Belongs]->(b)")
  154.  
    println(s"MATCH (a:Column {name:'${sourceColumn}'}),(b:Table {name:'${sourceTable}'}) MERGE (a)-[:Belongs]->(b)")
  155.  
    session.run(s"MATCH (a:Column {name:'${sourceColumn}'}),(b:Column {name:'${allColumnRelation.get(sourceColumn)}'}) MERGE (a)-[:Derived_column]->(b)")
  156.  
    println(s"MATCH (a:Column {name:'${sourceColumn}'}),(b:Column {name:'${allColumnRelation.get(sourceColumn)}'}) MERGE (a)-[:Derived_column]->(b)")
  157.  
    }
  158.  
    }
  159.  
    }
  160.  
    }
  161.  
     
  162.  
    //创建伴生类,可以理解为java的静态变量,同一个session只需要建立一次driver,不需要每个sql都建立一次
  163.  
    object SparkListenerTest{
  164.  
    val driver = GraphDatabase.driver(url, AuthTokens.basic(database, password))
  165.  
    }
学新通

提交的时候要修改配置文件

spark-submit

--master xxx --deploy xxx

--executor-cores xxx --executor-memory xxx --num-executor xxx

--conf spark.sql.queryExecutionListeners="xxx.SparkListenerTest" 

或者直接修改spark的conf文件 spark.sql.queryExecutionListeners= "xxx.SparkListenerTest"

导入neo4j后的效果 neo4j browser

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfcfhcf
系列文章
更多 icon
同类精品
更多 icon
继续加载