• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

全国大学生大数据技能竞赛——Scala和Echart的大数据和挖掘

武飞扬头像
-雾蓝-
帮助1

题目三:大数据分析与挖掘案例

搜狗实验室提供【用户查询日志(SogouQ)】数据
数据:本项目是对日志数据;
要求:搜索关键字进行统计,可以作为热点词汇来帮助用户快速查询相关信息;并且针对用户点击数进行统计分析,可以作为搜索引擎搜索效果评价指标;对搜索时间段进行统计分析,获知高频时段访问等运营商可以做好应对方案,防止出现故障情况发生。并对数据进行相关可视化处理;不提供任何预置代码,按题目要求进行代码编写,
考核点:包括数据读取,数据切割,数据过滤,数据解析,数据缓存,分组聚合,数据可视化。
本题需要完成题目一才可操作;建议使用Eclipse开发工具,使用Spark(Scala)技术编写程序业务逻辑,使用ECharts5进行数据可视化

参考文章
大数据云计算——Spark实战(sogou日志分析)

package search
 
import java.util
 
import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
 
 
/**
 *  用户查询日志(SogouQ)分析,数据来源Sogou搜索引擎部分网页查询需求及用户点击情况的网页查询日志数据集合。
 * 1.搜索关键词统计,使用HanLP中文分词
 * 2.用户搜索次数统计
 * 3.搜索时间段统计*数据格式:访问时间\t用户ID\t[查询词]\t i该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL
 * 其中,用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID
 */
 
object SougouQueryAnalysis {
  def main(args: Array[String]): Unit = {
    //TODO 构建一个spark的对象
    //构建Spark Application 应用的入口实例
    val sc: SparkContext = {
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      SparkContext.getOrCreate(sparkConf)
    }
    //TODO:1 加载搜狗的数据的集合使用小数据集合
    val inputpath = "E:\\GItHub_project\\Big_Data\\Spark\\Sparkday02_2.11\\src\\main\\resources\\SogouQ.sample"
    val inputpath2 = "E:\\GItHub_project\\Big_Data\\Spark\\Sparkday02_2.11\\src\\main\\resources\\SogouQ.reduced"
    val sougouRDD = sc.textFile(inputpath2, minPartitions = 2)
    //显示数据条是否正确
    print(s"count=${sougouRDD.count()}")
    println(sougouRDD.first())
 
 
    //TODO 2:数据的ETL操作的
    val etlRDD = sougouRDD
      .filter(line => null != line && line.trim.split("\\s ").length == 6)
      .mapPartitions { iter =>
        iter.map { line =>
          val array = line.trim.split("\\s ")
          //构建一个对象
          SougouRecord(
            array(0), array(1),
            array(2).replace("\\[\\]", ""),
            array(3).toInt, array(4).toInt,
            array(5)
          )
        }
      }
    println(etlRDD.first())
    //由于数据使用多次 需要缓存数据
    etlRDD.persist(StorageLevel.MEMORY_AND_DISK)
 
    //TODO:搜索关键次统计
    val resultRDD = etlRDD
      .filter(recode => null != recode.queryWords && recode.queryWords.trim.length > 0)
      .flatMap { record =>
        //360安全卫士
        val words = record.queryWords.trim
        //使用的HanLP分词进行中文分词 360  安全  卫士
        val terms: util.List[Term] = HanLP.segment(words)
        //将java中的list转化为的scala中的list
        import scala.collection.JavaConverters._
        //封装到二元组的中的表示每一个搜索单词的出现的一次
        val result = terms.asScala.map {
          term => (term.word, 1)
        }
        //返回的结果
        result
      }
      //分组聚合
      .reduceByKey((tmp, item) => tmp   item)
    //查找次数最多的10个单词
    resultRDD
      .sortBy(tuple => tuple._2, ascending = false)
      .take(10)
      .foreach(println)
 
    //TODO:用户搜索及统计
    /**
     * 分组的字段 先按照用户分组 在按照的搜索四分词分组
     */
 
    val perUserQueryWordsCountRDD: RDD[((String, String), Int)] = etlRDD
      .mapPartitions { iter =>
        iter.map { record =>
          val userID = record.userId
          val querywords = record.queryWords
          //组合用户的ID和queryword为key
          ((userID, querywords), 1)
        }
      }
      //分组聚合
      .reduceByKey((tmp, item) => tmp   item)
    //TODO :获取搜索的点击的次数的最大值和最小中 和平均值
    val restRDD: RDD[Int] = perUserQueryWordsCountRDD.map(tuple => tuple._2)
    println(s"max click count=${restRDD.max()}")
    println(s"min click count=${restRDD.min()}")
    println(s"avg click count=${restRDD.mean()}")
 
    //TODO:搜索的时间的统计
    etlRDD
      .map { record =>
        //获取小时
        val hourStr: String = record.queryTime.substring(0, 2)
        //返回二元组
        (hourStr, 1)
      }
      .reduceByKey((tmp, item) => tmp   item)
      .top(num = 24)(Ordering.by(tuple => tuple._2))
      .foreach(println)
 
    //释放资源资源
    etlRDD.unpersist()
 
    Thread.sleep(100000000)
 
    //TODO 关闭的对象
    sc.stop()
  }
}
学新通

