• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

微服务链路追踪SkyWalking第十课 GraphQL

武飞扬头像
办公模板库 素材蛙
帮助3

第27讲:实战入门 GraphQL,如何将 REST API 换成 GraphQL

从本节开始将深入介绍 query-graphql-plugin 插件,我们会启动 SkyWalking Rocketbot 来查询 Trace 数据和 JVM 监控数据,这些用户查询请求最终都会路由到 query-graphql-plugin 插件中进行处理。

GraphQL 简介

GraphQL 是一个用于 API 的查询语言,是一个使用基于类型系统来执行查询的服务端运行时。GraphQL 并没有和任何特定数据库或者存储引擎绑定。GraphQL 对服务端 API 中的数据提供了一套易于理解的完整描述,使得客户端能够准确地获得它需要的数据,而且没有任何冗余,也让 API 更容易地随着时间推移而演进,还能用于构建强大的开发者工具。使用 GraphQL 开发 API 有如下好处。

  • 可描述:使用 GraphQL,你获取的都是你想要的数据,不多也不会少;

  • 分级:GraphQL 天然遵循了对象间的关系,通过一个简单的请求,我们可以获取到一个对象及其相关的对象。

  • 强类型:使用 GraphQL 的类型系统,能够清晰、准确的描述数据,这样就能确保从服务器获取的数据和我们查询的一致。

  • 跨语言:GraphQL 并不绑定于某一特定的语言。

  • 兼容性:GraphQL 不限于某一特定存储平台,GraphQL 可以方便地接入已有的存储、代码、甚至可以连接第三方的 API。

GraphQL  类型系统

在一篇文章中完整介绍 GraphQL 本身,以及如何在 Java 服务端使用 GraphQL 几乎是不可能的,其中会涉及很多琐碎的细节需要说明,并且还需要列举一些示例。本课时并不是一篇完整的 GraphQL Java 教程,这里重点介绍 query-graphql-plugin 插件涉及的 GraphQL 知识点。

GraphQL 的类型系统与 Java 的类型系统非常相似,主要有下面 6 种类型:

  1. Scalar,类似于 Java 中的基本变量。

学新通

  1. Object,类似于 Java 中的对象,这里使用 GraphQL 定义一个 Book 类型,它就是 Object 类型,在 GraphQL Java 中对应的是 GraphQLObjectType。

type Book {
    id: ID # 编号
    name: String #书名
    pageCount: Int #页数
    author: Author # 作者
}
  1. Interface,类似于 Java 中的接口,在下面中的示例如下:

interface ComicCharacter {
    name: String;
}

在 GraphQL Java 中对应的类型是 GraphQLInterfaceType。

  1. Union,在 Java 中没有 Union 类型,但是在 C 中有 Union 类型,在 GraphQL Schema 中的示例如下:

type Cat {
    name: String;
    lives: Int;
}
type Dog {
    name: String;
    bonesOwned: int;
}
union Pet = Cat | Dog

在 GraphQL Java 中对应的类型是 GraphQLUnionType。在使用 Interface 或 Union 时,如果需要获取对象真实类型,可以通过 TypeResolver 进行判断。

  1. InputObject,主要用于封装方法参数,GraphQL Schema 中的定义与 Object 类似,主要区别是将 type 关键字换成 input 关键字。GraphQL Java 中对应的类型是 GraphQLInputObjectType。

  2. Enum,类似于 Java 中的枚举,不再赘述。

GraphQL Java 基础入门

SkyWalking OAP 中的 query-graphql-plugin 插件也使用了 GraphQL 开发其 API,这里就简单介绍如何在 Java 中使用 GraphQL。

定义 GraphQL Schema

首先,我们需要定义一套 GraphQL Schema,似于要使用数据库存储数据之前,需要先建表,之后上层应用才能读写数据库。GraphQL Java 服务端要是响应用户的请求,也需要定义一个描述数据的结构,也就是这里的 GraphQL Schema。一般我们会将 GraphQL Schema 单独放到 resources 目录下,这里以图书信息管理的为例,resources/book.graphql 文件如下所示,其中定义了 Book、Author、Query 三个类型,其中 Book 和 Author 类似于普通的 JavaBean,Query 则类似于 Java 中的接口定义:

type Book {
    id: ID # 编号
    name: String #书名
    pageCount: Int #页数
    author: Author # 作者
}
type Author {
  id: ID # 作者编号
  firstName: String # 作者姓名
  lastName: String
}
type QueryBook {
    # getById()类似于Java方法,根据Id查询书籍信息
    # id是方法参数,ID是类型,"!"表示非空
    # Book是返回值类型,这里返回的是一个Book对象
    getById(id: ID!): Book
    # 查询Book列表
    list: [Book]
}
学新通

加载 GraphQL Schema 文件

在 GraphQL Java 中加载并解析 GraphQL Schema 文件的方式如下:

@Component
public class GraphQLProvider {
    private GraphQL graphQL;
    @Bean
    public GraphQL graphQL() {
        return graphQL;
    }
    @PostConstruct
    public void init() throws IOException {
        // 读取 GraphQL Schema文件并创建 GraphQL实例,
        // 该GraphQL实例会通过上面的 graphQL()方法暴露给Spring,
        // 默认情况下,请求到"/graphql"这个path上的请求都会由该GraphQL实例处理
        URL url = Resources.getResource("book.graphqls");
        String sdl = Resources.toString(url, Charsets.UTF_8);
        GraphQLSchema graphQLSchema = buildSchema(sdl);
        this.graphQL = GraphQL.newGraphQL(graphQLSchema).build();
    }
    private GraphQLSchema buildSchema(String sdl) {
        // GraphQL Schema文件被解析之后,就是这里的 TypeDefinitionRegistry对象
        TypeDefinitionRegistry typeRegistry = new SchemaParser().parse(sdl);
        // 注册DataFetcher,DataFetcher的介绍以及buildWiring()方法实现在后面马上会进行介绍
        RuntimeWiring runtimeWiring = buildWiring();
        SchemaGenerator schemaGenerator = new SchemaGenerator();
        // 将GraphQL Schema中定义的与 DataFetcher关联起来
        return schemaGenerator.makeExecutableSchema(typeRegistry, runtimeWiring);
    }
    // 这哪是省略buildWiring()方法
}
学新通

