• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RDD的创建方式

武飞扬头像
不懂开发的程序猿
帮助1


原创申明: 未经许可,禁止以任何形式转载,若要引用,请标注链接地址
全文共计4654字,阅读大概需要3分钟
欢迎关注我的个人公众号:不懂开发的程序猿

1. 实验室名称:

大数据实验教学系统

2. 实验项目名称:

RDD的创建方式

3. 实验学时:

4. 实验原理:

在调用任何transformation或action操作之前,必须先要有一个RDD。Spark提供了创建RDDs的三种方法。一种方法是将现有的集合并行化。另一种方法是加载外部存储系统中的数据集,比如文件系统。第三种方法是在现有RDD上进行转换来得到新的RDD。  
  使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造测试数据,来测试后面的spark应用的流程。
  使用HDFS文件创建RDD,应该是最常用的生产环境处理方式,主要可以针对HDFS上存储的大数据,进行离线批处理操作。

5. 实验目的:

掌握Spark基于内存创建RDD的方法。
  掌握Spark基于外部存储创建RDD的方法。

6. 实验内容:

1、读取内存数据,构造RDD。
  2、读取外部存储数据,构造RDD。

7. 实验器材(设备、虚拟机名称):

硬件:x86_64 ubuntu 16.04服务器
  软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1,Scala-2.11.11

8. 实验步骤:

8.1 启动Spark集群

在终端窗口下,输入以下命令,启动Spark集群:

1.	$ cd /opt/spark
2.	$ ./sbin/start-all.sh

然后使用jps命令查看进程,确保Spark的Master进程和Worker进程已经启动。

8.2 启动zeppelin服务器

在终端窗口下,输入以下命令,启动zeppelin服务器:

1.	$ zeppelin-daemon.sh start

然后使用jps命令查看进程,确保zeppelin服务器已经启动。

8.3 创建notebook文档

1、首先启动浏览器,在地址栏中输入以下url地址,连接zeppelin服务器。
http://localhost:9090
  2、如果zeppelin服务器已正确建立连接,则会看到如下的zeppelin notebook首页。
学新通

3、点击【Create new note】链接,创建一个新的笔记本,并命名为”rdd_demo”,解释器默认使用”spark”,如下图所示。
学新通

8.4 从内存集合构造RDD

创建RDD的第一种方法是将对象集合并行化,这意味着将其转换为可以并行操作的分布式数据集。对象集合的并行化方法是调用SparkContext类的parallelize方法。
  在上一步打开的”rdd_demo” notebook的代码单元中,执行以下代码。
  1、从列表中并行化构造RDD。

1.	// 定义一个集合
2.	val list1 = List(1,2,3,4,5,6,7,8,9,10)
3.	     
4.	// 构造RDD
5.	val rdd1 = sc.parallelize(list1)
6.	     
7.	// 将集合返回到Driver端
8.	rdd1.collect

将光标放在代码单元中任意位置,然后同时按下【shift enter】键,执行以上代码。输出结果如下所示。

list1: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

从上面的输出可以看出,zeppelin会显示每一行代码的执行结果。
  2、也可以像下面这样快速创建list,然后由list创建RDD。

1.	val list2 = List.range(1,11)
2.	val rdd2 = sc.parallelize(list2)
3.	rdd2.collect

将光标放在代码单元中任意位置,然后同时按下【shift enter】键,执行以上代码。输出结果如下所示:

list2: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:27
res9: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

3、也可以像下面这样使用数组创建RDD。

1.	val strList = Array("明月几时有","把酒问青天","不知天上宫阙","今夕是何年")
2.	val strRDD = sc.parallelize(strList)
3.	strRDD.collect

将光标放在代码单元中任意位置,然后同时按下【shift enter】键,执行以上代码。输出结果如下所示。

strList: Array[String] = Array(明月几时有, 把酒问青天, 不知天上宫阙, 今夕是何年)
strRdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:27
res14: Array[String] = Array(明月几时有, 把酒问青天, 不知天上宫阙, 今夕是何年)

8.5 读取外部存储数据,构造RDD

