• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

hadoop复习题

武飞扬头像
梧桐林.
帮助1

1. hadoop常用端口号

  hadoop2.x hadoop3.x
访问hdfs端口 50070 9870
访问MR执行情况端口 8088 8088
历史服务器 19888 19888
客户访问集群端口 9000 8020

2.HDFS

2.1hdfs读流程和写流程

2.1.1. 读流程

学新通
简单描述

 	1.	客户端向NameNode请求下载文件
	2.	NameNode返回目标文件的元数据,客户端创建FSDatainputStream
	3.	客户端根据元数据请求DataNode读取数据block(就近原则,谁离得近就选谁,但是要考虑负载均衡)
	4.	DataNode向客户端传输数据
	5.	重复第三步,直到所有的块传输完成(串行读提高传输效率,不支持并行读)
	6.	客户端根据元数据组装block块完成读取数据#

详细描述:

  1. 首先调用 FileSystem 对象的 open 方法,其实是一个 DistributedFileSystem 的实例
  2. DistributedFileSystem 通过 rpc 获得文件的第一个 block 的 locations(namenode 查出来的),同一 block 按照副本数(一个 block 的文件有三个备份)会返回多个 locations(返回给 open 方法),这些 locations 按照 hadoop 拓扑结构(远近)排序,距离客户端近的排在前面.
  3. 前两步会返回一个(io 流对象,封装成)FSDataInputStream 对象,该对象会被封装成 DFSInputStream 对象,DFSInputStream 可以方便的管理 datanode 和 namenode 数据流。客户端调用 read 方法,DFSInputStream 最会找出离客户端最近的 datanode 并连接。
  4. 数据从 datanode 源源不断的流向客户端。
  5. 如果第一块的数据读完了,就会关闭指向第一块的 datanode 连接,接着读取下一块。(有多个 block 就还继续按照上面的顺序接着读取)这些操作对客户端来说是透明的(也就是用户不需要知道下面都做了什么),客户端的角度看来只是读一个持续不断的流。
  6. 如果第一批 block 都读完了,DFSInputStream 就会去 namenode 拿下一批 blocks 的 location,然后继续读,如果所有的块都读完,这时就会关闭掉所有的流。
    如果在读数据的时候,DFSInputStream 和 datanode 的通讯发生异常,就会尝试正在读的 block 的排第二近的 datanode, 并且会记录哪个 datanode 发生错误,剩余的 blocks 读的时候就会直接跳过该 datanode。DFSInputStream 也会检查 block 数据校验和,如果发现一个坏的 block, 就会先报告到 namenode 节点,然后 DFSInputStream 在其他的 datanode 上读该 block 的镜像
    该设计的方向就是客户端直接连接 datanode 来检索数据并且 namenode 来负责为每一个 block 提供最优的 datanode,namenode 仅仅处理 block location 的请求,这些信息都加载在 namenode 的内存中,hdfs 通过 datanode 集群可以承受大量客户端的并发访问。
2.1.2. 写流程

学新通
简单描述

	1.	客户端向NameNode发送写数据请求(包含待上传文件名和将要上传的路径)
	2.	NameNode检查路径是否存在,文件是否重名等(假设满足上传条件)
	3.	NameNode向客户端响应数据,可以上传文件
	4.	客户端根据文件大小进行切分成一个个block块,并向NameNode发送提交即将上传block1的请求
	5.	NameNode查询DataNode信息,规划block1的存储位置
	6.	NameNode向客户端返回block1可以存储的数据节点ip列表
	7.	客户端直接请求数据节点1上传block1,数据节点1存储block1完毕并根据ip列表将block1发送给数据节点2,
		数据节点2存储完毕block1并根据ip列表将block1发送给数据节点3,数据节点3存储完成响应数据给数据节点2,
		数据节点2将响应数据给数据节点1,数据节点1将存储结果返回给NameNode和客户端
		(传输数据单位是64k的packet,他由多个chunk512byte chunksum4byte组成)
	8.	重复第四步上传下一个block

