• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Kafka 插件并创建 Kafka Producer 发送

武飞扬头像
feidodo网络
帮助1

相关说明

  • 启动测试前清空所有数据。
  • 每次测试先把所有数据写入 Kafka,再加载 Kafka 插件同步数据到 DolphinDB 中。目的是将同步数据的压力全部集中到 Kafka 插件。
  • 以 Kafka 插件从收到第一批数据到收到最后一批数据的时间差作为同步数据的总耗时。

测试流程

  • 加载 Kafka 插件并创建 Kafka Producer 发送数据到 Kafka 中(以发送 100 万条数据为例)

DolphinDB GUI 连接 DolphinDB 执行以下脚本,本例中插件的安装路径为 /DolphinDB/server/plugins/kafka,用户需要根据自己实际环境进行修改:

  1.  
    // 加载插件
  2.  
    try{
  3.  
    loadPlugin("/DolphinDB/server/plugins/kafka/PluginKafka.txt")
  4.  
    loadPlugin("/DolphinDB/server/plugins/kafka/PluginEncoderDecoder.txt")
  5.  
    } catch(ex){print(ex)}
  6.  
     
  7.  
    // 创建 Producer
  8.  
    producerCfg = dict(STRING, ANY);
  9.  
    producerCfg["metadata.broker.list"] = "192.193.168.5:8992";
  10.  
    producer = kafka::producer(producerCfg);
  11.  
    kafka::producerFlush(producer);
  12.  
     
  13.  
    //向kafka传100万数据
  14.  
    tbl = table("R5L1B3T1N03D01" as deviceId, "2022-02-22 13:55:47.377" as timestamps, "voltage" as deviceType , 1.5 as value )
  15.  
     
  16.  
     
  17.  
    // 创建 Consume
  18.  
    consumerCfg = dict(STRING, ANY)
  19.  
     
  20.  
    consumerCfg["group.id"] = "test10"
  21.  
    consumerCfg["metadata.broker.list"] = "192.193.168.5:8992";
  22.  
     
  23.  
    for(i in 1..1000000) {
  24.  
    aclMsg = select *, string(now()) as pluginSendTime from tbl;
  25.  
    for(i in aclMsg) {
  26.  
    kafka::produce(producer, "test3", "1", i, true);
  27.  
    }
  28.  
    }
  29.  
     
  30.  
    consumer = kafka::consumer(consumerCfg)
  31.  
    topics=["test10"];
  32.  
    kafka::subscribe(consumer, topics);
  33.  
     
  34.  
    for(i in 1..1000000) {
  35.  
    aclMsg = select *, string(now()) as pluginSendTime from tbl;
  36.  
    for(i in aclMsg) {
  37.  
    kafka::produce(producer, "test10", "1", i, true);
  38.  
    }
  39.  
    }
