
CnosDB 2.0 Arrow Flight SQL Usage Guide

CnosDB

Arrow Flight SQL

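Arrow Flight SQL is a protocol for interacting with SQL databases using the Arrow in-memory format and the Flight RPC framework. By combining Arrow's in-memory columnar format with Flight RPC, it speeds up SQL database operations. With Arrow Flight SQL, users can access data with standard SQL syntax while greatly improving data-access performance, making second-level responses to queries over a billion rows possible.

Arrow Flight SQL clients are currently supported in the following environments:

• C++

• Go

• Java

• Rust

• JDBC based on Arrow Flight SQL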

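Advantages of Arrow Flight SQL

Arrow Flight SQL offers the following advantages:

1. Feature-rich: its functionality is similar to APIs such as JDBC and ODBC, including executing queries and creating prepared statements.

2. Secure: because it is built on Flight, it supports features such as encryption and authentication out of the box.

3. Fast: clients and servers that both implement Arrow Flight exchange data without any conversion, and the protocol allows further optimizations such as parallel data access, which greatly improves data-access performance.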

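How Arrow Flight compares with JDBC/ODBC in performance:

1. Arrow Flight needs no data conversion when the client and server exchange data, whereas ODBC implementations typically require a custom binary wire protocol.

2. Arrow Flight can transfer data in parallel: the client first obtains an access plan for the data, which may be distributed across different servers, and then pulls from those servers in parallel.

3. Arrow Flight uses the Arrow columnar format, in which accessing the value at a given position is O(1) and which is friendly to vectorized computation.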
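To make the O(1) claim concrete, here is a simplified Go sketch of how a fixed-width Arrow column is laid out: values sit in one contiguous buffer and nulls are tracked in a separate validity bitmap, least-significant bit first as in Arrow. The Float64Column type is hypothetical and omits Arrow details such as buffer padding, alignment and slice offsets; it only shows why reading row i is a direct index rather than a scan.

```go
package main

import "fmt"

// Float64Column is a simplified, hypothetical model of an Arrow float64 column:
// one contiguous values buffer plus a validity bitmap (1 bit per row, LSB first).
type Float64Column struct {
	values   []float64
	validity []byte
}

// NewFloat64Column builds a column from optional values; a nil pointer means null.
func NewFloat64Column(in []*float64) Float64Column {
	c := Float64Column{
		values:   make([]float64, len(in)),
		validity: make([]byte, (len(in)+7)/8),
	}
	for i, v := range in {
		if v != nil {
			c.values[i] = *v
			c.validity[i/8] |= 1 << uint(i%8)
		}
	}
	return c
}

// IsValid tests bit i of the bitmap: one byte lookup and a mask, O(1).
func (c Float64Column) IsValid(i int) bool {
	return c.validity[i/8]&(1<<uint(i%8)) != 0
}

// Value reads row i directly by offset without touching earlier rows, O(1).
func (c Float64Column) Value(i int) float64 {
	return c.values[i]
}

func main() {
	visibility, pressure := 56.0, 77.0
	col := NewFloat64Column([]*float64{&visibility, nil, &pressure})
	for i := 0; i < 3; i++ {
		fmt.Println(i, col.IsValid(i), col.Value(i))
	}
}
```

A null row still occupies a slot in the values buffer, which is what keeps every row at a fixed, computable offset.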

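Although Flight can be used directly for database access, it is not a direct replacement for JDBC/ODBC. However, Arrow Flight SQL can serve as the concrete wire protocol and driver implementation underneath JDBC/ODBC drivers, reducing the implementation burden on databases.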


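The general flow for a client that connects to a database over Arrow Flight SQL, queries data and executes SQL is:

1. Create a Flight SQL client

2. Authenticate with a username and password

3. Execute SQL and obtain a FlightInfo structure

4. Use the FlightEndpoint entries in the FlightInfo to retrieve the FlightData stream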

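FlightInfo contains details about where the data lives, so the client can fetch it from the appropriate server. The server information is encoded as a series of FlightEndpoint messages in the FlightInfo. Each endpoint represents a location that holds a subset of the response data.

A FlightEndpoint contains a list of server locations and a Ticket, an opaque binary token the server uses to identify the requested data. Endpoints have no defined order, so if the returned data is ordered, it is returned in a single FlightEndpoint.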

[Figure: flowchart of a client querying a database over Arrow Flight SQL]

C++

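1. Install Apache Arrow. Detailed installation instructions are in the official documentation (https://arrow.apache.org/install/). On macOS, it can be installed with brew:

    brew install apache-arrow
    # GLib (C) bindings; not required for the C++ example below
    brew install apache-arrow-glib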

2. Configure CMakeLists.txt

    cmake_minimum_required(VERSION 3.24)
    project(arrow_flight_cpp)

    set(CMAKE_CXX_STANDARD 20)

    find_package(Arrow REQUIRED)
    find_package(ArrowFlight REQUIRED)
    find_package(ArrowFlightSql REQUIRED)

    include_directories(${ARROW_INCLUDE_DIR})
    add_executable(arrow_flight_cpp main.cpp)
    target_link_libraries(arrow_flight_cpp PRIVATE Arrow::arrow_shared)
    target_link_libraries(arrow_flight_cpp PRIVATE ArrowFlight::arrow_flight_shared)
    target_link_libraries(arrow_flight_cpp PRIVATE ArrowFlightSql::arrow_flight_sql_shared)

3. Using the C++ Arrow library. Most Arrow functions return arrow::Result<T>, so the calling code must live in a function that itself returns arrow::Result<T> (or arrow::Status), for example:

    arrow::Result<std::unique_ptr<FlightClient>> get_client() {
        ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 31004));
        ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location));
        return client;
    }

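The ARROW_ASSIGN_OR_RAISE macro first evaluates the right-hand expression, which returns an arrow::Result<T>. If the result holds an error, the enclosing function returns early with that Status; otherwise the value is assigned to the variable on the left.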
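In Go, the same control flow is written out explicitly with the value, err idiom. This standard-library sketch uses illustrative names (parsePort and grpcTCPURI are not Arrow functions) and marks where ARROW_ASSIGN_OR_RAISE would sit in the C++ version.

```go
package main

import (
	"fmt"
	"strconv"
)

// parsePort returns a value or an error, much like a function returning arrow::Result<int>.
func parsePort(s string) (int, error) {
	p, err := strconv.Atoi(s)
	if err != nil {
		return 0, fmt.Errorf("invalid port %q: %w", s, err)
	}
	if p < 1 || p > 65535 {
		return 0, fmt.Errorf("port %d out of range", p)
	}
	return p, nil
}

// grpcTCPURI chains a fallible step the way ARROW_ASSIGN_OR_RAISE does:
// bind the value on success, return the error to the caller at once on failure.
func grpcTCPURI(host, port string) (string, error) {
	p, err := parsePort(port) // C++ analogue: ARROW_ASSIGN_OR_RAISE(auto p, ParsePort(port));
	if err != nil {
		return "", err // the early return the macro generates
	}
	return fmt.Sprintf("grpc+tcp://%s:%d", host, p), nil
}

func main() {
	fmt.Println(grpcTCPURI("localhost", "31004"))
	fmt.Println(grpcTCPURI("localhost", "not-a-port"))
}
```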

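For convenience, the example code is written inside a lambda that returns arrow::Status:

    int main() {
        auto fun = []() -> arrow::Status {
            // code
            return arrow::Status::OK();
        };
        auto status = fun();
        std::cout << status.ToString() << std::endl;
        return 0;
    }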

4. Authenticate and create a FlightSqlClient

    ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 31004));
    ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location));
    auto user = "root";
    auto password = "";
    // Basic authentication: the credentials are Base64-encoded (not encrypted)
    auto auth = client->AuthenticateBasicToken({}, user, password);
    // Authenticate before client is moved into the FlightSqlClient
    auto sql_client = std::make_unique<FlightSqlClient>(std::move(client));

    ARROW_RETURN_NOT_OK(auth); // return early if authentication failed
    FlightCallOptions call_options;
    call_options.headers.push_back(auth.ValueOrDie()); // attach the returned token header to every call

5. Execute SQL and obtain FlightInfo

    ARROW_ASSIGN_OR_RAISE(auto info, sql_client->Execute(call_options, "SELECT now();"));
    const auto endpoints = info->endpoints();

6. Retrieve the data through each FlightEndpoint

    for (const auto& endpoint : endpoints) {
        const auto& ticket = endpoint.ticket;
        // The stream contains the data
        ARROW_ASSIGN_OR_RAISE(auto stream, sql_client->DoGet(call_options, ticket));
        // Get the schema of the data
        ARROW_ASSIGN_OR_RAISE(auto schema, stream->GetSchema());
        std::cout << "Schema:" << schema->ToString() << std::endl;
        // Fetch and print the data until the stream is exhausted
        while (true) {
            ARROW_ASSIGN_OR_RAISE(FlightStreamChunk chunk, stream->Next());
            if (chunk.data == nullptr) {
                break;
            }
            std::cout << chunk.data->ToString();
        }
    }

7. Complete code

    #include <iostream>
    #include <arrow/flight/api.h>
    #include <arrow/flight/sql/api.h>

    using namespace arrow::flight;
    using namespace arrow::flight::sql;
    using namespace arrow;

    int main() {
        auto fun = []() -> arrow::Status {
            ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 31004));
            ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location));
            std::cout << "location client" << std::endl;

            // Authenticate first: client must not be used after it is moved below
            auto user = "root";
            auto password = "";
            auto auth = client->AuthenticateBasicToken({}, user, password);
            ARROW_RETURN_NOT_OK(auth);
            FlightCallOptions call_options;
            call_options.headers.push_back(auth.ValueOrDie());

            auto sql_client = std::make_unique<FlightSqlClient>(std::move(client));

            ARROW_ASSIGN_OR_RAISE(auto info, sql_client->Execute(call_options, "SELECT now();"));
            const auto endpoints = info->endpoints();
            for (const auto& endpoint : endpoints) {
                ARROW_ASSIGN_OR_RAISE(auto stream, sql_client->DoGet(call_options, endpoint.ticket));

                ARROW_ASSIGN_OR_RAISE(auto schema, stream->GetSchema());
                std::cout << "Schema:" << schema->ToString() << std::endl;

                while (true) {
                    ARROW_ASSIGN_OR_RAISE(FlightStreamChunk chunk, stream->Next());
                    if (chunk.data == nullptr) {
                        break;
                    }
                    std::cout << chunk.data->ToString();
                }
            }
            return arrow::Status::OK();
        };

        auto status = fun();
        std::cout << status.ToString() << std::endl;

        return 0;
    }

Go

1. Add dependencies

Add the dependencies to go.mod:

    require (
        github.com/apache/arrow/go/v10 v10.0.1
        google.golang.org/grpc v1.51.0
    )

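2. Create a Flight SQL client

    // Imports used by the Go snippets in this section
    import (
        "context"
        "fmt"

        "github.com/apache/arrow/go/v10/arrow/flight/flightsql"
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
    )

    // Plaintext gRPC connection (no TLS), e.g. for a local test server
    var dialOpts = []grpc.DialOption{
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    }
    cl, err := flightsql.NewClient("localhost:31004", nil, nil, dialOpts...)
    if err != nil {
        fmt.Print(err)
        return
    }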

3. Set the connection credentials and obtain an authenticated context

    ctx, err := cl.Client.AuthenticateBasicToken(context.Background(), "root", "")
    if err != nil {
        fmt.Print(err)
        return
    }

4. Execute SQL in the authenticated context and obtain FlightInfo

    info, err := cl.Execute(ctx, "SELECT now();")
    if err != nil {
        fmt.Print(err)
        return
    }
    fmt.Println(info.Schema)

5. Get a data reader from the FlightInfo

    // CnosDB currently returns only one endpoint
    rdr, err := cl.DoGet(ctx, info.GetEndpoint()[0].Ticket)
    if err != nil {
        fmt.Print(err)
        return
    }
    defer rdr.Release()

6. Iterate over the reader and print the data

    n := 0
    for rdr.Next() {
        record := rdr.Record()
        for i, col := range record.Columns() {
            fmt.Printf("rec[%d][%q]: %v\n", n, record.ColumnName(i), col)
        }
        n++
    }

Java

1. Add dependencies

• If you build your Java project with Maven, add the dependencies to pom.xml:

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-flight -->
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-flight</artifactId>
            <version>10.0.1</version>
            <type>pom</type>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.arrow/flight-sql -->
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>flight-sql</artifactId>
            <version>10.0.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.5</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-memory-netty -->
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-memory-netty</artifactId>
            <version>10.0.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.arrow/flight-core -->
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>flight-core</artifactId>
            <version>10.0.1</version>
        </dependency>
    </dependencies>

• Then add the os-maven-plugin build extension:

    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.7.1</version>
            </extension>
        </extensions>
    </build>

• Set the environment variable _JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED", or pass the option to java directly:

    java --add-opens=java.base/java.nio=ALL-UNNAMED -jar ...
    # or
    env _JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" java -jar ...

    # If you use Maven
    _JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn exec:java -Dexec.mainClass="YourMainCode"

2. Create a FlightSqlClient

    BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
    final Location clientLocation = Location.forGrpcInsecure("localhost", 31004);

    FlightClient client = FlightClient.builder(allocator, clientLocation).build();
    FlightSqlClient sqlClient = new FlightSqlClient(client);

3. Configure authentication

    // Basic authentication; the returned credential is sent with later calls
    Optional<CredentialCallOption> credentialCallOption = client.authenticateBasicToken("root", "");
    // CnosDB tenant, sent as a call header
    final CallHeaders headers = new FlightCallHeaders();
    headers.insert("tenant", "cnosdb");
    Set<CallOption> options = new HashSet<>();

    credentialCallOption.ifPresent(options::add);
    options.add(new HeaderCallOption(headers));
    CallOption[] callOptions = options.toArray(new CallOption[0]);

4. Execute SQL and obtain FlightInfo

    try (final FlightSqlClient.PreparedStatement preparedStatement = sqlClient.prepare("SELECT now();", callOptions)) {
        final FlightInfo info = preparedStatement.execute(callOptions);
        System.out.println(info.getSchema());

        // The rest of the code is in the next step
    }

5. Retrieve the data

    final Ticket ticket = info.getEndpoints().get(0).getTicket();
    try (FlightStream stream = sqlClient.getStream(ticket, callOptions)) {
        int n = 0;
        while (stream.next()) {
            List<FieldVector> vectors = stream.getRoot().getFieldVectors();
            for (int i = 0; i < vectors.size(); i++) {
                System.out.printf("%d %d %s%n", n, i, vectors.get(i));
            }
            n++;
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

6. Complete code

    package org.example;

    import org.apache.arrow.flight.*;
    import org.apache.arrow.flight.grpc.CredentialCallOption;
    import org.apache.arrow.flight.sql.FlightSqlClient;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.FieldVector;

    import java.util.HashSet;
    import java.util.List;
    import java.util.Optional;
    import java.util.Set;

    public class Main {
        public static void main(String[] args) {
            BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
            final Location clientLocation = Location.forGrpcInsecure("localhost", 31004);

            FlightClient client = FlightClient.builder(allocator, clientLocation).build();
            FlightSqlClient sqlClient = new FlightSqlClient(client);

            // Basic authentication plus the CnosDB tenant header
            Optional<CredentialCallOption> credentialCallOption = client.authenticateBasicToken("root", "");
            final CallHeaders headers = new FlightCallHeaders();
            headers.insert("tenant", "cnosdb");
            Set<CallOption> options = new HashSet<>();

            credentialCallOption.ifPresent(options::add);
            options.add(new HeaderCallOption(headers));
            CallOption[] callOptions = options.toArray(new CallOption[0]);

            try (final FlightSqlClient.PreparedStatement preparedStatement = sqlClient.prepare("SELECT now();", callOptions)) {
                final FlightInfo info = preparedStatement.execute(callOptions);
                System.out.println(info.getSchema());
                final Ticket ticket = info.getEndpoints().get(0).getTicket();
                try (FlightStream stream = sqlClient.getStream(ticket, callOptions)) {
                    int n = 0;
                    while (stream.next()) {
                        List<FieldVector> vectors = stream.getRoot().getFieldVectors();
                        for (int i = 0; i < vectors.size(); i++) {
                            System.out.printf("%d %d %s%n", n, i, vectors.get(i));
                        }
                        n++;
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

JDBC

1. Add dependencies

    <dependencies>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-jdbc</artifactId>
            <version>10.0.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.arrow/flight-sql-jdbc-driver -->
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>flight-sql-jdbc-driver</artifactId>
            <version>10.0.1</version>
        </dependency>
    </dependencies>

Set the environment variable _JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED", or pass the option to java directly:

    java --add-opens=java.base/java.nio=ALL-UNNAMED -jar ...
    # or
    env _JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" java -jar ...

    # If you use Maven
    _JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn exec:java -Dexec.mainClass="YourMainCode"

2. Set the connection properties and run a query

    package org.example;

    import java.sql.*;
    import java.util.Properties;

    public class Main {
        public static void main(String[] args) {
            final Properties properties = new Properties();
            properties.put("user", "root");         // username
            properties.put("password", "");         // password
            properties.put("tenant", "cnosdb");     // tenant
            properties.put("useEncryption", false); // plaintext connection (no TLS)
            try (
                Connection connection = DriverManager.getConnection(
                    "jdbc:arrow-flight-sql://localhost:31004", properties
                );
                Statement statement = connection.createStatement())
            {
                ResultSet resultSet = statement.executeQuery("SELECT 1, 2, 3;");

                while (resultSet.next()) {
                    int column1 = resultSet.getInt(1);
                    int column2 = resultSet.getInt(2);
                    int column3 = resultSet.getInt(3);
                    System.out.printf("%d %d %d%n", column1, column2, column3);
                }
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    }

3. Set the connection properties and execute SQL

    package org.example;

    import java.sql.*;
    import java.util.Properties;

    public class Main {
        public static void main(String[] args) {
            final Properties properties = new Properties();
            properties.put("user", "root");
            properties.put("password", "");
            properties.put("tenant", "cnosdb");
            properties.put("useEncryption", false);
            try (
                Connection connection = DriverManager.getConnection(
                    "jdbc:arrow-flight-sql://localhost:31004", properties
                );
                Statement statement = connection.createStatement())
            {
                statement.execute("CREATE TABLE IF NOT EXISTS air\n"
                        + "(\n"
                        + "    visibility DOUBLE,\n"
                        + "    temperature DOUBLE,\n"
                        + "    pressure DOUBLE,\n"
                        + "    TAGS(station)\n"
                        + ");");
                statement.executeUpdate("INSERT INTO air (TIME, station, visibility, temperature, pressure) VALUES\n"
                        + "    (1666165200290401000, 'XiaoMaiDao', 56, 69, 77);");
                ResultSet resultSet = statement.executeQuery("select * from air limit 1;");

                while (resultSet.next()) {
                    Timestamp column1 = resultSet.getTimestamp(1);
                    String column2 = resultSet.getString(2);
                    double column3 = resultSet.getDouble(3);
                    double column4 = resultSet.getDouble(4);
                    double column5 = resultSet.getDouble(5);

                    System.out.printf("%s %s %f %f %f%n", column1, column2, column3, column4, column5);
                }
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    }

Rust

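The code runs in an async runtime (Tokio).

1. Add dependencies

    [dependencies]
    arrow = { version = "28.0.0", features = ["prettyprint"] }
    arrow-flight = { version = "28.0.0", features = ["flight-sql-experimental"] }
    # #[tokio::main] needs the "macros" and "rt-multi-thread" features
    tokio = { version = "1.23.0", features = ["macros", "rt-multi-thread"] }
    futures = "0.3.25"
    prost-types = "0.11.2"
    tonic = "0.8.3"
    prost = "0.11.3"
    http-auth-basic = "0.3.3"
    base64 = "0.13.1"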

2. Create a FlightServiceClient

    let mut client = FlightServiceClient::connect("http://localhost:31004")
        .await
        .expect("connect failed");

3. Authenticate

    let mut req = Request::new(futures::stream::iter(iter::once(
        HandshakeRequest::default(),
    )));

    req.metadata_mut().insert(
        AUTHORIZATION.as_str(),
        AsciiMetadataValue::try_from(format!(
            "Basic {}",
            base64::encode(format!("{}:{}", "root", ""))
        ))
        .expect("metadata construct fail"),
    );

    let resp = client.handshake(req).await.expect("handshake");

    println!("handshake resp: {:?}", resp.metadata());
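The header value built above follows HTTP Basic authentication: the word Basic, a space, then the Base64 encoding of user:password. Base64 is an encoding, not encryption, so the credentials are only protected when the connection uses TLS. A Go sketch of the same value (basicAuthHeader is an illustrative name):

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// basicAuthHeader returns "Basic " + base64(user:password), the value the
// handshake request sends in its authorization metadata.
// Base64 only encodes the credentials; it does not encrypt them.
func basicAuthHeader(user, password string) string {
	return "Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+password))
}

func main() {
	fmt.Println(basicAuthHeader("root", ""))
}
```

With the default root user and empty password, the header is Basic cm9vdDo=, which anyone observing plaintext traffic can decode.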

4. Execute SQL

    let cmd = CommandStatementQuery {
        query: "select now();".to_string(),
    };
    let pack = prost_types::Any::pack(&cmd).expect("pack");
    let fd = FlightDescriptor::new_cmd(pack.encode_to_vec());

    let mut req = Request::new(fd);
    req.metadata_mut().insert(
        AUTHORIZATION.as_str(),
        resp.metadata().get(AUTHORIZATION.as_str()).unwrap().clone(),
    );
    let resp = client.get_flight_info(req).await.expect("get_flight_info");

    let flight_info = resp.into_inner();
    let schema_ref =
        Arc::new(Schema::try_from(IpcMessage(flight_info.schema)).expect("Schema::try_from"));
    println!("{}", schema_ref);
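Any::pack wraps the command in a protobuf google.protobuf.Any before it goes into the FlightDescriptor. To show what those bytes contain, this Go sketch hand-encodes the same structure with only the standard library: an Any whose field 1 is the type URL and field 2 is the encoded CommandStatementQuery, whose field 1 is the SQL text. It writes only the query field and assumes the standard type.googleapis.com/ prefix; treat it as an illustration of the wire format, not a replacement for a protobuf library.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// statementQueryTypeURL is the type URL assumed for CommandStatementQuery
// (standard "type.googleapis.com/" prefix + the message's full protobuf name).
const statementQueryTypeURL = "type.googleapis.com/arrow.flight.protocol.sql.CommandStatementQuery"

// appendField appends one protobuf length-delimited field (wire type 2):
// the tag varint (field<<3 | 2), the length varint, then the raw bytes.
func appendField(buf []byte, field uint64, data []byte) []byte {
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(tmp[:], field<<3|2)
	buf = append(buf, tmp[:n]...)
	n = binary.PutUvarint(tmp[:], uint64(len(data)))
	buf = append(buf, tmp[:n]...)
	return append(buf, data...)
}

// packStatementQuery encodes Any{type_url: 1, value: 2} wrapping
// CommandStatementQuery{query: 1}; only the query field is written.
func packStatementQuery(query string) []byte {
	inner := appendField(nil, 1, []byte(query))
	packed := appendField(nil, 1, []byte(statementQueryTypeURL))
	return appendField(packed, 2, inner)
}

func main() {
	fmt.Printf("% x\n", packStatementQuery("select now();"))
}
```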

5. Retrieve and print the data

    for ep in flight_info.endpoint {
        if let Some(ticket) = ep.ticket {
            let resp = client.do_get(ticket).await.expect("do_get");
            let mut stream = resp.into_inner();
            let mut dictionaries_by_id = HashMap::new();
            let mut record_batches = Vec::new();
            while let Some(Ok(flight_data)) = stream.next().await {
                let message =
                    root_as_message(&flight_data.data_header[..]).expect("root as message");
                match message.header_type() {
                    ipc::MessageHeader::Schema => {
                        println!("a schema when messages are read",);
                    }
                    ipc::MessageHeader::RecordBatch => {
                        let record_batch = flight_data_to_arrow_batch(
                            &flight_data,
                            schema_ref.clone(),
                            &dictionaries_by_id,
                        )
                        .expect("record_batch_from_message");
                        record_batches.push(record_batch);
                    }
                    ipc::MessageHeader::DictionaryBatch => {
                        let ipc_batch = message.header_as_dictionary_batch().unwrap();
                        reader::read_dictionary(
                            &Buffer::from(flight_data.data_body),
                            ipc_batch,
                            &schema_ref,
                            &mut dictionaries_by_id,
                            &message.version(),
                        )
                        .unwrap();
                    }
                    _ => {
                        panic!("Reading types other than record batches not yet supported");
                    }
                }
            }
            println!(
                "{}",
                arrow::util::pretty::pretty_format_batches(&record_batches).expect("print")
            );
        }
    }

6. Complete code

    use std::collections::HashMap;
    use std::iter;
    use std::sync::Arc;

    use arrow::buffer::Buffer;
    use arrow::datatypes::Schema;
    use arrow::ipc;
    use arrow::ipc::{reader, root_as_message};
    use arrow_flight::flight_service_client::FlightServiceClient;
    use arrow_flight::sql::{CommandStatementQuery, ProstAnyExt};
    use arrow_flight::utils::flight_data_to_arrow_batch;
    use arrow_flight::{FlightDescriptor, HandshakeRequest, IpcMessage};
    use futures::StreamExt;

    use prost::Message;
    use tonic::codegen::http::header::AUTHORIZATION;
    use tonic::metadata::AsciiMetadataValue;
    use tonic::Request;

    #[tokio::main]
    async fn main() {
        let mut client = FlightServiceClient::connect("http://localhost:31004")
            .await
            .expect("connect");

        let mut req = Request::new(futures::stream::iter(iter::once(
            HandshakeRequest::default(),
        )));

        req.metadata_mut().insert(
            AUTHORIZATION.as_str(),
            AsciiMetadataValue::try_from(format!(
                "Basic {}",
                base64::encode(format!("{}:{}", "root", ""))
            ))
            .expect("metadata construct fail"),
        );

        let resp = client.handshake(req).await.expect("handshake");

        println!("handshake resp: {:?}", resp.metadata());

        let cmd = CommandStatementQuery {
            query: "select now();".to_string(),
        };
        let pack = prost_types::Any::pack(&cmd).expect("pack");
        let fd = FlightDescriptor::new_cmd(pack.encode_to_vec());

        let mut req = Request::new(fd);
        req.metadata_mut().insert(
            AUTHORIZATION.as_str(),
            resp.metadata().get(AUTHORIZATION.as_str()).unwrap().clone(),
        );
        let resp = client.get_flight_info(req).await.expect("get_flight_info");

        let flight_info = resp.into_inner();
        let schema_ref =
            Arc::new(Schema::try_from(IpcMessage(flight_info.schema)).expect("Schema::try_from"));
        println!("{}", schema_ref);

        for ep in flight_info.endpoint {
            if let Some(ticket) = ep.ticket {
                let resp = client.do_get(ticket).await.expect("do_get");
                let mut stream = resp.into_inner();
                let mut dictionaries_by_id = HashMap::new();

                let mut record_batches = Vec::new();
                while let Some(Ok(flight_data)) = stream.next().await {
                    let message =
                        root_as_message(&flight_data.data_header[..]).expect("root as message");
                    match message.header_type() {
                        ipc::MessageHeader::Schema => {
                            println!("a schema when messages are read",);
                        }

                        ipc::MessageHeader::RecordBatch => {
                            let record_batch = flight_data_to_arrow_batch(
                                &flight_data,
                                schema_ref.clone(),
                                &dictionaries_by_id,
                            )
                            .expect("record_batch_from_message");
                            record_batches.push(record_batch);
                        }
                        ipc::MessageHeader::DictionaryBatch => {
                            let ipc_batch = message.header_as_dictionary_batch().unwrap();

                            reader::read_dictionary(
                                &Buffer::from(flight_data.data_body),
                                ipc_batch,
                                &schema_ref,
                                &mut dictionaries_by_id,
                                &message.version(),
                            )
                            .unwrap();
                        }
                        _ => {
                            panic!("Reading types other than record batches not yet supported");
                        }
                    }
                }

                println!(
                    "{}",
                    arrow::util::pretty::pretty_format_batches(&record_batches).expect("print")
                );
            }
        }
    }

ODBC

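Currently only x86_64 systems are supported, and on Linux only CentOS and Red Hat family distributions are supported.

For more about the Arrow Flight SQL ODBC driver, see the Dremio documentation (https://docs.dremio.com/software/drivers/arrow-flight-sql-odbc-driver/).

The following steps are based on CentOS 7.

Install the ODBC driver manager

Install unixODBC on Linux:

    yum install unixODBC-devel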

1. Install the arrow-flight-odbc driver

    wget https://download.dremio.com/arrow-flight-sql-odbc-driver/arrow-flight-sql-odbc-driver-LATEST.x86_64.rpm
    yum localinstall arrow-flight-sql-odbc-driver-LATEST.x86_64.rpm

2. Edit the configuration file at /etc/odbc.ini

    [ODBC Data Sources]
    CNOSDB=Arrow Flight SQL ODBC Driver

    [CNOSDB]
    Description=ODBC Driver DSN for Arrow Flight SQL developed by Dremio
    Driver=Arrow Flight SQL ODBC Driver
    Host=localhost
    Port=31004
    UID=root
    PWD=
    Database=public
    Tenant=cnosdb
    useEncryption=false
    TrustedCerts=/opt/arrow-flight-sql-odbc-driver/lib64/cacerts.pem
    UseSystemTrustStore=true

Here, UID is the username and PWD is the password.

Test the connection:

    isql -v CNOSDB

If you see the following output, the connection succeeded:

    +---------------------------------------+
    | Connected!                            |
    |                                       |
    | sql-statement                         |
    | help [tablename]                      |
    | quit                                  |
    |                                       |
    +---------------------------------------+
    SQL>

Next, test the connection from code.

1. Write CMakeLists.txt

    cmake_minimum_required(VERSION 3.24)
    project(arrow_flight_odbc C)

    set(CMAKE_C_STANDARD 11)
    find_package(ODBC)
    include_directories(${ODBC_INCLUDE_DIR})
    link_directories(/opt/arrow-flight-sql-odbc-driver/lib64)
    add_executable(arrow_flight_odbc main.c)
    target_link_libraries(arrow_flight_odbc ${ODBC_LIBRARY})

2. Write the C code in main.c

    #include <stdio.h>
    #include <sql.h>
    #include <sqlext.h>

    int main() {
        SQLHENV henv;
        SQLHDBC hdbc;
        SQLHSTMT hstmt;
        SQLRETURN ret;

        // Allocate the environment handle
        ret = SQLAllocEnv(&henv);
        if (!SQL_SUCCEEDED(ret)) {
            fprintf(stderr, "Unable to allocate an environment handle\n");
            return -1;
        }
        // Request ODBC 3 behavior
        ret = SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION, (void *) SQL_OV_ODBC3, 0);
        if (!SQL_SUCCEEDED(ret)) {
            fprintf(stderr, "Unable to set env attr\n");
            return -1;
        }
        // Allocate the connection handle
        ret = SQLAllocConnect(henv, &hdbc);
        if (!SQL_SUCCEEDED(ret)) {
            fprintf(stderr, "Unable to allocate connection\n");
            return -1;
        }
        // Connect through the driver using the DSN configured in /etc/odbc.ini
        ret = SQLDriverConnect(hdbc, NULL, (SQLCHAR *) "DSN=CNOSDB;UID=root;PWD=", SQL_NTS,
                               NULL, 0, NULL, SQL_DRIVER_NOPROMPT);
        if (!SQL_SUCCEEDED(ret)) {
            fprintf(stderr, "connect fail\n");
            return -1;
        }
        // Allocate the statement handle
        SQLAllocStmt(hdbc, &hstmt);

        SQLCHAR *sql = (SQLCHAR *) "CREATE TABLE IF NOT EXISTS air (\n"
                                   "    visibility DOUBLE,\n"
                                   "    temperature DOUBLE,\n"
                                   "    pressure DOUBLE,\n"
                                   "    TAGS(station));";
        // Execute CREATE TABLE
        ret = SQLExecDirect(hstmt, sql, SQL_NTS);
        if (!SQL_SUCCEEDED(ret)) {
            fprintf(stderr, "Execute create fail\n");
        }

        sql = (SQLCHAR *) "INSERT INTO air (TIME, station, visibility, temperature, pressure) VALUES\n"
                          "    (1666165200290401000, 'XiaoMaiDao', 56, 69, 77);";
        // Execute INSERT
        ret = SQLExecDirect(hstmt, sql, SQL_NTS);
        if (!SQL_SUCCEEDED(ret)) {
            fprintf(stderr, "Execute insert fail\n");
        }

        sql = (SQLCHAR *) "SELECT * FROM air LIMIT 1";
        // Execute the query
        ret = SQLExecDirect(hstmt, sql, SQL_NTS);
        if (!SQL_SUCCEEDED(ret)) {
            fprintf(stderr, "Execute query fail\n");
        }
        SQL_TIMESTAMP_STRUCT ts;
        SQLCHAR station[50];
        SQLDOUBLE visibility, temperature, pressure;
        SQLLEN station_len;

        // Fetch rows until SQLFetch stops returning data (SQL_NO_DATA) or fails
        while (1) {
            ret = SQLFetch(hstmt);
            if (ret == SQL_ERROR) {
                fprintf(stderr, "error SQLFetch\n");
            }
            if (SQL_SUCCEEDED(ret)) {
                // Read each column of the current row
                SQLGetData(hstmt, 1, SQL_C_TIMESTAMP, &ts, 0, NULL);
                SQLGetData(hstmt, 2, SQL_C_CHAR, station, sizeof(station), &station_len);
                SQLGetData(hstmt, 3, SQL_C_DOUBLE, &visibility, 0, NULL);
                SQLGetData(hstmt, 4, SQL_C_DOUBLE, &temperature, 0, NULL);
                SQLGetData(hstmt, 5, SQL_C_DOUBLE, &pressure, 0, NULL);
                printf("%d-%02d-%02dT%02d:%02d:%02d, %s, %.2lf, %.2lf, %.2lf\n",
                       ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second,
                       (char *) station, visibility, temperature, pressure);
            } else {
                break;
            }
        }

        // Release the handles
        SQLFreeHandle(SQL_HANDLE_STMT, hstmt);
        SQLDisconnect(hdbc);
        SQLFreeHandle(SQL_HANDLE_DBC, hdbc);
        SQLFreeHandle(SQL_HANDLE_ENV, henv);
        return 0;
    }
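SQL_TIMESTAMP_STRUCT stores each date and time component separately, so the printed value only looks like an ISO 8601 timestamp if every field after the year is zero-padded to two digits. A Go sketch with a hypothetical mirror of the struct; as in ODBC, the Fraction field holds billionths of a second.

```go
package main

import "fmt"

// Timestamp is a hypothetical Go mirror of ODBC's SQL_TIMESTAMP_STRUCT.
type Timestamp struct {
	Year                             int16
	Month, Day, Hour, Minute, Second uint16
	Fraction                         uint32 // billionths of a second
}

// ISO zero-pads every component after the year, so 7:40 prints as 07:40,
// and appends the fraction as nine digits when it is non-zero.
func (t Timestamp) ISO() string {
	s := fmt.Sprintf("%d-%02d-%02dT%02d:%02d:%02d",
		t.Year, t.Month, t.Day, t.Hour, t.Minute, t.Second)
	if t.Fraction != 0 {
		s += fmt.Sprintf(".%09d", t.Fraction)
	}
	return s
}

func main() {
	fmt.Println(Timestamp{Year: 2022, Month: 10, Day: 19, Hour: 7, Minute: 40}.ISO())
}
```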

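Summary

CnosDB 2.0's native Arrow architecture provides an interface based on Arrow Flight SQL. With Arrow Flight SQL, you can connect to the CnosDB 2.0 time-series database from multiple languages, write and query data efficiently, and get second-level responses to queries over a billion rows.

For more details, see the Connectors section of the CnosDB 2.0 manual (https://docs.cnosdb.com/zh/guide/reference/connector.html). If you have requests or suggestions, please open an issue on GitHub (https://github.com/cnosdb/cnosdb).

About CnosDB

CnosDB is a high-performance, easy-to-use, open-source distributed time-series database. It has been officially released and is fully open source.

Visit our community website: https://www.cnosdb.com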

This article was reposted from: 学新通技术网