详细描述

  1. 客户端通过调用 DistributedFileSystem 的 create 方法创建新文件
  2. DistributedFileSystem 通过 RPC 调用 namenode 去创建一个没有 blocks 关联的新文件(这时候还没有决定存在哪个位置),创建前,namenode 会做各种校验,比如文件是否存在,客户端有无权限去创建等。如果校验通过,namenode 就会记录下新文件,否则就会抛出 IO 异常.
  3. 前两步结束后会返回 FSDataOutputStream 的对象,像读文件的时候相似,FSDataOutputStream 被封装成 DFSOutputStream.DFSOutputStream 可以协调 namenode 和 datanode。客户端开始写数据到 DFSOutputStream,DFSOutputStream 会把数据切成一个个小 packet,然后排成队列 data quene(理解为缓存,将数据进行排列,有序的,方便切分,比如数据到了 128MB,可以进行切分)。
  4. DataStreamer 会去处理接受 data queue,他先问询 namenode 这个新的 block 最适合存储的在哪几个 datanode 里,比如副本数是 3,那么就找到 3 个最适合的 datanode(namenode 告诉了数据流,存储到哪个地方比较合适),把他们排成一个 pipeline(类似于队列中的出栈,出去就没了).DataStreamer 把 packet 按队列输出到管道的第一个 datanode 中,第一个 datanode 又把 packet 输出到第二个 datanode 中,以此类推。
  5. DFSOutputStream 还有一个对列叫 ack queue(一个队列是传输数据,一个队列是验证),也是由 packet 组成,等待 datanode 的收到响应,当 pipeline 中的所有 datanode 都表示已经收到的时候,这时 ack queue 才会把对应的 packet 包移除掉。(ack 就是把数据搞成了两个队列,防止数据发生错误;一个队列是出栈的,存储到 datanode,一个队列是验证的,当验证没有问题之后,ack 才会把自己的数据丢掉,留着就是为了防止出现问题,找不到数据了)
    如果在写的过程中某个 datanode 发生错误,会采取以下几步:1) pipeline 被关闭掉;2) 为了防止丢包 ack queue 里的 packet 会同步到 data queue 里;3) 把产生错误的 datanode 上当前在写但未完成的 block 删掉;4)block 剩下的部分被写到剩下的两个正常的 datanode 中;5)namenode 找到另外的 datanode 去创建这个块的复制(也是三个的)。当然,这些操作对客户端来说是无感知的。
  6. 客户端完成写数据后调用 close 方法关闭写入流
  7. DataStreamer 把剩余得包都刷到 pipeline 里然后等待 ack 信息,收到最后一个 ack 后,通知 namenode 把文件标示为已完成

2.2. HDFS小文件处理

	1)过多小文件会有什么影响
		(1)存储层面:
			1个文件块,占用namenode150字节内存
			1亿个小文件*150字节
			1个文件块 * 150字节
			128G能存储多少文件块?   128 * 1024*1024*1024byte/150字节 = 9亿文件块
		(2)计算层面:
			每个小文件都会起到一个MapTask,占用了大量计算资源
	2)怎么解决
	(1)采用har归档方式,将小文件归档
	(2)采用CombineTextInputFormat
	(3)有小文件场景开启JVM重用;如果没有小文件,不要开启JVM重用,因为会一直占用使用到的task卡槽,直到任务完成才释放。
	JVM重用可以使得JVM实例在同一个job中重新使用N次,N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间
		<property>
		    <name>mapreduce.job.jvm.numtasks</name>
		    <value>10</value>
		    <description>How many tasks to run per jvm,if set to -1 ,there is  no limit</description>
		</property>   

2.3HDFS的NameNode内存