Spark基站统计分析

基站即公用移动通信基站,是移动设备接入互联网的接口设备,用来保证我们在移动的过程中手机可以随时随地保持着有信号,可以保证通话以及收发信息等需求。
现有基站数据字段:
☞用户使用的手机号码;
☞用户进入基站的时间;
☞离开基站的时间;
☞基站的ID;
☞进入和离开基站的标识。
需求:要求使用Sparkcore对数据进行分割处理获取业务需求所使用到的数据,根据不同用户在基站停留的时长数据统计出用户与停留地点的关系。
思路:
想要获取用户家,公司位置,可以通过用户在某一地点的时间长短进行判断(关联基站);
两个位置有多个基站;
计算每个基站停留时间;
找出停留时间最长的2个;
获取基站位置,基站位置由经度纬度构成;
根据排序,前两个常去的位置则可能是家和公司。
学习相关知识点
添加基站数据,创建spark项目;
配置spark基站统计分析项目的依赖;
配置spark基站统计项目的创建工作;
完成Spark基站统计项目的代码编写;
输出最终的统计分析结果,求出用户家和公司的位置。
任务1:保存基站数据
1.点击终端,执行如下命令制作基站经纬度数据:
进入目录:cd /root
编辑文件:vim loc_info

2.写数据需要切换输入模式(键入a或者i)

3.复制如下数据,通过平台上的复制粘贴工具从平台粘贴至环境内。
9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6
16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6
4.粘贴完毕数据 按ESC键 之后输入:wq! 进行保存(右键粘贴数据)

5.同样方式加入用户数据(注意/root目录下)
编辑文件:vim text
18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1
18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1
18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0
18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0
18611132889,20160327075000,9F36407EAD0629FC166F14DDE7970F68,1
18688888888,20160327075100,9F36407EAD0629FC166F14DDE7970F68,1
18611132889,20160327081000,9F36407EAD0629FC166F14DDE7970F68,0
18688888888,20160327081300,9F36407EAD0629FC166F14DDE7970F68,0
18688888888,20160327175000,9F36407EAD0629FC166F14DDE7970F68,1
18611132889,20160327182000,9F36407EAD0629FC166F14DDE7970F68,1
18688888888,20160327220000,9F36407EAD0629FC166F14DDE7970F68,0
18611132889,20160327230000,9F36407EAD0629FC166F14DDE7970F68,0
18611132889,20160327081100,CC0710CC94ECC657A8561DE549D940E0,1
18688888888,20160327081200,CC0710CC94ECC657A8561DE549D940E0,1
18688888888,20160327081900,CC0710CC94ECC657A8561DE549D940E0,0
18611132889,20160327082000,CC0710CC94ECC657A8561DE549D940E0,0
18688888888,20160327171000,CC0710CC94ECC657A8561DE549D940E0,1
18688888888,20160327171600,CC0710CC94ECC657A8561DE549D940E0,0
18611132889,20160327180500,CC0710CC94ECC657A8561DE549D940E0,1
18611132889,20160327181500,CC0710CC94ECC657A8561DE549D940E0,0

