Spark:SparkStreaming Kafka数据源
不同版本的offset存储位置
1、0-8 ReceiverAPI offset默认存储在:Zookeeper中
2、0-8 DirectAPI offset默认存储在:CheckPoint
3、0-10 DirectAPI offset默认存储在:_consumer_offsets系统主题
手动维护:MySQL等有事务的存储系统
需求
通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
依赖导入
创建maven工程,并导入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.lqs</groupId>
<artifactId>SparkStreaming</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
</project>
编写代码
package com.lqs.sparkstreaming
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @Author lqs
* @Date 2022年02月18日 11:21:05
* @Version 1.0.0
* @ClassName Qs04_directAuto
* @Describe Kafka 0-10 Direct模式
* 需求:
* 通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
*/
object Qs04_directAuto {
def main(args: Array[String]): Unit = {
//TODO 1、初始化SparkStreaming配置信息
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamin")
//TODO 2、初始化SparkStreamingContext
//Seconds(3):设置微批次流的间隔时间为3sec
val streamingContext = new StreamingContext(conf, Seconds(3))
/**
* 定义kafka参数;
* 1、kafka集群地址
* 2、消费者组名称
* 3、key序列化
* 4、value序列化
*/
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "bdc112:9092,bdc113:9092,bdc114:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "lqsGroup",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)
//读取kafka数据创建DStream
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
streamingContext,
//优先位置
LocationStrategies.PreferConsistent,
//消费策略:订阅多个主题,配置参数
ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara)
)
//将每条消息(k,v)的v取出来
val valueDStream: DStream[String] = kafkaDStream.map(_.value())
//计算WordCount
valueDStream.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ _)
.print()
//TODO 3、启动SparkStreamingContext
streamingContext.start()
//TODO 4、将主线程阻塞,主线程不退出
streamingContext.awaitTermination()
}
}
运行测试
1、启动zookeeper和kafka集群
2、创建一个testTopic主题,并分为两个分区
kafka-topics.sh --bootstrap-server bdc112:9092 --create --replication-factor 2 --partitions 2 --topic testTopic
3、查看Topic列表
kafka-topics.sh --bootstrap-server bdc112:9092 -list
4、查看Topic详情
kafka-topics.sh --bootstrap-server bdc112:9092 --describe --topic testTopic
5、创建kafka生产者
kafka-console-producer.sh --broker-list bdc112:9092 --topic testTopic
6、创建kafka消费者
kafka-console-consumer.sh --bootstrap-server bdc112:9092 --from-beginning --topic testTopic
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgacabi
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
photoshop蒙版画笔没反应怎么办
PHP中文网 06-24