• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

flume+kafka+SparkStreaming实时日志+结果存储到MySQL

武飞扬头像
一个人的牛牛
帮助1

目录

一.说明

二.flume

三.kafka

四.MySQL

五.IDEA写程序

六.运行


一.说明

1.1使用工具:IDEA,spark-2.1.0-bin-hadoop2.7,kafka_2.11-2.3.1,zookeeper-3.4.5,apache-flume-1.9.0-bin,jdk1.8.0_171 

Scala版本:2.12.15

学新通

1.3一定要在数据库中建表!!!!!

二.flume

2.1在flume的conf下写flume-kafka.conf

  1.  
    a5.channels=c5
  2.  
    a5.sources=s5
  3.  
    a5.sinks=k5
  4.  
     
  5.  
    a5.sources.s5.type=spooldir
  6.  
    #/root/testdata/f-k是flume监控的文件夹
  7.  
    a5.sources.s5.spoolDir=/root/testdata/f-k
  8.  
    a5.sources.s5.interceptors=head_filter
  9.  
    #正则拦截器
  10.  
    a5.sources.s5.interceptors.head_filter.type=regex_filter
  11.  
    a5.sources.s5.interceptors.head_filter.regex=^event_id.*
  12.  
    a5.sources.s5.interceptors.head_filter.excludeEvents=true
  13.  
     
  14.  
    #用来关联kafka
  15.  
    a5.sinks.k5.type=org.apache.flume.sink.kafka.KafkaSink
  16.  
    #连接kafka,hadoop01是我的虚拟机主机名
  17.  
    a5.sinks.k5.kafka.bootstrap.servers=hadoop01:9092
  18.  
    #topic的主题名要一致fktest
  19.  
    a5.sinks.k5.kafka.topic=fktest
  20.  
     
  21.  
    a5.channels.c5.type=memory
  22.  
    a5.channels.c5.capacity=10000
  23.  
    a5.channels.c5.transactionCapacity=10000
  24.  
     
  25.  
    a5.sinks.k5.channel=c5
  26.  
    a5.sources.s5.channels=c5
学新通

 2.2建/root/testdata/f-k文件夹

2.3启动flume的flume-kafka.conf(在flume的目录下)

bin/flume-ng agent -f conf/flume-kafka.conf -n a5 -Dflume.root.logger=INFO,console

看到k5,c5,s5到成功启动了(有Successfully)就是正常

学新通

三.kafka

3.1开启kafka(在kafka的目录下)(一定要先开启zookeeper)

bin/kafka-server-start.sh -daemon config/server.properties

 3.2建topic在kafka的目录下

bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic fktest

查看topic

bin/kafka-topics.sh --zookeeper hadoop01:2181 --list

3.3打开kafka的消费者(在kafka的目录下)(hadoop01是我的主机名)

bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic fktest --from-beginning

四.MySQL

4.1建表

学新通

五.IDEA写程序