这里我们接触到了几个新类型。

  • GraphQL:它默认将处理"/graphql"这个 path 上的全部请求;

  • TypeDefinitionRegistry:它是 GraphQL Schema 文件的解析结果;

  • SchemaGenerator: 它关联了 TypeDefinitionRegistry 对象和后面要介绍的 DataFetcher 对象,并生成一个 GraphQLSchema 对象;

  • RuntimeWiring:后面介绍的 DataFetcher 对象将注册到 RuntimeWiring 中,具体的注册方式在 buildWiring() 方法中(后面分析)。

关联 DataFetcher

DataFetcher 是在 GraphQL Java 服务端中比较重要的概念之一。DataFetcher 的核心功能是:获取查询字段的相应数据。DataFetcher 接口的实现如下:

public interface DataFetcher<T> {
    // DataFetchingEnvironment中记录了很多信息,例如:
    // 该 DataFetcher对应的字段以及类型、查询的外层对象以及根对象、当前上下文信息等等一系列信息
    T get(DataFetchingEnvironment dataFetchingEnvironment) throws Exception;
}

在 GraphQLProvider.buildWiring() 方法中,我们为 Query.getById 方法与 Book.author 字段绑定了自定义的 DataFetcher 实现,具体实现如下:

@Autowired
GraphQLDataFetchers graphQLDataFetchers;
private RuntimeWiring buildWiring() {
    return RuntimeWiring.newRuntimeWiring()
            // 将Query.getById与getBookByIdDataFetcher()方法返回的DataFetcher实现关联
            .type(newTypeWiring("Query").dataFetcher("getById", 
                            graphQLDataFetchers.getBookByIdDataFetcher())
                            .dataFetcher("list", graphQLDataFetchers.listDataFetcher()))
            // 将Book.author字段与getBookByIdDataFetcher()方法返回的DataFetcher实现关联
            .type(newTypeWiring("Book").dataFetcher("author", 
                            graphQLDataFetchers.getAuthorDataFetcher()))
            .build();
}

GraphQLDataFetchers 的定义如下,GraphQLDataFetchers 中定义了 books 和 authors 两个集合来模拟数据源,getBookByIdDataFetcher() 方法和 getAuthorDataFetcher() 方法返回的自定义 DataFetcher 实现会分别查询这两个集合返回相应数据:

@Component
public class GraphQLDataFetchers {
    private static List<ImmutableMap<String, String>> books = Arrays.asList(
            ImmutableMap.of("id", "book-1","name", "Harry Potter and the Philosopher's Stone","pageCount", "223","authorId", "author-1"),
    );
    private static List<ImmutableMap<String, String>> authors = Arrays.asList(
            ImmutableMap.of("id", "author-1","firstName", "Joanne","lastName", "Rowling"),
    );
    public DataFetcher getBookByIdDataFetcher() {
        return dataFetchingEnvironment -> {
            // 获取 id参数,然后根据id查找 books集合并返回相应的 Book信息
            String bookId = dataFetchingEnvironment.getArgument("id");
            return books.stream().filter(book -> book.get("id").equals(bookId))
                    .findFirst().orElse(null);
        };
    }
    public DataFetcher getAuthorDataFetcher() {
        return dataFetchingEnvironment -> {
            // DataFetcher 会按照 GraphQL Schema定义从外层向内层调用
            // 这里可以直接通过 DataFetchingEnvironment获取外层 DataFetcher查找到的数据(即关联的Book)
            Map<String, String> book = dataFetchingEnvironment.getSource();
            String authorId = book.get("authorId");  // 根据 authorId查找作者信息
            return authors.stream().filter(author -> author.get("id").equals(authorId))
                    .findFirst().orElse(null);
        };
    }
    public DataFetcher listDataFetcher() {
       return dataFetchingEnvironment -> books;
    }
}
学新通

GraphQL Schema 中的每个字段都会关联一个 DataFetcher,如果未通过 RuntimeWiring 方式明确关联的自定义 DataFetcher ,会默认关联 PropertyDataFetcher。PropertyDataFetcher 会有多种方式从外层结果中为关联字段查找正确的值:

  1. 如果外层查找结果为 null,则直接返回 null,否则执行步骤 2;

  2. 通过 function 从外层查询结果中提起对应字段值,该 function 是在 PropertyDataFetcher 初始化时指定,若未指定 function 则执行步骤 3;

  3. 如果外层查询结果为 Map,则从该 Map 中直接获取字段值;

  4. 如果外层查询结果是 Java 对象,则调用相应的 getter 方法获取字段值。

最后我们回顾整个示例项目,其核心逻辑如下图:

学新通

启动

启动该 Spring 项目之后,可以使用 GraphQL Playground 这个工具访问"/graphql",并传入查询 Book 的请求,如下图所示:

学新通

如果想查看 Http 请求,可以点击 COPY CURL 按钮获取相应的 curl 命令,如下所示:

curl 'http://localhost:8080/graphql' -H 'Content-Type: application/json' --data-binary '{"query":"\n{\n      getById(id:\"book-1\") {\n        id\n        name\n        pageCount\n        author{\n        \tfirstName\n        \tlastName\n        }\n    }\n}"}' --compressed

list 请求如下图所示:

学新通

到此为止,GraphQL Java Demo 项目的涉及的基础知识就全部介绍完了。

GraphQL Java Tools 入门

在使用 GraphQL Java 开发服务端 API 的时候,需要手写前文介绍的 DataFetcher 实现、GraphQLSchema 解析逻辑以及 GraphQL 对象的实例化过程。

GraphQL Java Tools 可以帮助我们屏蔽底层的 GraphQL Java 中的复杂概念和重复代码,GraphQL Java Tools 能够从 GraphQL Schema 定义(即 .graphqls 文件)中构建出相应的 Java 的 POJO 类型对象,GraphQL Java Tools 将读取 classpath 下所有以 .graphqls 为后缀名的文件,然后创建 GraphQL Schema 对象。GraphQL Java Tools 也是依赖于 GraphQL Java 实现的。

这里依然以上面图书管理的 demo 为例来介绍 GraphQL Java Tools 的使用,前文示例的GraphQL Schema 定义的 Book 、Author 以及 Query 三种类型我们保持不变。Query 接口是 GraphQL 查询的入口,我们可以通过 extend 的方式扩展 Query,如下所示:

extend type Query{ # 扩展 Query
    getAuthorById(id: ID!): Author # 根据 id查询作者信息
}

GraphQL Schema 中还可以定义 Mutation 类型作为修改数据的入口,如下所示,启动定义了 createBook 和 createAuthor 两个方法,分别用来新增图书信息和作者信息:

type Mutation {
    createBook(input : BookInput!) : Book!
    createAuthor(firstName:String!, lastName:String!) : ID!
}
input BookInput {  # input 表示入参
    name : String!
    pageCount : String!
    authorId: String!
}

这里引入了一个 BootInput 类型,将需要传递到服务端的数据封装起来,GraphQL 中返回类型和输入类型(input)是不能共用的,所以加上 input 后缀加以区分。

GraphQL Java Tools 可以将 GraphQL 对象的方法和字段映射到 Java 对象。一般情况下,GraphQL Java Tools 可以通过 POJO 的字段或是相应的 getter 方法完成字段读取,对于复杂字段,则需要定义相应的 Resolver 实现。例如,下面为 GraphQL Schema 定义对应了 Book 和 Author 两个 POJO(以及 BookInput):

public class Book {
    private String id;
    private String name;
    private int pageCount;
    private String authorId;
    // 省略 getter/setter 方法
}
public class Author {
    private String id;
    private String firstName;
    private String lastName;
    // 省略 getter/setter 方法
}
public class BookInput {
    private String name;
    private int pageCount;
    private String authorId;
    // 省略 getter/setter 方法
}
学新通

很明显,这里定义的 Book.java 中大部分字段都与 GraphQL Schema 中的 Book 一个一个地对应,但是 authorId 字段与 GraphQL Schema 中 Book 的 author 字段是无法直接完成映射的,这里就需要一个 GraphQLResolver 实现来完成该转换,例如下面的 BookResolver 实现:

@Component
class BookResolver implements GraphQLResolver<Book> {
@Autowired
private AuthorService authorService;
    public Author author(Book book) {
        return authorService.getAuthorById(book.getAuthorId());
    }
}

在 GraphQL Java Tools 需要将 Java 对象映射成 GraphQL 对象的时候,首先会尝试使用相应 GraphQLResolver(示例中的 BookResolver) 的相应方法完成映射(示例中的 author() 方法),如果在 GraphQLResolver 没有方法才会使用相应的 getter 方法或是直接访问字段。

BookResolver.author() 方法的实现也可以看出,被映射的 Java 对象需要作为参数传入。


在 GraphQL Schema 中定义的 Query 和 Mutation 是 GraphQL 查询和修改数据的入口,它们对应的 Resolver 实现需要实现 GraphQLQueryResolver 或 GraphQLMutationResolver。例如下面定义的 BookService 以及 AuthorService:

public interface BookService extends GraphQLQueryResolver, GraphQLMutationResolver {
    Book getBookById(String id);
    List<Book> list();
    Book createBook(BookInput input);
} 
public interface AuthorService extends GraphQLQueryResolver, GraphQLMutationResolver {
    String createAuthor(String firstName, String lastName);
    Author getAuthorById(String id);
}

GraphQL Java Tools 会根据方法名将上述 GraphQLQueryResolver 或 GraphQLMutationResolver 与 GraphQL Schema 中的 Query 和 Mutation 进行映射。

BookService 和 AuthorService 的具体实现如下:

@Service
public class BookServiceImpl implements BookService {
    // 使用递增方式生成 id后缀
    private AtomicLong idGenerator = new AtomicLong(0L);
    // 这里并没有使用持久化存储,而是使用该 List将图书信息保存在内存中
    private static List<Book> books = Lists.newCopyOnWriteArrayList();
    @Override
    public Book getBookById(String id) {
    return books.stream().filter(b -> b.getId().equals(id))
    .findFirst().orElse(null);
}
    @Override
    public List<Book> list() {
    return books;
    }
    @Override
    public Book createBook(BookInput input) {
        String id = "book-"   idGenerator.getAndIncrement();
        Book book = new Book();
        book.setId(id);
        book.setName(input.getName());
        book.setPageCount(input.getPageCount());
        book.setAuthorId(input.getAuthorId());
        books.add(book);
        return book;
    } 
}
@Component
public class AuthorServiceImpl implements AuthorService {
    private AtomicLong idGenerator = new AtomicLong(0L);
    private static List<Author> authors = Lists.newCopyOnWriteArrayList();
@Override
public String createAuthor(String firstName, String lastName) {
    String id = "author-"   idGenerator.getAndIncrement();
    Author author = new Author();
    author.setId(id);
    author.setFirstName(firstName);
    author.setLastName(lastName);
    authors.add(author);
    return id;
}
@Override
public Author getAuthorById(String id) {
    return authors.stream().filter(a -> a.getId().equals(id))
    .findFirst().orElse(null);
    }
}
学新通

最后,我们启动该 Demo 项目,使用 GraphQL Playground 分别请求 Query 和 Mutation 中定义的接口,如下图所示:

学新通

学新通

到此为止,GraphQL 入门示例分析就结束了。


第28讲:深入 query-graphql 插件,SW Rocketbot 背后的英雄(上)

SkyWalking OAP 目前只提供了query-graphql-plugin 这一款查询插件,从名字就可以看出它是使用 GraphQL 实现的查询 API。本课时将深入分析 query-graphql-plugin 模块的核心原理。

启动逻辑

首先我们需要了解的是 query-graphql-plugin 插件是如何将 GraphQL 与 OAP 自身的 JettyServer  Handler 体系进行集成的。这部分集成逻辑是在 GraphQLQueryProvider 中实现的,它是 query-grapql-plugin 插件 SPI 文件中指定的唯一个一个地个 ModuleProvider 实现,其中主要完成下面三件事:

  1. 通过 GraphQL Java Tools 实现 GraphQL Schema 与 POJO 之间的映射,创建相应的 GraphQLSchema 对象。如何使用 GraphQL Java Tools 以及 Resolver 与 POJO 映射的映射规则在前面的 GraphQL Java Tools 入门中已经详细介绍过了。

  2. 通过 GraphQL Java API 创建 GraphQL 对象,它将处理"/graphql"路径上的全部请求。

  3. 创建 GraphQLQueryHandler 实例并注册到 JettyServer。GraphQLQueryHandler 会将收到的 Http 请求进行一次转换,并交给 GraphQL 对象进行处理。

GraphQLQueryProvider 的核心实现如下所示:

public class GraphQLQueryProvider extends ModuleProvider {
    private final GraphQLQueryConfig config = new GraphQLQueryConfig();
    private GraphQL graphQL;
    
