• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

bigdata_kafka和streaming

武飞扬头像
JIE_ling8
帮助2

  一丶Kafka应用  

  鉴于kafka在实际使用时,绝大多数应用场景均为Producer和Consumer的API配合使用,故在此只介绍这两种API操作方法,其它的Connector和Streams还有admin可以视自身情况自行学习。

   1.java版

  • 实现步骤

    • 创建maven项目(done)

    • 加入kafka依赖

    • producer push message实现

    • consumer pull message实现

    • 效果测试

  • 加入依赖

  1.  
            
  2.  
     
  3.  
    <dependency>
  4.  
     
  5.  
            <groupId>org.apache.kafka</groupId>
  6.  
     
  7.  
            <artifactId>kafka-clients</artifactId>
  8.  
     
  9.  
            <version>2.0.0</version>
  10.  
     
  11.  
    </dependency>
  12.  
     
  13.  
    <!--kafka的日志组件依赖包 -->
  14.  
     
  15.  
    <dependency>
  16.  
     
  17.  
            <groupId>org.slf4j</groupId>
  18.  
     
  19.  
            <artifactId>slf4j-simple</artifactId>
  20.  
     
  21.  
            <version>1.7.25</version>
  22.  
     
  23.  
    </dependency>
学新通

具体代码:

producer push message

  1.  
    import java.util.Properties;
  2.  
    import org.apache.kafka.clients.producer.KafkaProducer;
  3.  
    import org.apache.kafka.clients.producer.ProducerConfig;
  4.  
    import org.apache.kafka.clients.producer.ProducerRecord;
  5.  
    importorg.apache.kafka.common.serialization.StringSerializer;
  6.  
    /**
  7.  
    * kafka测试工具类
  8.  
    * @author tianliang
  9.  
    */
  10.  
    public class KafkaProducerUtil {
  11.  
    // 生产者抽象对象
  12.  
    public KafkaProducer<String, String> producer;
  13.  
    // 传入brokerList,以hostname:port的方式,多个之间用,号隔开
  14.  
    public KafkaProducerUtil(String brokerList) {
  15.  
    Properties props = new Properties();
  16.  
    // 服务器ip:端口号,集群用逗号分隔
  17.  
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  18.  
    // key序列化指定类
  19.  
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  20.  
    StringSerializer.class.getName());
  21.  
    // value序列化指定类
  22.  
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  23.  
    StringSerializer.class.getName());
  24.  
    // 生产者对象
  25.  
    producer = new KafkaProducer<String, String>(props);
  26.  
    }
  27.  
     
  28.  
    public void close(){
  29.  
    this.producer.close();
  30.  
    }
  31.  
    public static void main(String[] args) {
  32.  
    // 初始化broker列表
  33.  
    String brokerList = "cluster1.hadoop:6667,cluster0.hadoop:6667";
  34.  
    String topic="TestKafka";
  35.  
    // 初始化生产者工具类
  36.  
    KafkaProducerUtil kafkaProducerUtil = new KafkaProducerUtil(brokerList);
  37.  
    //test_topic发送hello, kafka
  38.  
    kafkaProducerUtil.producer.send(new ProducerRecord<String, String>(
  39.  
    topic, "hello,李英杰!"));
  40.  
    kafkaProducerUtil.close();
  41.  
     
  42.  
    System.out.println("done!");
  43.  
    }
  44.  
    }
学新通