5.1导入依赖(我的依赖)

  1.  
    <dependency>
  2.  
    <groupId>mysql</groupId>
  3.  
    <artifactId>mysql-connector-java</artifactId>
  4.  
    <version>5.1.6</version>
  5.  
    </dependency>
  6.  
    <dependency>
  7.  
    <groupId>org.apache.spark</groupId>
  8.  
    <artifactId>spark-core_2.12</artifactId>
  9.  
    <version>2.4.8</version>
  10.  
    </dependency>
  11.  
    <dependency>
  12.  
    <groupId>com.谷歌.guava</groupId>
  13.  
    <artifactId>guava</artifactId>
  14.  
    <version>14.0.1</version>
  15.  
    </dependency>
  16.  
    <dependency>
  17.  
    <groupId>org.apache.hadoop</groupId>
  18.  
    <artifactId>hadoop-client</artifactId>
  19.  
    <version>${hadoop.version}</version>
  20.  
    </dependency>
  21.  
    <dependency>
  22.  
    <groupId>org.apache.hadoop</groupId>
  23.  
    <artifactId>hadoop-client</artifactId>
  24.  
    <version>2.7.3</version>
  25.  
    </dependency>
  26.  
    <dependency>
  27.  
    <groupId>org.apache.hadoop</groupId>
  28.  
    <artifactId>hadoop-common</artifactId>
  29.  
    <version>2.7.3</version>
  30.  
    </dependency>
  31.  
    <dependency>
  32.  
    <groupId>org.apache.hadoop</groupId>
  33.  
    <artifactId>hadoop-hdfs</artifactId>
  34.  
    <version>2.7.3</version>
  35.  
    </dependency>
  36.  
    <dependency>
  37.  
    <groupId>org.apache.spark</groupId>
  38.  
    <artifactId>spark-core_${spark.artifact.version}</artifactId>
  39.  
    <version>${spark.version}</version>
  40.  
    </dependency>
  41.  
    <!-- 使用scala2.11.8进行编译和打包 -->
  42.  
    <dependency>
  43.  
    <groupId>org.scala-lang</groupId>
  44.  
    <artifactId>scala-library</artifactId>
  45.  
    <version>${scala.version}</version>
  46.  
    </dependency>
  47.  
    <dependency>
  48.  
    <groupId>org.apache.spark</groupId>
  49.  
    <artifactId>spark-sql_2.12</artifactId>
  50.  
    <version>2.4.8</version>
  51.  
    </dependency>
  52.  
    <dependency>
  53.  
    <groupId>org.apache.spark</groupId>
  54.  
    <artifactId>spark-streaming_2.12</artifactId>
  55.  
    <version>2.4.8</version>
  56.  
    </dependency>
  57.  
    <dependency>
  58.  
    <groupId>org.apache.spark</groupId>
  59.  
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  60.  
    <version>2.4.8</version>
  61.  
    </dependency>
  62.  
    <dependency>
  63.  
    <groupId>org.apache.kafka</groupId>
  64.  
    <artifactId>kafka-clients</artifactId>
  65.  
    <version>0.11.0.0</version>
  66.  
    </dependency>
  67.  
    <dependency>
  68.  
    <groupId>org.apache.spark</groupId>
  69.  
    <artifactId>spark-core_2.12</artifactId>
  70.  
    <version>2.4.8</version>
  71.  
    </dependency>
  72.  
    <dependency>
  73.  
    <groupId>mysql</groupId>
  74.  
    <artifactId>mysql-connector-java</artifactId>
  75.  
    <version>5.1.6</version>
  76.  
    </dependency>
  77.  
    <dependency>
  78.  
    <groupId>org.apache.spark</groupId>
  79.  
    <artifactId>spark-hive_2.12</artifactId>
  80.  
    <version>2.4.8</version>
  81.  
    </dependency>
  82.  
    <dependency>
  83.  
    <groupId>org.apache.hive</groupId>
  84.  
    <artifactId>hive-jdbc</artifactId>
  85.  
    <version>1.2.1</version>
  86.  
    </dependency>
  87.  
    <dependency>
  88.  
    <groupId>org.apache.spark</groupId>
  89.  
    <artifactId>spark-streaming-flume_2.12</artifactId>
  90.  
    <version>2.4.8</version>
  91.  
     
  92.  
    <dependency>
  93.  
    <groupId>org.apache.flume.flume-ng-clients</groupId>
  94.  
    <artifactId>flume-ng-log4jappender</artifactId>
  95.  
    <version>1.9.0</version>
  96.  
    </dependency>
学新通

