• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Kafka部署安装和使用

武飞扬头像
jh035
帮助3

一、环境准备

1、jdk 8

2、zookeeper 

3、kafka

说明:在kafka较新版本中已经集成了zookeeper,所以不用单独安装zookeeper,只需要在kafka文件目录中启动zookeeper即可

二、下载地址

Apache Kafka

学新通

三、部署

1、启动zookeeper

-- 启动
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

-- 查看是否启动成功
ps -ef | grep zoo

2、进入解压的kafka目录,修改/config/kafka-server的配置文件

vi config/server.properties
-- 重点配置节点说明

listeners=PLAINTEXT://localhost:9092   -- 将localhost修改为主机ip

log.dirs=/bigdata/kafka/logs-1  -- 默认不修改也可

学新通

 3、使用kafka-server-start.sh,启动kafka服务

./bin/kafka-server-start.sh config/server.properties

四、使用客户端kafka tools连接kafka

客户端下载地址:Offset Explorer

关于客户端如何使用可查看:kafka可视化客户端工具(Kafka Tool)的基本使用 - Frankdeng - 博客园

学新通

学新通

五、kafka实战简单使用

NuGet:Confluent.Kafka

 1、新建.Net Core控制台项目,代码如下:

学新通

static void Main(string[] args)
        {
            // 发送消息
            var producerConfig = new ProducerConfig
            {
                BootstrapServers = "192.168.140.131:9092",
                MessageTimeoutMs = 50000
            };
            var builder = new ProducerBuilder<string, string>(producerConfig);
            using (var producer = builder.Build())
            {
                var data = new { key = "1", value = "001" };
                var json = JsonConvert.SerializeObject(data);
                var dr = producer.ProduceAsync("order", new Message<string, string> { Key = "order", Value = json }).GetAwaiter().GetResult();
                Console.WriteLine($"发送事件{dr.Value}到{dr.TopicPartitionOffset}成功");
            }

            // 消费消息
            var consumerConfig = new ConsumerConfig {
                BootstrapServers = "192.168.140.131:9092",
                AutoOffsetReset=AutoOffsetReset.Earliest,
                GroupId="1111", // 自定义
                EnableAutoCommit=true
            };
            var consumerBuilder = new ConsumerBuilder<string, string>(consumerConfig);


            using (var consumer = consumerBuilder.Build())
            {
                // 1、订阅
                consumer.Subscribe("order");
                while (true)
                {
                    try
                    {
                        // 2、消费(自动确认)
                        var result = consumer.Consume();

                        // 3、业务逻辑
                        string key = result.Key;
                        string value = result.Value;

                        Console.WriteLine($"创建商品:Key:{key}");
                        Console.WriteLine($"创建商品:Order:{value}");
                        consumer.Commit(result);

                    }
                    catch (Exception e)
                    {
                        Console.WriteLine($"异常:Order:{e}");
                    }
                }
            }
        }
学新通

学新通

2、使用kafka客户端查看消息投递

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhficjhj
系列文章
更多 icon
同类精品
更多 icon
继续加载