comsumer push message实现

  1.  
    import java.util.Arrays;
  2.  
    import java.util.Properties;
  3.  
    import org.apache.kafka.clients.consumer.ConsumerConfig;
  4.  
    import org.apache.kafka.clients.consumer.ConsumerRecord;
  5.  
    import org.apache.kafka.clients.consumer.ConsumerRecords;
  6.  
    import org.apache.kafka.clients.consumer.KafkaConsumer;
  7.  
    importorg.apache.kafka.common.serialization.StringDeserializer;
  8.  
    /**
  9.  
    * Kafka消费者工具类
  10.  
    *
  11.  
    * @author tianliang
  12.  
    */
  13.  
    public class KafkaConsumerUtil {
  14.  
    // 消费者对象
  15.  
    public KafkaConsumer<String, String> kafkaConsumer;
  16.  
    public KafkaConsumerUtil(String brokerList, String topic) {
  17.  
    Properties props = new Properties();
  18.  
    // 服务器ip:端口号,集群用逗号分隔
  19.  
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  20.  
    // 消费者指定组,名称可以随意,注意相同消费组中的消费者只能对同一个分区消费一次
  21.  
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "TestTL");
  22.  
    // 是否启用自动提交offset,默认true
  23.  
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  24.  
    // 自动提交间隔时间1s
  25.  
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
  26.  
    // key反序列化指定类
  27.  
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  28.  
    StringDeserializer.class.getName());
  29.  
    // value反序列化指定类,注意生产者与消费者要保持一致,否则解析出问题
  30.  
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  31.  
    StringDeserializer.class.getName());
  32.  
    // 消费者对象
  33.  
    kafkaConsumer = new KafkaConsumer<>(props);
  34.  
    //订阅Topic
  35.  
    kafkaConsumer.subscribe(Arrays.asList(topic));
  36.  
    }
  37.  
    public void close() {
  38.  
    kafkaConsumer.close();
  39.  
    }
  40.  
    public static void main(String[] args) {
  41.  
    // 初始化broker列表
  42.  
    String brokerList = "cluster0.hadoop:6667,cluster1.hadoop:6667";
  43.  
    String topic = "TestKafka";
  44.  
    // 初始化消费者工具类
  45.  
    KafkaConsumerUtil kafkaConsumerUtil = new KafkaConsumerUtil(brokerList,
  46.  
    topic);
  47.  
     
  48.  
    boolean runnable=true;
  49.  
    while (runnable) {
  50.  
    ConsumerRecords<String, String> records = kafkaConsumerUtil.kafkaConsumer
  51.  
    .poll(100);
  52.  
    for (ConsumerRecord<String, String> record : records) {
  53.  
    System.out.printf("key = %s, offset = %d, value = %s", record.key(),record.offset(),
  54.  
    record.value());
  55.  
    System.out.println();
  56.  
    }
  57.  
    }
  58.  
     
  59.  
    kafkaConsumerUtil.close();
  60.  
    System.out.println("done!");
  61.  
    }
  62.  
    }
学新通

2.scala版

producer push message实现

  1.  
    import org.apache.kafka.clients.producer.KafkaProducer
  2.  
    import java.util.Properties
  3.  
    import org.apache.kafka.clients.producer.ProducerRecord
  4.  
    import org.apache.kafka.clients.producer.ProducerConfig
  5.  
    import org.apache.kafka.common.serialization.StringSerializer
  6.  
    /**
  7.  
    * scala实现kafka producer工具类
  8.  
    */
  9.  
    object KafkaProducerUtil {
  10.  
    //将生产者对象的获取封装到方法中
  11.  
    def getKafkaProducer(brokerList: String): KafkaProducer[String, String] = {
  12.  
    val properties = new Properties()
  13.  
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
  14.  
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) //key的序列化;
  15.  
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) //value的序列化;
  16.  
    var producer4Kafka = new KafkaProducer[String, String](properties)
  17.  
    return producer4Kafka
  18.  
    }
  19.  
    def main(args: Array[String]): Unit = {
  20.  
    //定义broker list,topic
  21.  
    val brokersList = "sc-slave7:6667,sc-slave8:6667"
  22.  
    val topic:String = "TestKafka_scala"
  23.  
    //获取生产者对象
  24.  
    var producer4Kafka = KafkaProducerUtil.getKafkaProducer(brokersList)
  25.  
    //发送实际的message
  26.  
    producer4Kafka.send(new ProducerRecord(topic,"hello,李英杰!"))
  27.  
    //发送完成后关闭链接
  28.  
    producer4Kafka.close;
  29.  
    println("done!")
  30.  
    }
  31.  
    }
学新通