任务2:创建spark基站统计项目,添加依赖
开启eclipse,在空白区域,右键New–Scala Project

输入“sparktest”项目名称后,选择Scala版本。右键【Scala library】,找到对应的“properties“
弹出的对话框中,选择如图的2.11.11版本,然后单击“apply and close“

右键项目sparktest,找到【build path】 > 【add external archives】

选择Spark的jar包,通过左侧的“other location”找到jar包存放的路径,全选后单击右下角的【OK】
s
任务3:添加项目包名,创建对象文件
1.在src下新建com.cn.sparkcount包,包下新建Sparktext对象。

2.输入包名:com.cn.sparkcount

3.创建对象,如下右键新建Scala Object

4.输入文件名:com.cn.sparkcount.sparkproject

5.写入main方法,在Sparkproject对象里面写入main方法输入,main,然后快捷键alt /补全代码。

任务4:编写spark基站统计项目逻辑代码
1.读取text数据,输出数据(电话,时间,基站)
package com.cn.sparkcount

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object sparkproject {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(“project”)
val sc = new SparkContext(conf)

//获取数据需要textFile这方法
val rdd1=sc.textFile("/root/text").map(t=>{
    val fields = t.split(",")
    val phone = fields(0)    
    val time = fields(1)
    val lacId = fields(2)
    val status = fields(3)

    //指定输出的格式 电话,时间,基站(状态省略)
    (phone,time,lacId)
  })

  // 转换数据,打印
  println(rdd1.collect().toBuffer) 

}
}
2.运行程序,首先进行本地模式设置
(1)设置模式为本地模式

(2)选择Arguments。

(3)输入 -Dspark.master=local

(4)运行代码

5.最后运行结果:

3.用户在基站下的停留时长(键值对形式)
package com.cn.sparkcount

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object sparkproject {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(“project”)
val sc = new SparkContext(conf)

//获取数据需要textFile这方法

val rdd1=sc.textFile("/root/text").map(t=>{
    val fields = t.split(",")
    val phone = fields(0)    
    val time = fields(1)
    val lacId = fields(2)
    val status = fields(3)
    
    //使用reduceByKey(_ _)是把数据进行相加 如果数据有一个是负数,那么可以理解为相减
    val timeLong = if(status=="1") -time.toLong else time.toLong

    //指定输出的格式 每个用户在基站下的停留时间
    //(key,vaule)
    ((phone,lacId),timeLong)  
  })

  val rdd2 = rdd1.reduceByKey(_ _)
  // 转换数据,打印
  println(rdd2.collect().toBuffer)
学新通

}
}