1)Hadoop2.x系列,配置NameNode默认2000m
2)Hadoop3.x系列,配置NameNode内存是动态分配的
NameNode内存最小值1G,每增加100万个block,增加1G内存。
当NameNode启动时先滚动edits并生成一个空的edits.inprogress会将Fsimage和edits文件加载到内存中进行合并,之后的操作(增删)将追加到edits.inprogress中
根据NameNode的工作机制,当edits的操作记录记录过多时不仅会降低追加效率,同时断电恢复时会花费大量时间,因此2NN将针对此问题进行解决,将触发检查条件时,2NN首先通知NameNode滚动edits生成新的eidts.inprogress(之后的操作记录将写在此文件)并通过http get的形式将磁盘的Fsimage和edits复制过来并加载到内存中进行合并,生成Fsimage.chkpoint文件,并通过http post形式拷贝给NameNode重命名为Fsimage后替换原来的Fsimage。
学新通

(1)第一阶段:NameNode 启动
 	 (1)第一次启动 NameNode 格式化后,创建 Fsimage 和 Edits 文件。
    如果不是第一次启动,直接加载编辑日志和镜像文件到内存。 
    (2)客户端对元数据进行增删改的请求。
    (3)NameNode记录操作日志,更新滚动日志。 
    (4)NameNode 在内存中对元数据进行增删改。 

(2)第二阶段:Secondary NameNode工作 
    (1)Secondary NameNode 询问 NameNode 是否需要 CheckPoint。直接带回 NameNode是否检查结果。 
    (2)Secondary NameNode 请求执行 CheckPoint。 
    (3)NameNode 滚动正在写的Edits 日志。 
    (4)将滚动前的编辑日志和镜像文件拷贝到 Secondary NameNode。
    (5)SecondaryNameNode 加载编辑日志和镜像文件到内存,并合并。 
    (6)生成新的镜像文件 fsimage.chkpoint。 
    (7)拷贝fsimage.chkpoint 到 NameNode。 
    (8)NameNode 将 fsimage.chkpoint 重新命名成fsimage。
学新通

思考:为什么块的大小不能设置太小,也不能设置太大?
HDFS 的块设置太小,会增加寻址时间,程序一直在找块的开始位置
如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。导致程序在处理这块数据的时候,或非常慢。
总结:HDFS 块的大小设置主要取决于磁盘传输速率。

2.4 datanode工作机制

学新通

3.MapReduce

3.1 Map 阶段

学新通

  1. 框架使用 InputFormat 类的子类把输入文件 (夹) 划分为很多 InputSplit,默认,每个 HDFS 的 block 对应一个 InputSplit。通过 RecordReader 类,把每个 InputSplit 解析成一个个 < k1,v1>。默认,框架对每个 InputSplit 中的每一行,解析成一个 < k1,v1>。
  2. 框架调用 Mapper 类中的 map (…) 函数,map 函数的形参是 < k1,v1 > 对,输出是 < k2,v2 > 对。一个 InputSplit 对应一个 map task。程序员可以覆盖 map 函数,实现自己的逻辑。
  3. (假设 reduce 存在) 框架对 map 输出的 < k2,v2 > 进行分区。不同的分区中的 < k2,v2 > 由不同的 reduce task 处理。默认只有 1 个分区。
    (假设 reduce 不存在) 框架对 map 结果直接输出到 HDFS 中。
  4. (假设 reduce 存在) 框架对每个分区中的数据,按照 k2 进行排序、分组。分组指的是相同 k2 的 v2 分成一个组。注意:分组不会减少 < k2,v2 > 数量。
  5. (假设 reduce 存在,可选) 在 map 节点,框架可以执行 reduce 归约。
  6. (假设 reduce 存在) 框架会对 map task 输出的 < k2,v2 > 写入到 linux 的磁盘文件中。
    至此,整个 map 阶段结束

3.2 Reduce 阶段

学新通

  1. 框架对多个 map 任务的输出,按照不同的分区,通过网络 copy 到不同的 reduce 节点。这个过程称作 shuffle。
  2. 框架对 reduce 端接收的 [map 任务输出的] 相同分区的 < k2,v2 > 数据进行合并、排序、分组。
  3. 框架调用 Reducer 类中的 reduce 方法,reduce 方法的形参是 <k2,{v2…}>,输出是 < k3,v3>。一个 < k2,{v2…}> 调用一次 reduce 函数。程序员可以覆盖 reduce 函数,实现自己的逻辑。
  4. 框架把 reduce 的输出保存到 HDFS 中。
    至此,整个 reduce 阶段结束。