consumer pull message实现

  1.  
    import org.apache.kafka.clients.producer.KafkaProducer
  2.  
    import java.util.Properties
  3.  
    import org.apache.kafka.clients.producer.ProducerRecord
  4.  
    import org.apache.kafka.clients.producer.ProducerConfig
  5.  
    import org.apache.kafka.common.serialization.StringSerializer
  6.  
    import org.apache.kafka.clients.consumer.KafkaConsumer
  7.  
    import java.util.Collections
  8.  
    import org.apache.kafka.clients.consumer.ConsumerConfig
  9.  
    import org.apache.kafka.common.serialization.StringDeserializer
  10.  
    /**
  11.  
    * scala实现kafka consumer工具类
  12.  
    */
  13.  
    object KafkaConsumerUtil {
  14.  
    //将消费者对象的获取封装到方法中,注意groupid是必选项,此为与java api不相同之处
  15.  
    def getKafkaConsumer(brokerList: String, topic: String, consumerGroupId: String): KafkaConsumer[String, String] = {
  16.  
    val properties = new Properties()
  17.  
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
  18.  
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName) //key的序列化;
  19.  
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName) //value的序列化;
  20.  
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId) //指定groupid
  21.  
    var consumer4Kafka = new KafkaConsumer[String, String](properties)
  22.  
    consumer4Kafka.subscribe(Collections.singletonList(topic))
  23.  
    return consumer4Kafka
  24.  
    }
  25.  
    def main(args: Array[String]): Unit = {
  26.  
    //指定broker list列表
  27.  
    val brokersList = "sc-slave7:6667"
  28.  
    //必须指定消费者组id
  29.  
    var consumerGroupId = "TestConsumerID"
  30.  
    val topic: String = "TestKafka_scala"
  31.  
    var consumer4Kafka = KafkaConsumerUtil.getKafkaConsumer(brokersList, topic, consumerGroupId)
  32.  
    //注意用标志位做循环判断
  33.  
    var runnable = true
  34.  
    while (runnable) {
  35.  
    //因为版本的原因,此处用iterator遍历,而不用for循环
  36.  
    val records = consumer4Kafka.poll(100)
  37.  
    var iter = records.iterator()
  38.  
    while (iter.hasNext()) {
  39.  
    val record = iter.next()
  40.  
    println(record.offset() "--" record.key() "--" record.value())
  41.  
    }
  42.  
    }
  43.  
    consumer4Kafka.close()
  44.  
    println("done!")
  45.  
    }
  46.  
    }
学新通

  二丶Kafka与Streaming

     两种方法

        1.基于Receiver方式

              先存于内存

  • 优点

    • 因为使用的kafka的高层API,用户在编码时更加可以专注数据本身,不需要关心offset等附加信息,而完全由zookeeper来管理,节省了工作量,减少了代码复杂度

    • 因为其简单性,当对数据处理要求不是极为严格时,一般建均建议采用这种方式。

  • 缺点

    • 如果这时候集群退出,而偏移量又没处理好的话,数据就丢掉了,存在程序失败丢失数据的可能,后在Spark 1.2时引入一个配置参数spark.streaming.receiver.writeAheadLog.enable以规避此风险,即通过先写日志的方式来解决,相当于存储了两次数据,降低了数据处理效率,同时增加了receiver负担。

    • 上边所述的方式,解决了数据丢失,但增加了数据重复消费的风险,比如程序计算完成并输出,但没有更新offset的情况,则会出现重复消费。

    • recevier也是executor的一部分,会占用相当一部分资源,降低了可用于streaming计算的资源,造成资源浪费。

    • receiver增加了数据消费链路的一个executor中转环节,该环节中的executor会和计算executor相一致才能保证系统稳定,而这两个环节之间是异步的,存在如网络异常、计算压力大的情况下,中转积压和消费缓慢的情况,导致系统崩溃。

         用的较少

           2.

  • 基于Direct直接读取的方式
    • 流程图

    • 具体流程

      • 实例化KafkaCluster,根据用户配置的Kafka参数,连接到Kafka集群

      • 通过Kafka API读取Topic中每个Partition最后一次读的Offset

      • 接收成功的数据,直接转换成KafkaRDD,供后续计算

    • 代码实现

      • 直接通过kafka consumer直接消费数据,形成一个Kafka的partition对应一个KafkaRDD的partition。

    • 实现逻辑

      • 使用Kafka Consumer直接消费其数据,不再需要Receiver作缓存。

    • 背景

      • Receiver方式存储数据存储浪费、效率低等问题,在Spark1.3之后推出了Direct方式。

                                         学新通

      • 优点

        • 存储效率更高: 不需要receiver中的防数据丢失的wal重复写一份了。

        • 简化并行设计: Kafka中的Partition和Spark中的Partition一个一个地对应,而Receiver并不对应,造成若干处理复杂,如流Join问题。

        • 降低内存使用量:之前的recevier也占用了内存,必然导致总内存申请量的提高。

        • 计算效率更高: 不需receiver后,降低了内存浪费,使更大比例内存用于实际的并行计算。

        • 当对数据处理效率、性能要求较高时,一般建议采用这种方式。

      • 问题点

        • offset在receiver时由zookeeper维护,而在direct时需要采用checkpoint或是第三方来存储维护,提高了用户开发成本。

        • 监控可视化:offset信息由zookeeper维护时,均可通过监控zk相关信息来监控消费情况,而direct的offset是自行维护,其消费监控因此也需要自行开发才行