任务5:输出spark基站统计项目最终输出结果
1.合并数据,进行基站位置输出
package com.cn.sparkcount

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object sparkproject {
def main(args: Array[String]): Unit = {
//创建可以获取配置信息的对象,并且给与当前这个作业一个名字
//导包快捷键 ctrl shift o
val conf = new SparkConf().setAppName(“project”)

//创建可以操作spark程序的对象(操作spark对象,需要配置信息)
//导包快捷键  ctrl shift o
val sc = new SparkContext(conf)

//获取数据需要textFile这方法

val rdd1=sc.textFile("/root/text").map(t=>{
    val fields = t.split(",")
    val phone = fields(0)
    val time = fields(1)
    val lacId = fields(2)
    val status = fields(3)

    //使用reduceByKey(_ _)是把数据进行相加 如果数据有一个是负数,那么可以理解为相减
    val timeLong = if(status=="1") -time.toLong else time.toLong

    //指定输出的格式 思路:根据用户电话和基站 求出每个用户在不同基站的停留时间
    ((phone,lacId),timeLong)
  })

  //如果要把两个数据放到一起输出 也就是说,之前每个用户停留时间已经算出,加上基站的位置,他们的key必须相同
val rdd2=rdd1.reduceByKey(_ _).map(t=>{
    (t._1._2,(t._1._1,t._2))
  })

val rdd3=sc.textFile("/root/loc_info").map(t=>{
   	val lcainfo=t.split(",")
    //获取的数据为 基站的ID 和经度纬度,2个数据合并需要key相同
    (lcainfo(0),(lcainfo(1),lcainfo(2)))
  })

//使用join数据放在一起 算出每个用户在每个基站的停留时间 
//(基站,((电话,时间),(经度,纬度)))
val joined=rdd2.join(rdd3).map(t=>{
  	val lacId=t._1
    val phone=t._2._1._1
    val time=t._2._1._2
    val x=t._2._2._1
    val y=t._2._2._2
    (phone,lacId,time,x,y)
  })

//按照用户分组(_._1)在进行排序 取前二
val rdd5=joined.groupBy(_._1).mapValues(it=>{
    it.toList.sortBy(_._3).reverse.take(2)
  })

rdd5.saveAsTextFile("/root/out1")

//如果想要程序真正的运行,需要一种特定的方法,例如collect(),数据想要被我们看到需要toBuffer()方法

println(rdd5.collect().toBuffer)
学新通

}
}

  1. 最后运行结果

Scala语法基础

Scnla 是一种基于JVM(Java 虚拟机)的跨平台编程语言,Scala 编译器可将 Scala源码编译成符合JVM 虚拟机规范的中间字节码文件,在JVM 平台上解释和运行:Scala 语言是对 Java 语言的一种补充和扩展,其API可无缝兼容Java的API,可认为Java的API是其 Scala API的子集,Scala是完全并彻底而向对象的一种编程语言。编程模式提供面向过程化和面向对象化两种编码设计模式;运行模式则提供编译和解释两种操作模式。
13.1.1变量、常量与赋值
1.声明变量
变量的值可以被重新指定。类型可依赖于值自动推断。
var VariableName[:DataType] [=Initial Value]
示例如下:
var ab:Int=30
ab:Int=30
ab=50 提示:此处ab变量被重新赋值
ab:Int=50
var ab=30
ab:Int=30提示:此处变量ab的类型被自动推断为Int类型
2.声明常量
常量的值不可以被重新指定。因此在声明常量时必须指定值,类型也可依赖于值自动
推断。
val VariableName[:DataType]=Initial Value
示例如下:
val ab:Int=30
ab:Int=30
ab=50 提示:此处常量ab不能被重新赋值,如果重新赋值将发生以下错误
<console>:12: error: reassignment to val
ab=50
^
val ab=30
ab:Int=30提示:此处常量ab的类型被自动推断为Int类型
13.1.2运算符与表达式。
Scala中常用基础运算符如下:
·算术运算符:十、一、*、/、%。
·关系运算符:>、<、=、!=、>=、<=。
·逻辑运算符:&&、l、!。
·值变运算符: 、–。
·成员运算符:·、[]。
·条件运算符:?:。
·赋值运算符:=。
将一系列的操作数通过运算符连接起来即变成表达式。最简单的表达式是一个常量或
变量,表达式间的运算存在逻辑先后顺序。当不明确表达式之间的具体先后顺序时,可使
用小括号将需优先计算的表达式括起来。
运算符与表达式示例如下:
var a:Int=(3 2-5)*0/5
a:Int=0
提示:上述表达式中会先计算括号内的3 2-5部分然后再计算后面的乘除部分,因此
最终得到的计算结果是0.
13.1.3 条件分支控制
if条件分支语法(else分支是可选的分支):
if(条件表达式){
…………………………条件表达式返回true时执行…………………………
}[else{
…………………………条件表达式返回false时执行…………………………
}]
示例如下(比较两个数的大小):
var a=100;
a:Int=100
var b=200;
b:Int=200
if(a>b){
println(a);
}else{
Println(b)
}
200
提示:a的值小于b将导致if中的条件不成立,所以最终打印出b的值。
13.1.4循环流程控制
1.for循环控制语句
for(迭代变量<-初始值to终点值){
…………………………循环体语句…………………………
}
示例如下(累加1到100的和):
2.while循环控制语句
while(循环条件表达式){
…………………………循环体语句…………………………
}
13.1.5 Scala数据类型
1.Scala基本数据类型