    @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
        GraphQLSchema schema = SchemaParser.newParser()
            .file("query-protocol/common.graphqls")
            .resolvers(new Query(), new Mutation())
            ... ... // 这里会添加所有 GraphQL Schema以及关联的 Resolver实现,后面会挑选几个展开详述
            .build() .makeExecutableSchema();
        // 创建 GraphQL 对象, GraphQL Java提供的API
        this.graphQL = GraphQL.newGraphQL(schema).build();
    }
    @Override public void start() throws ServiceNotProvidedException, ModuleStartException {
        // 创建 GraphQLQueryHandler实例并注册到 JettyServer中
        JettyHandlerRegister service = getManager().find(CoreModule.NAME).provider().getService(JettyHandlerRegister.class);
        // 这里的 path在 application.yml中的 query部分有相应配置项,默认是"/graphql"
        service.addHandler(new GraphQLQueryHandler(config.getPath(), graphQL));
    }
}
学新通

GraphQLQueryHandler

在前面介绍中提到,server-core 模块会启动两个 Server,一个是 GRPCServer,主要用于接收 Agent 发送来的 gRPC 请求,前文介绍的 RegisterServiceHandler、JVMMetricReportServiceHandler、TraceSegmentReportServiceHandler 等都是注册在 GRPCServer 上的 Handler;另一个是 JettyServer,用于接收 Http 请求,本小节介绍的 GraphQLQueryHandler 就是注册在 JettyServer 的 Handler,它继承 JettyJsonHandler 如下图所示:

学新通

JettyJsonHandler 使用模板方法模式将真正的请求处理逻辑延迟到子类实现,而在其 doGet() 方法和 doPost() 方法中只完成了下面几项通用的逻辑:

  1. 设置 HttpResponse 响应头;

  2. 将请求处理结果(JSON 数据)写入返回给客户端;

  3. 如果请求处理过程中出现异常,则在响应的 JSON 中携带 error-message 字段记录简单的异常信息。

GraphQLQueryHandler 只支持 POST 请求,不支持 GET 请求,其 doPost() 方法中首先会读取 JSON 格式的请求体,并用其中数据创建 ExecutionInput 对象,execute() 方法是 GraphQL 对象处理请求的入口,ExecutionInput 是其唯一的参数,execute() 方法返回 ExecutionResult 对象,其中封装了查询得到的 GraphQL Schema 对象(正常情况)以及错误信息(异常情况),具体实现如下:

protected JsonElement doPost(HttpServletRequest req) throws IOException {
    BufferedReader reader = new BufferedReader(new InputStreamReader(req.getInputStream()));
    StringBuilder request = new StringBuilder();
    // 省略读取reader的过程
    JsonObject requestJson = gson.fromJson(request.toString(), JsonObject.class);
    ExecutionInput executionInput = ExecutionInput.newExecutionInput()
        .query(requestJson.get(QUERY).getAsString())
        .variables(gson.fromJson(requestJson.get(VARIABLES), mapOfStringObjectType))
        .build();
    // 在前文的示例中,Spring Boot 帮我们屏蔽了 execute()方法的调用,这里需要自己通过GraphQL Java API进行调用
    ExecutionResult executionResult = graphQL.execute(executionInput);
    Object data = executionResult.getData(); // 正常查询结果
    List<GraphQLError> errors = executionResult.getErrors(); // 异常信息
    JsonObject jsonObject = new JsonObject();
    // 将正常查询结果记录到"data"字段,将异常信息记录到"error"(略)
    return jsonObject;
}
学新通

到此处为止,query-graphql-plugin 插件处理查询请求的核心流程就介绍完了,通过下面一张图,可以很好地总结该流程:

学新通

GraphQL Schema 鸟瞰

在 resouces/query-protocol 目录中包含了 query-graphql-plugin 插件的全部 GraphQL Schema 文件,其结构如下图所示,该结构图是通过 GraphQL Voyager 工具生成的,如果你感兴趣可以查找相关资料进行了解。

学新通

在学习了前面介绍的 GraphQL Schema 基本语法和示例之后,相信你已经完全能够读懂上图涉及的全部 GraphQL Schema 定义,这里就不再一个一个地展开分析,我们将重点放在关联的 Resolver 以及具体的查询实现上。

MetadataQuery

query-graphql-plugin 插件中提供了三个查询 Service 的方法,如下图所示:

学新通

GraphQL Java Tools 会将上述三个查询 Service 的方法映射到 MetadataQuery 中的同名方法,如下图所示,MetadataQuery 会将请求委托给 MetadataQueryService 的同名方法处理,而 MetadataQueryService 中也没有其他逻辑,直接将请求委托给 MetadataQueryEsDAO 的同名方法:

学新通

在 MetadataQuery 的这三个方法中都有一个 Duration 入参,在 metadata.graphqls 文件中定义了 Duration 这个 input 类型,该参数指定了查询的起止时间以及时间单位。

MetadataQueryEsDAO 底层通过 High Level REST Client 对 ElasticSearch 的查询。先来看 searchServices() 方法的具体实现,其中会根据时间范围以及 serviceName 进行匹配:

public List<Service> searchServices(long startTimestamp, long endTimestamp,
    String keyword) throws IOException {
    SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    // 查询的时间范围
    boolQueryBuilder.must().add(timeRangeQueryBuild(startTimestamp, endTimestamp));
    // 不查询 NetWorkAddress在 service_inventory索引中的数据 
    boolQueryBuilder.must().add(QueryBuilders.termQuery(ServiceInventory.IS_ADDRESS, BooleanUtils.FALSE));
    if (!Strings.isNullOrEmpty(keyword)) { 
        // serviceName匹配用户指定的关键字(keyword)
        String matchCName = MatchCNameBuilder.INSTANCE.build(ServiceInventory.NAME);
        boolQueryBuilder.must().add(QueryBuilders.matchQuery(matchCName, keyword));
    }
    sourceBuilder.query(boolQueryBuilder);
    sourceBuilder.size(queryMaxSize); // 查询返回Document的个数上限,默认上限5000个
    // 通过 RestHighLevelClient 执行 SearchRequest查询
    SearchResponse response = getClient().search(ServiceInventory.INDEX_NAME, sourceBuilder);
    // 从 SearchResponse响应中获取相应的 Service信息并返回
    return buildServices(response);
}
学新通

下图展示了 timeRangeQueryBuild() 方法构造的查询时间范围:

学新通

