• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Spark:SparkStreaming Kafka数据源

武飞扬头像
小雏菊的成长
帮助1

不同版本的offset存储位置
1、0-8 ReceiverAPI offset默认存储在:Zookeeper中
2、0-8 DirectAPI offset默认存储在:CheckPoint

3、0-10 DirectAPI offset默认存储在:_consumer_offsets系统主题
手动维护:MySQL等有事务的存储系统

需求

通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。

依赖导入

创建maven工程,并导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lqs</groupId>
    <artifactId>SparkStreaming</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

    </dependencies>
    
</project>
学新通

编写代码

package com.lqs.sparkstreaming

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @Author lqs
 * @Date 2022年02月18日 11:21:05
 * @Version 1.0.0
 * @ClassName Qs04_directAuto
 * @Describe Kafka 0-10 Direct模式
 *           需求:
 *           通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
 */
object Qs04_directAuto {
  def main(args: Array[String]): Unit = {
    //TODO 1、初始化SparkStreaming配置信息
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamin")

    //TODO 2、初始化SparkStreamingContext
    //Seconds(3):设置微批次流的间隔时间为3sec
    val streamingContext = new StreamingContext(conf, Seconds(3))

    /**
     * 定义kafka参数;
     * 1、kafka集群地址
     * 2、消费者组名称
     * 3、key序列化
     * 4、value序列化
     */
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "bdc112:9092,bdc113:9092,bdc114:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lqsGroup",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    //读取kafka数据创建DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      //优先位置
      LocationStrategies.PreferConsistent,
      //消费策略:订阅多个主题,配置参数
      ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara)
    )

    //将每条消息(k,v)的v取出来
    val valueDStream: DStream[String] = kafkaDStream.map(_.value())

    //计算WordCount
    valueDStream.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_   _)
      .print()

    //TODO 3、启动SparkStreamingContext
    streamingContext.start()

    //TODO 4、将主线程阻塞,主线程不退出
    streamingContext.awaitTermination()
  }
}
学新通

运行测试

1、启动zookeeper和kafka集群
2、创建一个testTopic主题,并分为两个分区

kafka-topics.sh --bootstrap-server bdc112:9092 --create --replication-factor 2 --partitions 2 --topic testTopic

3、查看Topic列表

kafka-topics.sh --bootstrap-server bdc112:9092 -list

4、查看Topic详情

kafka-topics.sh --bootstrap-server bdc112:9092 --describe --topic testTopic

5、创建kafka生产者

kafka-console-producer.sh --broker-list bdc112:9092 --topic testTopic

6、创建kafka消费者

kafka-console-consumer.sh --bootstrap-server bdc112:9092 --from-beginning --topic testTopic

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgacabi
系列文章
更多 icon
同类精品
更多 icon
继续加载