2.集合数据类型

数据读取

1.读取文件数据

在Scala语言的 Source单例对象中 中, 提供了一些非常便捷的方法, 从而使开发者可以快速的从指定数据源(文本文件, URL地址等)中获取数据, 在使用 Source单例对象 之前, 需要先导包, 即 import scala.io.Source .

(1)按行读取

概述
我们可以以 行 为单位, 来读取数据源中的数据, 返回值是一个 迭代器类型的对象 . 然后通过 toArray, toList 方 法, 将这些数据放到数组或者列表中即可.

格式
//1. 获取数据源文件对象.
val source:BufferedSource = Source.fromFile(“数据源文件的路径”,“编码表”)
//2. 以行为单位读取数据.
val lines:Iterator[String] = source.getLines()
//3. 将读取到的数据封装到列表中.
val list1:List[String] = lines.toList
//4. 千万别忘记关闭Source对象.
source.close()

案例
按行读取user.txt文件

import scala.io.{BufferedSource, Source}
object 读取 {
def main(args: Array[String]): Unit = {
//1. 获取数据源对象.
val source: BufferedSource = Source.fromFile(“user.txt”)
//2.通过getLines()方法, 逐行获取文件中的数据.
val lines = source.getLines()
//3. 将获取到的每一条数据都封装到列表中.
val list = lines.toList
list.foreach(println)
}
}

(2)按字符读取

概述
Scala还提供了 以字符为单位读取数据 这种方式, 这种用法类似于迭代器, 读取数据之后, 我们可以通过 hasNext(), next()方法 , 灵活的获取数据.代码片

格式
//1. 获取数据源文件对象.
val source:BufferedSource = Source.fromFile(“数据源文件的路径”,“编码表”)
//2. 以字符为单位读取数据.
val iter:BufferedIterator[Char] = source.buffered
//3. 将读取到的数据封装到列表中.
while(iter.hasNext) {
print(iter.next())
}
//4. 千万别忘记关闭Source对象.
source.close()

注:
如果文件不是很大,我们可以直接把它读到一个字符串中
var str:String = source.mkString

案例
1.在当前项目下创建1.txt文本文件, 文件内容如下:
好好学习, 天天向上!
Hadoop, Zookeeper, Flume, Spark
Flink, Sqoop, HBase
选择黑马, 成就你一生的梦想.
2.以行为单位读取该文本文件中的数据, 并打印结果.
object 按字符读取 {
def main(args: Array[String]): Unit = {
//1. 获取数据源对象.
val source = Source.fromFile(“1.txt”)
//2. 获取数据源文件中的每一个字符.
val iter = source.buffered
//这里, source对象的用法相当于迭代器.
// 3. 通过hasNext(), next()方法获取数据.
while(iter.hasNext) {
print(iter.next()) //细节, 这里不要用println(), 否则输出结果可能不是你想要的.
}
//4. 通过mkString方法, 直接把文件中的所有数据封装到一个字符串中.
val str = source.mkString
//5. 打印结果.
println(str)
//6. 关闭source对象, 节约资源, 提高效率.
source.close()
}
}