另外两个查询 Service 元数据的方法:getAllServices() 方法只根据时间范围进行查询,searchService() 方法只根据 serviceName 的关键字进行匹配,实现方式类似,这里不再展开详细分析。

除了查询 Service,MetadataQuery 还提供了查询其他多种基础元数据的相应方法:

  • 查询 ServiceInstance

getServiceInstances() 方法可以按照时间范围和 serviceId 查询关联的 ServiceInstance 集合。

  • 查询 Endpoint

    • searchEndpoint() 方法会根据 serviceId 以及关键字查询相应的 Endpoint 集合。

    • getEndpointInfo() 方法会根据指定的 endpointId 查询 Endpoint 信息。

  • 查询 Databases

getAllDatabases() 方法其实也是查询 Service ,只不过指定了 node_type 字段的取值为 Database 而已。

  • 查询 ClusterBrief

getGlobalBrief() 方法会按照时间范围查询整个 OAP 集群所能感知到的各类组件的个数,然后封装成 ClusterBrief 对象返回。在 ClusterBrief 中包括 Service 数量、 Endpoint 数量、Database 数量、Cache 数量以及 MQ 数量。

查询上述元数据的请求最终会委托给 MetadataQueryEsDAO 中的同名方法,然后依赖 High Level Rest Client 请求 ElasticSearch 进行查询,具体代码实现并不复杂,如果你感兴趣可以参考源码进行学习。

MetricQuery

在前面介绍 jvm-receiver-plugin 以及 trace-receiver-plugin 的章节中,我们详细介绍了 SkyWalking 中多种监控指标的计算方式以及存储实现,在 query-graphql-plugin 插件中自然是关注这些指标是如何查询的,在 metric.graphqls 文件中定义了下图三个查询监控指标的相关方法。

学新通

  • getValues() 方法:返回一个聚合后的单值,例如,一个 Service 在一段时间内 SLA 的平均值。

  • getLinearIntValues() 方法:返回一条时序数据(即每个时间单位一个点,这些连续的点可以组成一张二维的监控图)。

  • getThermodynamic() 方法:返回的 heatmap(热力图)。

查询单个聚合值

首先来看 MetricQuery.getValues() 方法,请求该方法的位置是在 SkyWalking Rocketbot 的拓扑图中,如下图所示:

学新通

图中的“每分钟请求量”“SLA”以及“延迟”三个值都是分别请求 getValues() 方法获得的,这三个值都是计算查询时间段内响应指标的平均值。

getValues() 方法有两个入参,一个是 Duration 类型入参,用于指定查询时间范围,另一个是 是 BatchMetricConditions 类型入参,其中指定了查询的 index alias 以及 entity_id 字段集合。以上图中 SLA 这个指标为例,其 BatchMetricConditions.name 值为“service_sla”,entity_id 字段集合为 [2,3](图中 demo-webapp 和 demo-provider 对应的 ServiceId 分别为 2 和 3)。

MetricQuery 最终会将 getValues() 请求委托给 MetricsQueryEsDAO 的同名方法,下面以查询 demo-provider 和 demo-webapp 两个 Service 在 2020 年 01 月 05 日 19:10~19:40 的 SLA 为例,分析 MetricsQueryEsDAO.getValues() 方法的执行流程:

1、指定 time_bucket 字段的时间范围,即 time_bucket 字段值必须在 startTB 和 endTB 之间。相关代码片段如下:

RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(Metrics.TIME_BUCKET).gte(startTB).lte(endTB);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(rangeQueryBuilder);  

这里的 startTB 和 endTB 已经经过格式化,与查询的 Index 中使用的时间格式对齐。示例中的 startTB 和 endTB 分别是 202001051910 和 202001051940。

2、精确匹配 Document 中的 entity_id 字段值,示例中 entity_id 字段分别为 2 和 3, 相关代码片段如下:

where.getKeyValues().forEach(keyValues -> {
    if (keyValues.getValues().size() > 1) {  
        boolQuery.must().add(QueryBuilders.termsQuery(keyValues.getKey(), keyValues.getValues()));
    } else {
        boolQuery.must().add(QueryBuilders.termQuery(keyValues.getKey(), keyValues.getValues().get(0)));
    }
});

3、按照 entity_id 分组聚合查询到的 SLA 值(即 Document 中的 percentage 字段),具体聚合方式是计算平均值,相关片段如下:

TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Metrics.ENTITY_ID).field(Metrics.ENTITY_ID).size(1000);
parentAggBuilder.subAggregation(AggregationBuilders.avg(valueCName).field(valueCName));

4、将上述构造的查询条件和聚合函数构造成 SearchRequest 请求发送给 ElasticSearch 集群完成查询,相关片段如下:

SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(boolQuery);
sourceBuilder.size(0);
sourceBuilder.aggregation(entityIdAggregation);
SearchResponse response = getClient().search(indexName, sourceBuilder);

注意,这里查询的 indexName 是 Index 别名,在前面介绍 Index Template 的时候已经简单介绍了 Index alias 的作用,这里不再重复。

5、解析 SearchResponse 得到查询结果,即示例中每个 Service 的 SLA 平均值,相关代码片段如下:

IntValues intValues = new IntValues();
Terms idTerms = response.getAggregations().get(Metrics.ENTITY_ID);
for (Terms.Bucket idBucket : idTerms.getBuckets()) {
    KVInt kvInt = new KVInt();
    // key为 entity_id,即示例中的serviceId
    kvInt.setId(idBucket.getKeyAsString()); 
    // value为该 entity_id对应的 SLA平均值
    kvInt.setValue(idBucket.getAggregations().get(valueCName).getValue());
    intValues.getValues().add(kvInt);  // 记录上述查询解析结果
}
return intValues;

注意,上述执行过程只展示了针对 Avg 计算的相关代码,其他监控指标可能会用到其他聚合函数(例如:Sum、Max 等),就可能会走到其他代码分支,但核心逻辑类似,这里就不再重复展示了。

查询时序

下图是 demo-provider (serviceId = 3)响应时间的监控图,如前文所述,图中的时序数据是通过 getLinearIntValues() 方法查询得到的:

学新通

下面将以该图为例,详细分析 getLinearIntValues() 方法的查询流程:

1、首先根据查询的起止时间以及 entity_id,确定要查询的 Document Id,具体实现如下:

// 按照 DownSampling单位以及查询时间范围,确定有多少个Document需要查询
List<DurationPoint> durationPoints = DurationUtils.INSTANCE.getDurationPoints(downsampling, startTB, endTB);
List<String> ids = new ArrayList<>();
// 构造每个 DurationPoint对应的 Document Id
durationPoints.forEach(durationPoint -> ids.add(durationPoint.getPoint()   Const.ID_SPLIT   id));

示例中的 DownSampling 值为 Minute,查询的时间范围为 20:44~ 20:59,生成的 DurationPoint 以及 Document Id 如下图所示:

学新通

2、创建 SearchRequst 请求进行查询。

SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(TYPE);
// 指定查询的 Document Id
searchRequest.source().query(QueryBuilders.idsQuery().addIds(ids)).size(ids.length);
SearchResponse response = client.search(searchRequest);
// 将返回的 SearchResponse转换成 Map后返回,第一层 Key是Document Id,第二层 Key是 Field名称,第二层 Value是字段对应的 Value值
Map<String, Map<String, Object>> result = new HashMap<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
    result.put(hit.getId(), hit.getSourceAsMap());
}
return result;

示例中会根据步骤 1 生成的 Document Id 精确查找 demo-webapp 的 service_resp_time 指标每分钟(20:44~ 20:59 范围)对应的 Document,如下图所示:

学新通

3、将步骤 2 的查询结果整理成 IntValues(底层是 KVInt 列表),相关代码实现比较简单,不再展示。示例中的整理结果如下图所示,其中每个 KVInt 的 Key 为 Document Id,Value 为相应的 summation 值:

学新通

前端拿到上述 KVInt 列表之后,即可绘制出示例中的 Service Response Time 监控图。

查询 heatmap

MetricQuery 中最后一个查询方法是 getThermodynamic() 方法,该方法用于查询热力图,具体查询方式与 getLinearIntValues() 方法类似,这里不再展开分析。


第29讲:深入 query-graphql 插件,SW Rocketbot 背后的英雄(下)

TopN 查询

在 aggregation.graphqls 和 top-n-records.graphqls 两个 GraphQL Schema 文件中定义了所有关于 TopN 数据的查询,如下图所示:

学新通

在分析 MultiScopesSpanListener 的课时中,我们了解到 OAP 可以从存储请求的相关 Trace 中解析得到慢查询信息并转换成 TopNDatabaseStatement 存储到 ES 中。 这里定义的  getTopNRecords() 方法就是用来查询此类 TopN 数据的,为了便于理解,这里以 DB 慢查询为例分析 getTopNRecords() 方法的实现。

在对应的 TopNRecordsQuery.getTopNRecords() 方法中,多个入参被封装成了一个 TopNRecordsCondition 对象,其中包含了如下信息:

private int serviceId; // 查询哪个 DB的慢查询
private String metricName; // 查询的 Index别名,即 top_n_database_statement
private int topN; // 返回 N个耗时最大的慢查询,默认20
private Order order; // 排序方式,查询 DB慢查询自然是 DES
private Duration duration; // 查询的时间范围

请求会经过 TopNRecordsQuery -> TopNRecordsQueryService -> TopNRecordsQueryEsDAO 最终形成 SearchRequest 请求发送给 ElasticSearch,在 TopNRecordsQueryEsDAO 中会设置查询条件以及排序方式,相关代码片段如下:

SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// 指定查询的时间范围
boolQueryBuilder.must().add(QueryBuilders.rangeQuery(TopN.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
// 指定查询的 DB对应的 serviceId
boolQueryBuilder.must().add(QueryBuilders.termQuery(TopN.SERVICE_ID, serviceId));
sourceBuilder.query(boolQueryBuilder);
// 按照 latency进行排序,指定返回 topN条记录
sourceBuilder.size(topN).sort(TopN.LATENCY, order.equals(Order.DES) ? SortOrder.DESC : SortOrder.ASC);
SearchResponse response = getClient().search(metricName, sourceBuilder);

之后会将查询到的每个 Document 中的 statement、traceId 以及 latency 字段值封装成 TopNRecord 对象返回。

除了上述 DB 慢查询的 TopN  查询之外,在 AggregationQuery 中还提供了为 Service 、ServiceInstance 以及 Endpoint 提供了其他维度的 TopN查询,如下图所示:

学新通

简单介绍下这些方法的功能:

  • getServiceeTopN()/getAllServiceInstanceTopN()/getAllEndpointTopN() 方法:按照 name 参数指定监控维度对所有 Service/ServiceInstance/Endpoint 进行排序并获取 TopN。

  • getServiceInstance()/getEndpointTopN() 方法:在 serviceId 参数指定 Service 中,按照 name 参数指定的监控维度对 ServiceInstance/Endpoint 进行排序并获取 TopN。

在 SkyWalking Rocketbot 中我们可以看到 Global Top Throughout 的监控,如下图所示:

学新通

其底层是通过 getServiceTopN() 方法统计指定时间段内所有 Service 的 CPM 平均值并获取 Top10 实现的。这里就以该示例为主线介绍 AggregationQuery 查询的核心流程。

AggregationQuery 收到的请求会经过 AggregationQuery -> AggregationQueryService -> AggregationQueryEsDAO,其中会格式化查询起止时间、根据 DownSampling 生成相应的 Index 别名等操作,前面已经简单介绍过这些通用操作,不再重复。

在 AggregationQueryEsDAO.getServiceTopN() 方法中会构造 SearchRequest 的查询条件,如下所示:

SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
// 指定查询的起止时间,示例中起止时间分别是201901072044~201901072059
sourceBuilder.query(QueryBuilders.rangeQuery(Metrics.TIME_BUCKET).lte(endTB).gte(startTB));
boolean asc = false; // 确定排序方式,示例中查询服务的吞吐量是从高到低排序的
if (order.equals(Order.ASC)) { asc = true; }
TermsAggregationBuilder aggregationBuilder = AggregationBuilders
    .terms(Metrics.ENTITY_ID) // 按照entity_id进行聚合,在以 service_cpm 为别名的 Index中 entity_id字段记录的是 serviceId
    .field(Metrics.ENTITY_ID)
    .order(BucketOrder.aggregation(valueCName, asc)) // 按照指定字段排序,示例中以 service_cpm 为别名的 Index会按照 value字段进行排序
    .size(topN) // 返回记录的数量,Skywalking Rocketbot传递的topN参数为10
    .subAggregation( // 根据 entity_id分组后会计算 valueCName字段的平均值,生成的新字段名称也为valueCName
        AggregationBuilders.avg(valueCName).field(valueCName)
    );
sourceBuilder.aggregation(aggregationBuilder);
// 发送SearchRequest请求
SearchResponse response = getClient().search(indexName, sourceBuilder);
学新通

完成查询之后会从 SearchResponse 中解析得到每个 Service 的 CPM 平均值,并封装成 TopNEntity 集合返回,具体实现如下:

List<TopNEntity> topNEntities = new ArrayList<>();
Terms idTerms = response.getAggregations().get(Metrics.ENTITY_ID);
for (Terms.Bucket termsBucket : idTerms.getBuckets()) {
    TopNEntity topNEntity = new TopNEntity();
    topNEntity.setId(termsBucket.getKeyAsString()); // 获取 ServiceId
    Avg value = termsBucket.getAggregations().get(valueCName); // 获取 cpm平均值
    topNEntity.setValue((long)value.getValue());
    topNEntities.add(topNEntity);
}
return topNEntities;

在将 TopNEntitiy 集合返回给前端展示之前,还会在 AggregationService 中查询 ServiceInventoryCache 获取对应的 serviceName 并记录到 TopNEntity.name字段中,查询 ServiceInventoryCache 的过程前面已经详细分析过,这里不再重复。

AggregationQuery 提供的其他 TopN 查询与 getServiceTopN() 方法实现基本类似,相信你看完 getServiceTopN() 方法的分析之后,完全可以读懂其他方法的实现。

TopologyQuery

首先请你回顾一下,在分析 MultiScopesSpanListener 的课时中,可以看到 OAP 会根据 Trace 的调用关系创建相应的 Relation 指标来记录调用链上的监控信息,例如,ServiceRelationServerCpmMetrics 指标记录了一个服务调用另一个服务的 cpm 值。

在 SkyWalking Rocketbot 中有一个“拓扑图”的视图,如下所示:

学新通

该拓扑图中展示的拓扑关系以及调用链上的指标数据是通过 query-graphql-plugin 插件提供的三个 get*Topology() 方法实现的,如下图所示:

学新通

在上述拓扑图展示的时候只需要请求 getGlobalTopology() 方法即可,在 TopologyQueryService.getGlobalTopology() 方法中会通过下面两个方法完成查询。

  • loadServerSideServiceRelations() 方法:查询 Index 别名为 service_relation_server_side 的 Index,该类 Index 中只记录了服务端视角的调用关系,并没有记录其他指标信息。在前面示例中,该查询的结果如下图所示:

学新通

  • loadClientSideServiceRelations() 方法:查询 Index 别名为 service_relation_client_side 的 Index,该类 Index 中只记录了客户端视角的调用关系,并没有记录其他指标信息。在前面示例中,该查询的结果如下图所示:

学新通

接下来,TopologyQueryService 会将上述两个查询结果集合进行合并和整理,最终得到一个 Topology 对象。在 Topology 对象中包含两个集合。

  • nodes 集合:包含了拓扑图中所有的节点信息,示例中的结果如下图所示,总共有 3 个节点,分别是 User、demo-webapp、demo-provider:

学新通

  • calls 集合:包含了拓扑图中所有的边(即调用关系),示例中的结果如下图所示,总共有 2 条边,一条边是 User 调用 demo-webapp(即 1_2),另一条边是 demo-webapp 调用 demo-provider(即2_3):

学新通

在侦察端面板中展示的监控图都是通过 getLinearIntValues() 方法查询相应 Index 实现的,例如上图中侦察端面板中展示的“平均响应时间”监控图,就是查询别名为 service_relation_server_resp_time 的这组 Index 实现的,其中指定了 entity_id 为 “2_3”(即 demo-webapp 调用 demo-provider 的这条调用链路的平均响应时间)。

除了查询完整的拓扑图之外,我们还可以以一个 Service 或 Endpoint 为中心进行拓扑图查询,分别对应前文提到的 getServiceTopology() 方法和 getEndpointTopology() 方法,这两个方法的查询逻辑与 getGlobalTopology() 方法基本类似,主要区别在于添加了 serviceId(或是 endpointId)的查询条件,具体实现不再展开,如果你感兴趣可以翻看一下源码。

TraceQuery

在 SkyWalking Rocketbot 的“追踪”面板中,我们可以查询到所有收集到的 Trace 信息,如下图所示:

学新通

该面板可以分为三个区域,在区域 1 中,我们可以选择 TraceSegment 关联的 Service、ServiceInstance 以及 Endpoint,这些下拉表中的数据是通过前文介绍的 MetadataQuery 查询到的。在区域 2 中展示了 TraceSegment 的简略信息,通过 queryBasicTraces() 方法查询得到,如下图所示。在区域 3 中展示了一条完整 Trace 的详细信息,通过 queryTrace() 方法查询得到,如下图所示。

学新通

TraceQuery.queryBasicTraces() 方法的入参被封装成了一个 TraceQueryCondition

对象,其中包含了一些查询 Trace 简略信息的条件,如下所示:

  • serviceId、serviceInstanceId、endpointId 字段:TraceSegment 关联的 Service、ServiceInstance、Endpoint。

  • traceId 字段:指定 TraceSegment 的 traceId。

  • queryDuration 字段:指定查询的时间跨度。

  • minTraceDuration 和 maxTraceDuration 字段:指定 TraceSegment 耗时范围,只查询耗时在 minTraceDuration~maxTraceDuration 之间的 Trace。

  • traceState 字段:Trace 的状态信息,枚举,可选值有 ALL、SUCC、ERROR 三个值。

  • queryOrder 字段:查询结果的排序方式,枚举,可选值有 BY_DURATION、BY_START_TIME 两个值。

  • paging 字段:分页信息,类似于 SQL 语句中的 limit 部分,指定了此次查询的起始位置以及结果条数。

同样的,最终创建以及执行 SearchRequest 请求的逻辑在底层的 TraceQueryEsDAO 中,具体代码逻辑如下,基本与 TraceCondition 中的字段一个一个地对应:

SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(boolQueryBuilder);
List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
if (startSecondTB != 0 && endSecondTB != 0) { // 查询时间范围,即过滤 time_bucket字段
    mustQueryList.add(QueryBuilders.rangeQuery(SegmentRecord.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
}
if (minDuration != 0 || maxDuration != 0) { // 查询TraceSegment的耗时范围,即过滤 latency字段
    RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(SegmentRecord.LATENCY);
    if (minDuration != 0) { rangeQueryBuilder.gte(minDuration); }
    if (maxDuration != 0) { rangeQueryBuilder.lte(maxDuration); }
    boolQueryBuilder.must().add(rangeQueryBuilder);
}
if (!Strings.isNullOrEmpty(endpointName)) { // 过滤 endpoint_name字段
    String matchCName = MatchCNameBuilder.INSTANCE.build(SegmentRecord.ENDPOINT_NAME);
    mustQueryList.add(QueryBuilders.matchPhraseQuery(matchCName, endpointName));
}
if (serviceId != 0) { // 查询 TraceSegment所属的Service,即过滤 service_id字段
    boolQueryBuilder.must().add(QueryBuilders.termQuery(SegmentRecord.SERVICE_ID, serviceId));
}
if (serviceInstanceId != 0) { // 查询 TraceSegment所属的ServiceInstance,即过滤 service_instance_id字段
    boolQueryBuilder.must().add(QueryBuilders.termQuery(SegmentRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
}
if (endpointId != 0) {// 查询 TraceSegment所属的 Endpoint,即过滤endpoint_id字段
    boolQueryBuilder.must().add(QueryBuilders.termQuery(SegmentRecord.ENDPOINT_ID, endpointId));
}
if (!Strings.isNullOrEmpty(traceId)) { // 查询 TraceSegment所属的 traceId,即过滤 trace_id字段
    boolQueryBuilder.must().add(QueryBuilders.termQuery(SegmentRecord.TRACE_ID, traceId));
}
switch (traceState) { // 查询 TraceSegment覆盖的逻辑是否发生异常,即过滤 is_error字段
    case ERROR:
        mustQueryList.add(QueryBuilders.matchQuery(SegmentRecord.IS_ERROR, BooleanUtils.TRUE));
        break;
    case SUCCESS:
        mustQueryList.add(QueryBuilders.matchQuery(SegmentRecord.IS_ERROR, BooleanUtils.FALSE));
        break;
}
switch (queryOrder) { // 查询得到的多个 TraceSegment的排序字段,可以按照 start_time字段或是 latency字段逆序排序
    case BY_START_TIME:
        sourceBuilder.sort(SegmentRecord.START_TIME, SortOrder.DESC);
        break;
    case BY_DURATION:
        sourceBuilder.sort(SegmentRecord.LATENCY, SortOrder.DESC);
        break;
}
sourceBuilder.size(limit); // 指定此次查询返回的Document个数
sourceBuilder.from(from); // 指定查询的起始位置
// 执行上述 SearchRequest请求,查询的是别名为 segment的Index
SearchResponse response = getClient().search(SegmentRecord.INDEX_NAME, sourceBuilder);
学新通

完成查询之后会将查询得到的所有 TraceSegment 的 segmentId、traceId、耗时(latency)、起始时间(startTime)以及 isError 状态封装成 BasicTrace 集合返回给前端进行展示,相应的逻辑比较简单,这里不再展开。

在“追踪”面板的区域 2 中展示了 BasicTrace 集合(即 TraceSegment 的简略信息)之后,我们可以点击任意一个 TraceSegment,可通过 queryTrace() 方法查询其所在 Trace 的全部 TraceSegment 并展示在区域 3 中,该请求会直接委托给 TraceQueryEsDAO.queryByTraceId() 方法,使用的 SearchRequest 请求比较简单:

SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
// 精确匹配 trace_id字段
sourceBuilder.query(QueryBuilders.termQuery(SegmentRecord.TRACE_ID, traceId));
// 一条Trace中TraceSegment的个数上限默认是200,application.yml文件中有相应配置项可调整
sourceBuilder.size(segmentQueryMaxSize);
// 执行 SearchRequest请求,查询的依旧是别名为 segment的Index
SearchResponse response = getClient().search(SegmentRecord.INDEX_NAME, sourceBuilder);

查询完成之后会为每个 Document 创建相应的 SegmentRecord 对象(前面 trace-receiver-plugin 写入 ES 的时候也是用的该对象)并将 Document 中的字段填充到 SegmentRecord 对象的字段。

接下来会创建一个 Trace 对象作为请求返回值,主要分为下面两个操作:

1、创建 Trace 返回值,收集全部 Span 对象。

TraceQueryService 会逐个反序列化上述 SegmentRecord 中的 dataBinary 字段,拿到该 TraceSegment 中的所有 Span ,然后将这些 Span 统统记录到 Trace 对象的spans 集合中。记得在 Trace 数据写入的过程中,有字符串转换到唯一 id 的过程(即 Exchange 过程),这里填充 Trace.spans 集合的时候会完成 id 到可读的字符串的逆转换,比如,serviceId 会被恢复成 serviceName、endpointId 会被恢复成 endpointName、componentId 会被恢复成 componentName 等等,这些都会伴随着一些前面介绍过的 Cache 以及 ES 查询。另外,还会反序列化每个 Span 携带的额外信息。例如 Log 信息和 Tag 信息。

Trace 类以及 Span 类对应的是 GraphQL Schema 中的 Trace 以及 Span 定义,是 Java 与前端代码交互的 DTO,而 SegmentRecord 则是 OAP 内部以及 OAP 与 ElasticSearch 交互的 Domain,虽然都表示TraceSegment、字段类似、携带的信息差不多,但是使用的位置不同,是常见的一种解耦方式。

2、 排序 Span。

TraceQueryService 会按照 parentSpanId 排序 Trace.spans 集合中 Span 对象(父 Span 在前,子 Span 在后),大致实现如下:

List<Span> sortedSpans = new LinkedList<>();
// 查找该Trace中最顶层的rootSpan,即第一个 Span
List<Span> rootSpans = findRoot(trace.getSpans());
rootSpans.forEach(span -> {
    List<Span> childrenSpan = new ArrayList<>();
    childrenSpan.add(span); 
    // 这里会递归查找当前span的子Span,并添加到sortedSpans这个List中
    findChildren(trace.getSpans(), span, childrenSpan);
    sortedSpans.addAll(childrenSpan);
});
// 重新设置 Trace.spans字段
trace.getSpans().clear();
trace.getSpans().addAll(sortedSpans);
return trace;

下面通过一个示例描述该递归排序 Span 的大致执行逻辑:

学新通

query-graphql-plugin 插件的分析就到此结束了,我们下一课时见。


这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfckbja
系列文章
更多 icon
同类精品
更多 icon
继续加载