2、基于Direct方式读取kafka的代码实现

  1.  
    package com.tl.job015.streamingwithkafka
  2.  
     
  3.  
    import org.apache.spark.SparkConf
  4.  
    import org.apache.spark.SparkContext
  5.  
    import org.apache.spark.streaming.dstream.DStream
  6.  
    import org.apache.spark.streaming.StreamingContext
  7.  
    import org.apache.spark.streaming.kafka010.LocationStrategies
  8.  
    import org.apache.spark.streaming.kafka010.ConsumerStrategies
  9.  
    import org.apache.spark.streaming.kafka010.KafkaUtils
  10.  
    import org.apache.kafka.clients.consumer.ConsumerConfig
  11.  
    import org.apache.kafka.common.serialization.StringDeserializer
  12.  
    import org.apache.spark.streaming.Seconds
  13.  
     
  14.  
    /**
  15.  
    * streaming集成kafka
  16.  
    */
  17.  
    object SparkStreamingReadKafka4Direct {
  18.  
    def main(args: Array[String]): Unit = {
  19.  
    /**
  20.  
    * 1、构建ssc
  21.  
    * 2、设置checkpoint
  22.  
    * 3、构造direct stream对象
  23.  
    * 4、针对DStream的算子操作
  24.  
    * 5、环境变量操作
  25.  
    */
  26.  
     
  27.  
    // 1、构造ssc对象
  28.  
    val parasArray = Array[String]("cluster0.hadoop:6667", "TestKafka4Job015", "consumer_job015", "200")
  29.  
    val Array(brokers, topics, groupId, maxPoll) = parasArray
  30.  
     
  31.  
    val sparkConf = new SparkConf().setAppName("KafkaDirect4Job011")
  32.  
    //可以代码设置运行模式,也可以在spark-submit当中设置
  33.  
    //sparkConf.setMaster(master)
  34.  
     
  35.  
    val sc = new SparkContext(sparkConf)
  36.  
    sc.setLogLevel("WARN")
  37.  
    val ssc = new StreamingContext(sc, Seconds(5))
  38.  
     
  39.  
    //2、设置offset的存储目录,此目录一般为hdfs目录
  40.  
    ssc.checkpoint("./kafka_direct")
  41.  
     
  42.  
    //3、构造direct stream对象
  43.  
    val topicsSet = topics.split(",").toSet
  44.  
    val kafkaParams = Map(
  45.  
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
  46.  
    ConsumerConfig.GROUP_ID_CONFIG -> groupId,
  47.  
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
  48.  
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  49.  
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
  50.  
     
  51.  
    val messages = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
  52.  
    ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
  53.  
     
  54.  
    //4、针对DStream的算子操作
  55.  
    val result: DStream[(String, Int)] = messages.map(_.value).flatMap(_.split("\\s ")).map((_, 1)).reduceByKey(_ _)
  56.  
    // result.print
  57.  
    result.foreachRDD(x => {
  58.  
    x.foreachPartition(part => {
  59.  
    part.foreach(print)
  60.  
    })
  61.  
    })
  62.  
     
  63.  
    //5、环境变量操作
  64.  
    ssc.start()
  65.  
    ssc.awaitTermination()
  66.  
    }
  67.  
    }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgchajb
系列文章
更多 icon
同类精品
更多 icon
继续加载