2.写入数据

概述
Scala并没有内建的对写入文件的支持, 要写入数据到文件, 还是需要使用Java的类库.

往文件中写入数据
需求
往项目下的3.txt文本文件中, 编写一句话, 内容如下:
好好学习,
天天向上!

代码
object 写入 {
def main(args: Array[String]): Unit = {
val fileOutputStream = new FileOutputStream(“3.txt”)
fileOutputStream.write(“好好学习,\r\n”.getBytes)
fileOutputStream.write(“天天向上”.getBytes)
fileOutputStream.close()
}
}

数据切割

例:
object Test0 {
def main(args: Array[String]): Unit = {
val t1 = “101 main street 123”
val t2 = t1.split( " ")
for(element <- t2){
println(element)
}
}
}
运行后输出
101
main
street
123

数据过滤

Problem
你想要筛选出集合中的一些元素形成一个新的集合,这些元素都是满足你的筛选条件的。
Solution
在10.3节中,“选择一个集合方法来解决问题”,大量的方法可以被用来过滤输入集合的元素然后生成新的集合。这一节中展示了filter方法。那么如何正确使用集合的filter方法呢,首先你需要给filter方法一个判断条件或者返回true/false的函数,这个判断条件(函数)的输入类型要与集合元素类型一致,返回值是布尔型的。filter方法会对集合的每一个元素调用判断条件,当条件为true的时候则元素进入新的集合否则会被过滤掉。你还需要使用一个变量来指向新的集合。
下面这个例子展示了,如何通过取模算法从一个输入集合中筛选出偶数并形成一个新的集合:

正如上面展示的,filter方法返回了所有使假设条件(_ % 2 == 0)为真的集合元素组成的新集合。还有一个方法filterNot,可以返回所有使假设条件返回false的元素组成的新集合。

filter方法对比其他方法的特点有:
• filter方法遍历整个集合,其他的方法都只是遍历一部分元素
• filter方法允许你提供一个判断条件(函数),来过滤集合元素
如何筛选集合元素完全取决于你的算法,接下来的例子展示了一些方法来过滤字符串列表:

当你的判断逻辑复杂,没有办法一行写完,我们可以在filter内部使用多行的判断逻辑:

你同样可以定义一个判断函数,然后把这个函数传给filter方法:

接下来的这个例子告诉你,你可以多次连续调用filter方法:

我们在一个文件中读取所有的行,转换为一个List,每行是一个元素,然后我们把空行过滤掉,然后再把#开头的过滤掉。看起来是一个统计shell脚本代码行数的算法。
使用filter的两个关键点是:
• 你的算法需要能正确判断出你所需要的元素,并返回true,对于你不需要的数据则返回false
• 记得用一个新的变量指向filter方法返回的集合,因为filter方法并不会对原集合做改变

数据解析

主要集合:Map和Tuple是Scala中还是Spark中是使用最频繁、最常用的数据集合
Map特点:
1、默认情况下通过Map构建的集合是不可变的,里面的数据不可修改,一旦修改则会产生新的Map,而原有的Map保持不变,这是和Java中Map一个很大的区别
2、Map的实例是通过调用工厂模式apply方法来构建的, 而需要注意的是Map是接口,在apply中使用了具体的实现类,及如果想实例化Map对象需要使用HashMap等具体的子
3、查询一个Map中的数据一定是采用getOrElse的语法的,一方面是保证key不存在时不报异常,另一方面神奇的作用就是使用默认值(而关于默认值在实际开发中非常重要,在Spark中很多默认配置都是通过getOrElse来实现的)
4、LinkedHashMap可以记住插入数据的顺序(这在实际开发中非常重要)