创建RDD的第二种方法是从存储系统读取数据集,存储系统可以是本地计算机文件系统、HDFS、Cassandra、Amazon S3等等。
  请按下面的步骤操作,加载HDFS中数据存储并构造RDD。
  1. 启动HDFS集群。在Linux终端窗口下,键入以下命令,启动HDFS集群。

1.	$ start-dfs.sh

然后使用jps命令查看进程,确保HDFS集群已经启动。
  2、将本地数据上传至HDFS上。在Linux终端窗口下,键入以下命令。

1.	$ hdfs dfs -mkdir -p /data/dataset/
2.	$ hdfs dfs -put /data/dataset/resources /data/dataset/

以上命令会将/data/dataset/下的resources文件夹上传到HDFS的/data/spark_demo/目录下。在resources文件夹中包含有不同类型的数据文件。可以使用以下命令查看。

1.	$ hdfs dfs -ls /data/dataset/resources

可以看到,在HDFS的/data/spark_demo/resources/下已经有了各种类型的数据文件:

Found 9 items
-rw-r—r— 1 root supergroup 130 2019-07-05 10:40 /data/dataset/resources/employees.json
-rw-r—r— 1 root supergroup 240 2019-07-05 10:40 /data/dataset/resources/full_user.avsc
-rw-r—r— 1 root supergroup 5812 2019-07-05 10:41 /data/dataset/resources/kv1.txt
-rw-r—r— 1 root supergroup 49 2019-07-05 10:41 /data/dataset/resources/people.csv
-rw-r—r— 1 root supergroup 73 2019-07-05 10:41 /data/dataset/resources/people.json
-rw-r—r— 1 root supergroup 32 2019-07-05 10:41 /data/dataset/resources/people.txt
-rw-r—r— 1 root supergroup 185 2019-07-05 10:41 /data/dataset/resources/user.avsc
-rw-r—r— 1 root supergroup 334 2019-07-05 10:41 /data/dataset/resources/users.avro
-rw-r—r— 1 root supergroup 615 2019-07-05 10:41 /data/dataset/resources/users.parquet

3、加载HDFS上存储的txt文本文件,构造一个RDD。在zeppelin中执行如下代码。

1.	// 定义HDFS上文件的存储路径
2.	val hdfsFile = "/data/dataset/resources/people.txt"
3.	     
4.	// 使用SparkContext的textFile方法,加载文本文件并构造RDD
5.	// 注意:传入的参数是要加载的HDFS上的数据文件路径
6.	val lines = sc.textFile(hdfsFile)
7.	     
8.	// 返回RDD所有元素到Driver端
9.	lines.collect()

将光标放在代码单元中任意位置,然后同时按下【shift enter】键,执行以上代码。输出结果如下所示:

hdfsFile: String = /data/dataset/resources/people.txt
lines: org.apache.spark.rdd.RDD[String] = /data/dataset/resources/people.txt MapPartitionsRDD[4] at textFile at <console>:30
res14: Array[String] = Array(Michael, 29, Andy, 30, Justin, 19)

9. 实验结果及分析:

实验结果运行准确,无误

10. 实验结论:

经过本节实验的学习,通过学习RDD的创建方式,进一步巩固了我们的Spark基础。

11. 总结及心得体会:

在调用任何transformation或action操作之前,必须先要有一个RDD。Spark提供了创建RDDs的三种方法。一种方法是将现有的集合并行化。另一种方法是加载外部存储系统中的数据集,比如文件系统。第三种方法是在现有RDD上进行转换来得到新的RDD。
  本次实验演示了前两种构造RDD的方法。

12、 实验知识测试

1、SparkContext通过哪个方法来加载外部的数据集(){C}
A、map()
B、text()
C、textFile()
D、read()

13、实验拓展

1. 在本实验环境的HDFS中,存在如下的数据文件:/data/spark_demo/resources/people.csv。请在Zeppelin中编写Spark代码,加载该CSV文件构造RDD。

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgaciki
系列文章
更多 icon
同类精品
更多 icon
继续加载