学新通
  • 订阅 Kafka 中数据进行消费
  1.  
    // 创建存储解析完数据的表
  2.  
    colDefName = ["deviceId","timestamps","deviceType","value", "pluginSendTime", "pluginReceived"]
  3.  
     
  4.  
    colDefType = [SYMBOL,TIMESTAMP,SYMBOL,DOUBLE,TIMESTAMP,TIMESTAMP]
  5.  
    dest = table(1:0, colDefName, colDefType);
  6.  
    share dest as `streamZd
  7.  
     
  8.  
    // 解析函数
  9.  
    def temporalHandle(mutable dictVar, mutable dest){
  10.  
    try{
  11.  
    t = dictVar
  12.  
    t.replaceColumn!(`timestamps, temporalParse(dictVar[`timestamps],"yyyy-MM-dd HH:mm:ss.SSS"))
  13.  
    t.replaceColumn!(`pluginSendTime, timestamp(dictVar[`pluginSendTime]))
  14.  
    t.update!(`received, now());
  15.  
    dest.append!(t);
  16.  
    }catch(ex){
  17.  
    print("kafka errors : " ex)
  18.  
    }
  19.  
    }
  20.  
     
  21.  
    // 创建 decoder
  22.  
    name = colDefName[0:5];
  23.  
    type = colDefType[0:5];
  24.  
    type[1] = STRING;
  25.  
    type[4] = STRING;
  26.  
    decoder = EncoderDecoder::jsonDecoder(name, type, temporalHandle{, dest}, 15, 100000, 0.5)
  27.  
     
  28.  
    // 创建subjob函数
  29.  
    kafka::createSubJob(consumer, , decoder, `DecoderKafka)
学新通

此时我们观察共享流表的数据量,当达到 100 万条时说明消费完成,测试结束。

5. Kerberos 认证

5.1 什么是 Kerberos ?

Kerberos 是一种基于加密 Ticket 的身份认证协议,支持双向认证且性能较高。主要有三个组成部分:Kdc, Client 和 Service。

生产环境的 Kafka 一般需要开启 Kerberos 认证,为 Kafka 提供权限管理,提高安全性。

学新通

5.2 前置条件

  • Java 8
  • kerberos:包括 Kdc 和 Client
  • keytab 证书

5.3 认证相关配置说明

环境相关配置说明

以下是 Kerberos 认证涉及的关键配置信息,具体配置文件的路径根据实际情况调整

  1. 安装 kdc
yum install -y krb5-server krb5-libs krb5-workstation krb5-devel krb5-auth-dialog

2. 配置 /etc/krb5.conf

  1.  
    [realms]
  2.  
    HADOOP.COM = {
  3.  
    kdc = cnserver9:88
  4.  
    admin_server = cnserver9:749
  5.  
    default_domain = HADOOP.COM
  6.  
    }

3. 配置 /var/kerberos/krb5kdc/kadm5.acl

*/admin@HADOOP.COM	*

4. 创建生成 kdc 数据库文件

sudo kdb5_util create -r HADOOP.COM –s

5. 启动 kerberos 服务

  1.  
    sudo systemctl start krb5kdc
  2.  
    sudo systemctl status krb5kdc

6. 安装 kerberos 客户端

yum install -y krb5-devel krb5-workstation krb5-client

7. 启动 kerberos 客户端

sudo kadmin.local -q "addprinc hadoop/admin"

DolphinDB Kafka Plugin 配置说明

  • 关键配置参数说明
    • security.protocol:指定通信协议
    • sasl.kerberos.service.name:指定 service 名称
    • sasl.mechanism:SASL 机制,包括 GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER
    • sasl.kerberos.keytab:keytab 文件的路径
    • sasl.kerberos.principal:指定 principal
  • 具体代码实现
  1.  
    // 加载插件
  2.  
    try{loadPlugin("/path/to/DolphinDBPlugin/kafka/bin/linux/PluginKafka.txt")} catch(ex){print(ex)}
  3.  
     
  4.  
    // 生产者配置
  5.  
    producerCfg = dict(STRING, ANY);
  6.  
    producerCfg["bootstrap.servers"] = "cnserver9:9992";
  7.  
    producerCfg["security.protocol"] = "SASL_PLAINTEXT";
  8.  
    producerCfg["sasl.kerberos.service.name"] = "kafka";
  9.  
    producerCfg["sasl.mechanism"] = "GSSAPI";
  10.  
    producerCfg["sasl.kerberos.keytab"] = "/home/test/hadoop.keytab";
  11.  
    producerCfg["sasl.kerberos.principal"] = "kafka/cnserver9@HADOOP.COM";
  12.  
    producer = kafka::producer(producerCfg);
  13.  
     
  14.  
    // 消费者配置
  15.  
    consumerCfg = dict(STRING, ANY)
  16.  
    consumerCfg["group.id"] = "test"
  17.  
    consumerCfg["bootstrap.servers"] = "cnserver9:9992";
  18.  
    consumerCfg["security.protocol"] = "SASL_PLAINTEXT";
  19.  
    consumerCfg["sasl.kerberos.service.name"] = "kafka";
  20.  
    consumerCfg["sasl.mechanism"] = "GSSAPI";
  21.  
    consumerCfg["sasl.kerberos.keytab"] = "/home/test/hadoop.keytab";
  22.  
    consumerCfg["sasl.kerberos.principal"] = "kafka/cnserver9@HADOOP.COM";
  23.  
    consumer = kafka::consumer(consumerCfg)
学新通
注意:适配 Kerberos 认证只需修改 Kafka 插件有关生产者和消费者的配置即可,其余脚本无需改动。

6. 其他说明

本教程展示了 DolphinDB Kafka Plugin 中常用的接口函数,完整的函数支持请参考官网文档:DolphinDB Kafka 插件官方教程

使用过程中如果遇到任何问题,欢迎大家在项目仓库反馈

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgaekcf
系列文章
更多 icon
同类精品
更多 icon
继续加载