Tuple特点:
1、Tuple中可以有很多不同类型的数据,例如:(“zhouqt”,“123”,555,“welcome to beijing”)
2、Tuple另外一个非常重要的作用就是可以作为函数的返回值,在Tuple中可以返回若干个值,以SparkContext源码为例:

数据缓存

RDD.cache() 或RDD.persist

1.1. RDD的缓存
Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。

1.1.1. RDD缓存方式
RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。

缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
原文链接:https://blog.csdn.net/huangyinzhao/article/details/80368554

分组聚合

分组 | groupBy

我们如果要将数据按照分组来进行统计分析,就需要使用到分组方法

定义
groupBy表示按照函数将列表分成不同的组

方法签名
def groupBy[K](f: (A) ⇒ K): Map[K, List[A]]

方法解析
groupBy方法 API 说明
泛型 [K] 分组字段的类型
参数 f: (A) ⇒ K 传入一个函数对象
接收集合元素类型的参数
返回一个K类型的key,这个key会用来进行分组,相同的key放在一组中
返回值 Map[K, List[A]] 返回一个映射,K为分组字段,List为这个分组字段对应的一组数据

groupBy执行过程分析

示例
有一个列表,包含了学生的姓名和性别:
“张三”, “男”
“李四”, “女”
“王五”, “男”
请按照性别进行分组,统计不同性别的学生人数

步骤
1.定义一个元组列表来保存学生姓名和性别
2.按照性别进行分组
3.将分组后的Map转换为列表:List((“男” -> 2), (“女” -> 1))

参考代码

scala> val a = List(“张三”->“男”, “李四”->“女”, “王五”->“男”)
a: List[(String, String)] = List((张三,男), (李四,女), (王五,男))

// 按照性别分组
scala> a.groupBy(_._2)
res0: scala.collection.immutable.Map[String,List[(String, String)]] = Map(男 -> List((张三,男), (王五,男)),
女 -> List((李四,女)))

// 将分组后的映射转换为性别/人数元组列表
scala> res0.map(x => x._1 -> x._2.size)
res3: scala.collection.immutable.Map[String,Int] = Map(男 -> 2, 女 -> 1)

聚合操作

聚合操作,可以将一个列表中的数据合并为一个。这种操作经常用来统计分析中

聚合 | reduce
reduce表示将列表,传入一个函数进行聚合计算

定义
方法签名
def reduce[A1 >: A](op: (A1, A1) ⇒ A1): A1

方法解析
reduce方法 API 说明
泛型 [A1 >: A] (下界)A1必须是集合元素类型的子类
参数 op: (A1, A1) ⇒ A1 传入函数对象,用来不断进行聚合操作
第一个A1类型参数为:当前聚合后的变量
第二个A1类型参数为:当前要进行聚合的元素
返回值 A1 列表最终聚合为一个元素

reduce执行流程分析

[!NOTE]
reduce和reduceLeft效果一致,表示从左到右计算
reduceRight表示从右到左计算

案例
1.定义一个列表,包含以下元素:1,2,3,4,5,6,7,8,9,10
2.使用reduce计算所有元素的和

参考代码

scala> val a = List(1,2,3,4,5,6,7,8,9,10)
a: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> a.reduce((x,y) => x y)
res5: Int = 55

// 第一个下划线表示第一个参数,就是历史的聚合数据结果
// 第二个下划线表示第二个参数,就是当前要聚合的数据元素
scala> a.reduce(_ _)
res53: Int = 55

// 与reduce一样,从左往右计算
scala> a.reduceLeft(_ _)
res0: Int = 55

// 从右往左聚合计算
scala> a.reduceRight(_ _)
res1: Int = 55
原文链接:https://blog.csdn.net/weixin_43893397/article/details/104222898

数据可视化Echarts

原文链接:https://blog.csdn.net/weixin_43883917/article/details/113886713

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgabcjc
系列文章
更多 icon
同类精品
更多 icon
继续加载