3.3 Shuffle 阶段

上面的流程是整个 MapReduce 最全工作流程,但是 Shuffle 过程只是从第 7 步开始到第16 步结束,具体 Shuffle 过程详解,如下:

  1. MapTask 收集我们的 map()方法输出的 kv 对,放到内存缓冲区中
  2. 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
  3. 多个溢出文件会被合并成大的溢出文件
  4. 在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对 key 进行排序
  5. ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据
  6. ReduceTask 会抓取到同一个分区的来自不同 MapTask 的结果文件,ReduceTask 会
    将这些文件再进行合并(归并排序)
  7. 合并成大文件后,Shuffle 的过程也就结束了,后面进入 ReduceTask 的逻辑运算过
    程(从文件中取出一个一个的键值对 Group,调用用户自定义的 reduce()方法)

注意:
(1)Shuffle 中的缓冲区大小会影响到 MapReduce 程序的执行效率,原则上说,缓冲区
越大,磁盘 io 的次数越少,执行速度就越快。
(2)缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb 默认 100M。

  1. 每个 map 有一个环形内存缓冲区,用于存储 map 的输出。默认大小 100MB(io.sort.mb 属性),一旦达到阀值 0.8(io.sort.spill.percent), 一个后台线程把内容溢写到 (spilt) 磁盘的指定目录(mapred.local.dir)下的一个新建文件中。
  2. 写磁盘前,要 partition,sort。如果有 combiner,combine 排序后数据。
  3. 等最后记录写完,合并全部文件为一个分区且排序的文件。

1.Reducer 通过 Http 方式得到输出文件的特定分区的数据。
2. 排序阶段合并 map 输出。然后走 Reduce 阶段。
3.reduce 执行完之后,写入到 HDFS 中。

mapreduce 中,map 阶段处理的数据如何传递给 reduce 阶段,是 mapreduce 框架中最关键的一个流程,这个流程就叫 shuffle;

具体来说:就是将 maptask 输出的处理结果数据,分发给 reducetask,并在分发的过程中,对数据按 key 进行了分区和排序;
shuffle过程如下:

学新通
你可以这么说:

  1. Map方法之后Reduce方法之前这段处理过程叫「Shuffle」

  2. Map方法之后,数据首先进入到分区方法,把数据标记好分区,然后把数据发送到环形缓冲区;环形缓冲区默认大小100m,环形缓冲区达到80%时,进行反向溢写;溢写前对数据进行排序,排序按照对key的索引进行字典顺序排序,排序的手段「快排」;溢写产生大量溢写文件,需要对溢写文件进行「归并排序」;对溢写的文件也可以进行Combiner操作,前提是汇总操作,求平均值不行。最后将文件按照分区存储到磁盘,等待Reduce端拉取。

  3. 每个Reduce拉取Map端对应分区的数据。拉取数据后先存储到内存中,内存不够了,再存储到磁盘。拉取完所有数据后,采用归并排序将内存和磁盘中的数据都进行排序。在进入Reduce方法前,可以对数据进行分组操作。

提问:reduce怎么知道去哪里拉去map结果集?
map任务成功后,它们会使用心跳机制通知它们的application master。因此,对于指定作业,application master 知道map输出和主机位置之间的映射关系。reduce中的一个线程定期询问master以便于获取map输出主机的位置,直到获得所有输出位置。

3.4MapReduce的优化

学新通
学新通

4.Yarn

4.1 yarn简略版工作流程

