• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

KafkaStream基本使用

武飞扬头像
Success___
帮助1

简介:

        kafkaStream:提供了对存储在kafka中的数据进行流式处理和分析的功能

特点:

        KafkasSream提供了一个非常简单轻量的Library,它可以非常方便的嵌入到java程序中,也可以任何方式打包部署

入门案例:

  1、新建工程kafka-demo

           引入kafkaStream依赖

  1.  
    <dependencies>
  2.  
    <dependency>
  3.  
    <groupId>org.springframework.boot</groupId>
  4.  
    <artifactId>spring-boot-starter-web</artifactId>
  5.  
    </dependency>
  6.  
    <!-- kafkfa -->
  7.  
    <dependency>
  8.  
    <groupId>org.springframework.kafka</groupId>
  9.  
    <artifactId>spring-kafka</artifactId>
  10.  
    <exclusions>
  11.  
    <exclusion>
  12.  
    <groupId>org.apache.kafka</groupId>
  13.  
    <artifactId>kafka-clients</artifactId>
  14.  
    </exclusion>
  15.  
    </exclusions>
  16.  
    </dependency>
  17.  
    <dependency>
  18.  
    <groupId>org.apache.kafka</groupId>
  19.  
    <artifactId>kafka-clients</artifactId>
  20.  
    </dependency>
  21.  
    <dependency>
  22.  
    <groupId>com.alibaba</groupId>
  23.  
    <artifactId>fastjson</artifactId>
  24.  
    </dependency>
  25.  
     
  26.  
    <!--kafkaStream-->
  27.  
    <dependency>
  28.  
    <groupId>org.apache.kafka</groupId>
  29.  
    <artifactId>kafka-streams</artifactId>
  30.  
    <exclusions>
  31.  
    <exclusion>
  32.  
    <artifactId>connect-json</artifactId>
  33.  
    <groupId>org.apache.kafka</groupId>
  34.  
    </exclusion>
  35.  
    <exclusion>
  36.  
    <groupId>org.apache.kafka</groupId>
  37.  
    <artifactId>kafka-clients</artifactId>
  38.  
    </exclusion>
  39.  
    </exclusions>
  40.  
    </dependency>
  41.  
    </dependencies>
学新通

   2、新建流式处理类

          代码如下

  1.  
    package com.heima.kafkademo.sample;
  2.  
     
  3.  
    import org.apache.kafka.common.serialization.Serdes;
  4.  
    import org.apache.kafka.streams.KafkaStreams;
  5.  
    import org.apache.kafka.streams.KeyValue;
  6.  
    import org.apache.kafka.streams.StreamsBuilder;
  7.  
    import org.apache.kafka.streams.StreamsConfig;
  8.  
    import org.apache.kafka.streams.kstream.KStream;
  9.  
    import org.apache.kafka.streams.kstream.TimeWindows;
  10.  
    import org.apache.kafka.streams.kstream.ValueMapper;
  11.  
     
  12.  
    import java.time.Duration;
  13.  
    import java.util.Arrays;
  14.  
    import java.util.Properties;
  15.  
     
  16.  
    /*
  17.  
    * 流式处理
  18.  
    * */
  19.  
    public class KafkaStreamQuickStart {
  20.  
    public static void main(String[] args) {
  21.  
    /*创建kafka配置中心并配置参数*/
  22.  
    Properties prop = new Properties();
  23.  
    //连接地址
  24.  
    prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
  25.  
    //key序列化
  26.  
    prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  27.  
    //value序列化
  28.  
    prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  29.  
    //创建id名称
  30.  
    prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");
  31.  
     
  32.  
    //stream构造器
  33.  
    StreamsBuilder streamsBuilder = new StreamsBuilder();
  34.  
     
  35.  
    //流式计算
  36.  
    streamProcessor(streamsBuilder);
  37.  
     
  38.  
    //创建KafkaStream对象
  39.  
    KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
  40.  
     
  41.  
    //开启流式计算
  42.  
    kafkaStreams.start();
  43.  
    }
  44.  
     
  45.  
    //流式计算方法
  46.  
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
  47.  
    //创建kafka对象,同时指定从哪个topic获取消息
  48.  
    KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
  49.  
    //处理消息的value
  50.  
    stream.flatMapValues(new ValueMapper<String, Iterable<?>>() {
  51.  
    @Override
  52.  
    public Iterable<String> apply(String value) {
  53.  
    return Arrays.asList(value.split(" "));
  54.  
    }
  55.  
    }) //按照value进行聚合
  56.  
    .groupBy((key,value)->value)
  57.  
    //时间窗口,每隔10秒更新一次
  58.  
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
  59.  
    //统计单词个数
  60.  
    .count()
  61.  
    //转换为kStream
  62.  
    .toStream()
  63.  
    .map((key,value)->{
  64.  
    System.out.println("key:" key ",vlaue:" value);
  65.  
    return new KeyValue<>(key.key().toString(),value.toString());
  66.  
    })
  67.  
    //发送消息
  68.  
    .to("itcast-topic-out");
  69.  
    }
  70.  
    }
学新通

3、启动消费者类和流式处理类监听消息

        使用生产者类发送消息

4、测试

        成功接收到消息

学新通 

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgfeaga
系列文章
更多 icon
同类精品
更多 icon
继续加载