5.2代码:

  1.  
    import java.sql.{Connection, DriverManager, PreparedStatement}
  2.  
    import org.apache.kafka.common.serialization.StringDeserializer
  3.  
    import org.apache.spark.SparkConf
  4.  
    import org.apache.spark.streaming.dstream.DStream
  5.  
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  6.  
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  7.  
    import org.apache.spark.streaming.kafka010._
  8.  
    import org.apache.spark.streaming.{Seconds, StreamingContext}
  9.  
     
  10.  
    //写人MySQL表的表名和列名,string,int是数据格式
  11.  
    case class phone(
  12.  
    name: String,
  13.  
    count:Int
  14.  
    )
  15.  
     
  16.  
    object KafkaDemo {
  17.  
    def main(args: Array[String]): Unit = {
  18.  
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaTest")
  19.  
    val streamingContext = new StreamingContext(sparkConf, Seconds(20))
  20.  
    //20秒更新一次结果
  21.  
     
  22.  
    //连接kafka导入数据,hadoop01是我的虚拟机主机名
  23.  
    val kafkaParams = Map[String, Object](
  24.  
    "bootstrap.servers" -> "hadoop01:9092",从broker消费数据
  25.  
    "key.deserializer" -> classOf[StringDeserializer],//发序列化的参数,因为写入kafka的数据经过序列化
  26.  
    "value.deserializer" -> classOf[StringDeserializer],//发序列化的参数,因为写入kafka的数据经过序列化
  27.  
    "group.id" -> "use_a_separate_group_id_for_each_stream",//指定group.id
  28.  
    "auto.offset.reset" -> "latest",//指定消费的offset从哪里开始:① earliest:从头开始 ;② latest从消费者启动之后开始
  29.  
    "enable.auto.commit" -> (false: java.lang.Boolean) //是否自动提交偏移量 offset 。默认值就是true【5秒钟更新一次】, "false" 不让kafka自动维护偏移量 手动维护偏移
  30.  
    )
  31.  
     
  32.  
     
  33.  
    //kafka的topic
  34.  
    val topics = Array("fktest", "test")
  35.  
     
  36.  
    //订阅主题
  37.  
    val stream = KafkaUtils.createDirectStream[String, String](
  38.  
    streamingContext,
  39.  
    PreferConsistent,
  40.  
    Subscribe[String, String](topics, kafkaParams)
  41.  
    )
  42.  
     
  43.  
    //转换格式
  44.  
    val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value))
  45.  
    //分析处理出想要的数据
  46.  
    val resultRDD: DStream[(String, Int)] = mapDStream.map(lines=>{
  47.  
    val data = lines._2.split("_")
  48.  
    (data(1),1)}).reduceByKey(_ _)
  49.  
     
  50.  
     
  51.  
    //将DStream中的数据存储到mysql数据库中
  52.  
    resultRDD.foreachRDD(
  53.  
    rdd=>{
  54.  
    val url = "jdbc:mysql://192.168.17.128:3306/hive?useUnicode=true&characterEncoding=UTF-8"//192.168.17.140是我的主机IP地址,可以用localhost。hive是我的数据库名
  55.  
    val user = "root"//MySQL用户名
  56.  
    val password = "1234567"//MySQL密码
  57.  
    Class.forName("com.mysql.jdbc.Driver").newInstance()//驱动
  58.  
    rdd.foreach(
  59.  
    data=>{
  60.  
    var conn: Connection = DriverManager.getConnection(url,user,password)
  61.  
    val sql = "insert into phone(name,count) values(?,?)"
  62.  
    //第一个phone是表名,(name,count)是列名
  63.  
    var stmt : PreparedStatement = conn.prepareStatement(sql)
  64.  
    stmt.setString(1,data._1.toString)
  65.  
    stmt.setString(2,data._2.toString)
  66.  
    stmt.executeUpdate()
  67.  
    conn.close()
  68.  
    }
  69.  
    )
  70.  
    }
  71.  
     
  72.  
    )
  73.  
     
  74.  
    //输出结果到控制台
  75.  
    resultRDD.print()
  76.  
    // 启动
  77.  
    streamingContext.start()
  78.  
    // 等待计算结束
  79.  
    streamingContext.awaitTermination()
  80.  
    }
  81.  
    }
学新通

4.3运行程序

六.运行

6.1在/root/testdata/f-k文件夹里面添加数据,直接拖入xxx.log。

flume处理后是这样的。

学新通

6.2拖入后kafka消费者会显示内容。

6.3没有开消费者的话可以到kafka设置的日志文件夹下查看。

学新通

 6.4MySQL的显示

学新通

谢谢!!!!

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgachka
系列文章
更多 icon
同类精品
更多 icon
继续加载