学新通
步骤如下:

  1. client向RM提交应用程序,其中包括启动该应用的ApplicationMaster的必须信息,例如ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等

  2. ResourceManager启动一个container用于运行ApplicationMaster

  3. 启动中的ApplicationMaster向ResourceManager注册自己,启动成功后与RM保持心跳

  4. ApplicationMaster向ResourceManager发送请求,申请相应数目的container

  5. 申请成功的container,由ApplicationMaster进行初始化。container的启动信息初始化后,AM与对应的NodeManager通信,要求NM启动container

  6. NM启动container

  7. container运行期间,ApplicationMaster对container进行监控。container通过RPC协议向对应的AM汇报自己的进度和状态等信息

  8. 应用运行结束后,ApplicationMaster向ResourceManager注销自己,并允许属于它的container被收回
    详细过程
    学新通

4.2 Yarn调度器

  1. Hadoop调度器重要分为三类:
    FIFO 、Capacity Scheduler(容量调度器)和Fair Sceduler(公平调度器)。
    Apache默认的资源调度器是容量调度器;
    CDH默认的资源调度器是公平调度器。
  2. 区别:
    FIFO调度器:支持单队列 、先进先出 生产环境不会用。
    容量调度器:支持多队列。队列资源分配,优先选择资源占用率最低的队列分配资源;作业资源分配,按照作业的优先级和提交时间顺序分配资源;容器资源分配,本地原则(同一节点/同一机架/不同节点不同机架)
    公平调度器:支持多队列,保证每个任务公平享有队列资源。资源不够时可以按照缺额分配。
  3. 在生产环境下怎么选择?
    大厂:如果对并发度要求比较高,选择公平,要求服务器性能必须OK;
    中小公司,集群服务器资源不太充裕选择容量。
  4. 在生产环境怎么创建队列?
    (1)调度器默认就1个default队列,不能满足生产要求。
    (2)按照框架:hive /spark/ flink 每个框架的任务放入指定的队列(企业用的不是特别多)
    (3)按照业务模块:登录注册、购物车、下单、业务部门1、业务部门2
  5. 创建多队列的好处?
    (1)因为担心员工不小心,写递归死循环代码,把所有资源全部耗尽。
    (2)实现任务的降级使用,特殊时期保证重要的任务队列资源充足。
    业务部门1(重要)=》业务部门2(比较重要)=》下单(一般)=》购物车(一般)=》登录注册(次要)

5 项目经验之基准测试

搭建完Hadoop集群后需要对HDFS读写性能和MR计算能力测试。测试jar包在hadoop的share文件夹下。
集群总吞吐量 = 带宽*集群节点个数/副本数
例如:100m/s * 10台/ 3= 333m/s
注意:如果测试数据在本地,那副本数-1。因为这个副本不占集群吞吐量。如果数据在集群外,向该集群上传,需要占用带宽。本公式就不用减1。

6 Hadoop宕机

1)如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)
2)如果写入文件过快造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。例如,可以调整Flume每批次拉取数据量的大小参数batchsize。

7 Hadoop解决数据倾斜方法

  1. 提前在map进行combine,减少传输的数据量
    在Mapper加上combiner相当于提前进行reduce,即把一个Mapper中的相同key进行了聚合,减少shuffle过程中传输的数据量,以及Reducer端的计算量。
    如果导致数据倾斜的key大量分布在不同的mapper的时候,这种方法就不是很有效了。
  2. 导致数据倾斜的key 大量分布在不同的mapper
    1. 局部聚合加全局聚合。
      第一次在map阶段对那些导致了数据倾斜的key 加上1到n的随机前缀,这样本来相同的key 也会被分到多个Reducer中进行局部聚合,数量就会大大降低。
      第二次mapreduce,去掉key的随机前缀,进行全局聚合。
      思想:二次mr,第一次将key随机散列到不同reducer进行处理达到负载均衡目的。第二次再根据去掉key的随机前缀,按原key进行reduce处理。
      这个方法进行两次mapreduce,性能稍差。
    2. 增加Reducer,提升并行度
      JobConf.setNumReduceTasks(int)
    3. 实现自定义分区
      根据数据分布情况,自定义散列函数,将key均匀分配到不同Reducer

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgjabaj
系列文章
更多 icon
同类精品
更多 icon
继续加载