• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

什么是MapReduceMapReduce整体架构搭建使用

武飞扬头像
IT行业小趴菜
帮助1

前言

本文是MapReduced的详细介绍,MapReduce是hadoop体系下的一种计算模型(计算框架|编程框架),主要是用来对存储在hdfs上的数据进行统计,分析的,分布式计算框架,用来解决分布式大数据平台下数据如何计算,资源调度,任务监控 主要用来整合hadoop集群中的资源(CPU 内存),进行统一调度 同时监控任务的执行情况,联合多个服务器节点的硬件,共同完成一个计算。突破单机服务器的计算能力,还介绍了Yarn分布式集群搭建使用,MapReduce工作的原理源码分析

MapReduce

入门

MapReduce是hadoop体系下的一种计算模型(计算框架|编程框架),主要是用来对存储在hdfs上的数据进行统计,分析的。

MapReduce的核心思想

分而治之:大任务拆分小任务。

学新通

MapReduce

概念:分布式计算框架,用来解决分布式大数据平台下数据如何计算。
简单:分而治之
Job
MapTask * 多个并行
ReduceTask

  1. Job(一个大型任务)[Application]
    一组MapReduce又统称为一个Job作业
  2. MapTask(拆分后的小任务)
    局部计算 并行
  3. Reduce(整合任务)
    对局部计算结果进行汇总计算。
yarn

yarn集群核心组成

NodeManager

ResourceManager

学新通

作用(包工队)
资源调度,任务监控 主要用来整合hadoop集群中的资源(CPU 内存),进行统一调度 同时监控任务的执行情况
总结: 联合多个服务器节点的硬件,共同完成一个计算。突破单机服务器的计算能力。

组成部分

  1. ResourceManager(包工头)
    集群计算资源的管理器,也是yarn架构中的主节点。
    功能:
    1. 监控集群资源
    2. 为计算分配资源。
  2. NodeManager(干活的)
    yarn集群计算资源的提供者,也是yarn架构中的从节点。
    功能
    1. 真正执行计算任务的节点。
  3. 监控本节点的资源情况(CPU 内存 网络 硬盘),并通过心跳向RM汇报。

学新通

MapReduce特点

  1. 易于编程:只需要使用hadoop接口进行编程,即可实现多台计算机分布式计算和分布式存储。
  2. 高扩展性:存储空间不足或者计算能力不足,则可以添加计算机完成。
  3. 容错性高:如果某个节点宕机,hadoop可以自动切换讲计算任务转移到其他节点上完成,不会影响计算结果。
    如果计算任务执行了一半失败,出错,内部自动重试机制。
  4. 应用场景:PB级别以上海量数据的离线处理,无法实时处理和流失动态处理。(每日)

Yarn伪分布式搭建

1.准备单机的HDFS架构
要求:安装了并配置了HDFS架构的服务器。
验证:jps

[root@hadoop10 ~]# jps
2224 Jps
2113 SecondaryNameNode
1910 DataNode
1806 NameNode
关闭掉hdfs
	stop-dfs.sh
# 2 初始化配置文件
# 拷贝得到mapred-site.xml
[root@hadoop10 hadoop]# cp mapred-site.xml.template mapred-site.xml
1. mapred-site.xml
	<!--配置yarn框架作为mapreduce的资源调度器-->
	<property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
2. yarn-site.xml
	<!-- mapreduce计算服务方法。 -->
	<property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
	<!--配置resourcemanager的主机ip-->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>Hadoop</value>
    </property>
3. slaves配置文件
	指定:DataNode和NodeManager节点的ip地址。
	① Datanode节点ip
	② Nodemanager的节点ip
学新通
# 3. 启动yarn集群
1. 启动HDFS集群
	start-dfs.sh
2. 启动yarn集群
	start-yarn.sh
	关闭yarn
	stop-yarn.sh
# 验证
1. jps
[root@hadoop11 ~]# jps
    6160 DataNode   --- 数据存储节点
    6513 ResourceManager -- 计算机资源调度节点
    6614 NodeManager -- 局部计算节点
    6056 NameNode  -- 文件元数据存储节点
    6349 SecondaryNameNode -- checkpoint节点。
    6831 Jps
2. 访问yarn的资源调度器web网页。
	http://resourcemanager所在节点ip:8088

学新通

MapReduce编码

需求

学新通

MapReduce2.0工作机制

  • 数据变化(要干什么)
    学新通

  • 工作角色(谁来干)

学新通

MapReduce数据流转机制

学新通> 1. InputFormat(mr自动处理)

讲block文件转化成split,其中每条数据是key-value组成。
key是数据偏移量
value是每条数据
2. Map(程序员编码)
将split逐条输入给map,由map负责,对每条数据进行处理,转化为keyOut-valueOut
3. Shuffle(MR的默认处理器)
对map输出的每条数据的key-value进行排序,分组。
4. Reduce(程序员编码)
对Shuffle分组后的数据的key-value进行处理,转化为新的key-value。
5. OutputFormat
讲reduce产生的数据,存储HDFS文件系统中

MR编码准备
# 导入pom依赖
<!--hadoop公共依赖-->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.9.2</version>
</dependency>

<!--hadoop hdfs 依赖-->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.9.2</version>
</dependency>

<!--junit-->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>

<!--map reduce-->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.9.2</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-common</artifactId>
    <version>2.9.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>2.9.2</version>
</dependency>
学新通
# 导入log4j配置文件
MR编码
# 编写map程序
/*Mapper:
    * 接受:k(0)-v(yangdd yangdd)
    * 输出:k(name)-v(1)
    *
    * */
/**
     * 继承类上的泛型:
     * Keyin
     * ValueIn
     * KeyOut
     * ValueOut
     *
     */
static class WordCountMapper extends Mapper<LongWritable,Text, Text, IntWritable>{
    /**
         * 执行时机:每读取一行k-v,调用一次map方法
         * @param key 输入k
         * @param value 输入v
         * @param context 输出k-v写出工具。
         * @throws IOException
         * @throws InterruptedException
         */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1. 接受k-v
        //2. 对v进行拆分
        String sv = value.toString();
        String[] names = sv.split(" ");
        //遍历数组,将得到每个name,作为k输出。
        for (String name : names) {
            //3. 将k(name)-v(1)
            context.write(new Text(name),new IntWritable(1));
        }

    }
}
学新通
# 编写reduce程序
/*Reducer:
    * 对maptask输出后,mapreduce合并后的k-vs中的value之,累加和。
    * keyint
    * valuein
    * keyout
    * valueout
    * */
static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
    /**
         * 执行时机:每读取Reduce端合并后的一组数据(k-vs),调用一次reduce方法。
         * @param key 输入k
         * @param values 输入value [1,2,3,1]
         * @param context 输出k-v
         * @throws IOException
         * @throws InterruptedException
         */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //1: 接受k-vs
        //2. 对vs 遍历累加
        int sum = 0;
        for (IntWritable value : values) {
            sum = sum value.get();
        }
        //3. 输出
        // k(name)-v(累加和)
        context.write(key,new IntWritable(sum));
    }
}
学新通
# 编写job程序
public static void main(String[] args) throws Exception{
    /*组装Job 启动Job*/
    //1. 初始化hdfs的配置文件 入口
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS","hdfs://192.168.199.10:9000");
    //2. 创建job,未来是要运行在yarn集群中。
    Job job = Job.getInstance(conf);
    job.setJarByClass(JobForWordCount.class);
    //3. 配置job(MapTask一端): TextInputFormat keyout valueout Mapper
    TextInputFormat.addInputPath(job,new Path("/baizhi/mapreduce/demo1/namecount.txt"));
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setMapperClass(WordCountMapper.class);
    //4. 配置job(ReduceTask一端): TextOutputFormat keyout valueout Reducer
    TextOutputFormat.setOutputPath(job,new Path("/baizhi/mapreduce/demo1/namecountout"));//最后一集目录不能存在,执行目录。
    job.setReducerClass(WordCountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    //5. 启动job
    boolean b = job.waitForCompletion(true);
    System.out.println(b);
}
学新通
# 本地直接运行。
	使用本地的方式提交任务,需要HDFS开启写入文件的权限。
	hdfs dfs -chmod -R 777 /hdfs

学新通

MapReduce核心api

学新通

  • InputFormat
  • MapTask
  • maper
  • ReduceTask
  • reducer
  • OutputFormat
Mapreduce补充细节

Hadoop的MapReduce适合做大数据的离线处理,不适合做实时处理。

mapreduce的sort排序,无法取消。

生产中提交MR任务1

学新通

# 打包
# 1. 设置maven的打包的环境
<properties>
    <!--解决编码的GBK的问题-->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
    <!--指定打包的jar的名字-->
    <finalName>mr1</finalName>
    <!--指定打包的信息-->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>2.4</version>
            <configuration>
                <archive>
                    <!--指定入口主函数所在的类名-->
                    <manifest>
                        <mainClass>demo1.job.WordCountJob</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
    </plugins>
</build>
学新通
# 2. 执行打包
	在当前项目所在的目录下执行如下命令
	> mvn package
# 3. 上传jar到hadoop的ResourceManager所在的机器
# 4. 执行程序
	> yarn jar mr1.jar
maven自动化部署插件wagon
# 1. 配置maven远程提交插件
1. 添加maven的ssh扩展
2. 添加maven的远程拷贝插件wagon(货车)
<!--加入maven的扩展ssh-->
<extensions>
    <extension>
        <groupId>org.apache.maven.wagon</groupId>
        <artifactId>wagon-ssh</artifactId>
        <version>2.8</version>
    </extension>
</extensions>
<!--maven的远程拷贝插件-->
<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>wagon-maven-plugin</artifactId>
    <version>1.0</version>
    <configuration>
        <!--上传的本地jar的位置-->
        <fromFile>target/${project.build.finalName}.jar</fromFile>
        <!--远程拷贝的地址-->
        <url>scp://用户名:密码@ip:/opt/app</url>
    </configuration>
</plugin>
3. 添加远程执行命令,和参数。
# 清空
	mvn clean
# 打包本地jar
	mvn package
# 远程上传jar
	mvn wagon:upload-single
ApplicationMaster

ResourceManager:任务分配,和nodemanager管理;领导、团队管理[工头]

NodeManager: 负责运行执行MapTask和ReduceTask。具体干活的人。[工人]

MRAppMaster:监控、管理 MapReduce任务的执行(开始-过程-结束)。工地监工。

只有在启动mapreduce程序,才会启动MRAppMaster

学新通

学新通

负责某个任务全部执行过程的监控管理。(监工)

  1. 提交job
    启动ApplicationMaster|MRAppMaster
  2. 管理整个job的运行过程
    ① 向ResourceManager申请资源。
    ② 在NodeManager中启动一个运行环境,执行代码。()
    ③ 跟踪应用job的执行过程和状态
    ④ Job故障管理;
    一旦job任务执行失败(MapTask),AppMaster,自定让NodeManager重启执行任务代码。
配置yarn的日志服务器-Historyserver

Hadoop自带了一个历史服务器,可以通过历史服务器查看已经运行完的Mapreduce作业记录

比如用了多少个Map、用了多少个Reduce、作业提交时间、作业启动时间、作业完成时间等信息。

默认未启动。

学新通

# 1. 配置mapred-site.xml,指定历史日志服务器的地址
<!--job历史日志服务器的服务地址-->
<property>
    <name>mapreduce.jobhistory.address</name>
    <value>hadoop10:10020</value>
</property>
<!--job的历史日志服务器的web地址-->
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>hadoop10:19888</value>
</property>
# 2. 配置yarn-site.xml,指定开启日志聚合和日志保留时间,使得日志文件保存在hdfs上。
<!--开启日志聚合-->
<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>
<!--日志保存时间 单位秒 这里是7天-->
<property> 
    <name>yarn.log-aggregation.retain-seconds</name>
    <value>604800</value>
</property>
# 3. 启动历史日志服务器
0. 启动hdfs
	
1. 重启yarn
	[root@hadoop10 ~]# stop-yarn.sh
	[root@hadoop10 ~]# start-yarn.sh
2. 启动
	[root@hadoop10 ~]# mr-jobhistory-daemon.sh start historyserver
	如果需要关闭执行如下命令
	[root@hadoop10 ~]# mr-jobhistory-daemon.sh stop historyserver
# 4. 查看日志
	1. 访问http://ip:8088(访问yarn集群,看到执行过的job信息)
	2. 点击"Applications"找到刚才执行的job的"history"
	3. 点击logs

学新通
学新通

MapReduce详解

Hadoop序列化

案例数据

手机使用的流量数据,每次手机上网记录一条信息。

需求:统计每个手机号的 上传总流量 下载总流量 总流量

分析核心点:

希望那些数据相同的合并在一起,map端就以它为key输出即可。

# 案例数据
id				手机号		 						 ip地址					上传	  下载	状态码
1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
# 期望结果
13726230503	 上传流量:4962  下载流量:49362  总数据流量:  54324
13826544101	 上传流量:528  下载流量:0  总数据流量:  528
13926251106	 上传流量:480  下载流量:0  总数据流量:  480
13926435656	 上传流量:264  下载流量:3024  总数据流量:  3288
# hadoop序列化
	mapreduce执行过程中,被处理的key-value数据,需要在网络中传输,就需要对象转化为字节,字节转化为对象,这就是序列化和反序列化过程;
	key和value都要经过序列化传输。
1. Java序列化(序列化数据 对象描述信息)
	序列化会包含java的继承关系,验证信息,验证信息。(重量级)
	不便于在网络中传输。
2. Hadoop序列化(仅关注数据序列化)
	空间紧凑
	传输快速,网络开销小。
结论:
	mapreduce中所有key-value都要支持序列化。

学新通

hadoop内置可序列化类型

Java类型 Hadoop Writable类型
boolean BooleanWritable
byte ByteWritable
int IntWritable
long LongWritable
float FloatWritable
double DoubleWritable
string Text
array ArrayWritable
map MapWritable
null NullWritable

自定义序列化类型
将要封装的数据,放在一个类中。
自定义一个类实现WritableComparable

  1. 可以被hadoop序列化传输。
  2. 可以支持排序。

注意序列化和反序列化的属性操作顺序要完全一致

学新通

//序列化示例代码
public class PhoneLogWritable implements WritableComparable<PhoneLogWritable> {
    private Logger log = Logger.getLogger(PhoneLogWritable.class);
    private int upload;
    private int download;
    private int sum;
    public PhoneLogWritable(int upload, int download, int sum) {
        this.upload = upload;
        this.download = download;
        this.sum = sum;
    }
    public PhoneLogWritable() {
        log.info("----对象创建----");
    }
    public int compareTo(PhoneLogWritable o) {
        log.info("--比较--");
        return this.sum-o.sum;
    }
    public void write(DataOutput dataOutput) throws IOException {
        log.info("------write---");
        dataOutput.writeInt(upload);
        dataOutput.writeInt(download);
        dataOutput.writeInt(sum);
    }
    public void readFields(DataInput dataInput) throws IOException {
        log.info("--read---");
        upload = dataInput.readInt();
        download = dataInput.readInt();
        sum = dataInput.readInt();
    }
    @Override
    public boolean equals(Object o) {
        System.out.println("--equals---");
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        PhoneLogWritable that = (PhoneLogWritable) o;
        return upload == that.upload &&
                download == that.download &&
                sum == that.sum;
    }
    @Override
    public int hashCode() {
        System.out.println("--hashcode---");
        return Objects.hash(upload, download, sum);
    }
    public int getUpload() {
        return upload;
    }
    public void setUpload(int upload) {
        this.upload = upload;
    }
    public int getDownload() {
        return download;
    }
    public void setDownload(int download) {
        this.download = download;
    }
    public int getSum() {
        return sum;
    }
    public void setSum(int sum) {
        this.sum = sum;
    }
    @Override
    public String toString() {
        return "PhoneLogWritable{"  
                "upload="   upload  
                ", download="   download  
                ", sum="   sum  
                '}';
    }
}
学新通

数据清洗

业务: 将原始数据文件中的脏东西(无效数据,无用数据)洗掉。

1. 将原始的数据文件(日志文件)中,无效的行数据,去除。
2. 将本次功能处理,不需要的业务数据,去除。
# mapreduce中可以没有reduce
效果:之进行map阶段的执行。执行完毕后即输出到文件中。
# 代码实现
1. 取消mapreduce的job有关reduce的所有设置
2. 保留并设置如下
	job.setNumReduceTasks(0);//取消reduce
	FileOutputFormat.setOutputPath(job,new Path("/app/mapreduce/demo3/out"));
# 原数据
id				手机号		 						 ip地址					上传	  下载	状态码
1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
1363157995052	13826544109	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0
1363157995052	null	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	240	0	200
1363157991076	13926435659	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	null	null	null
# 测试案例
	删除其中手机号不符合要求,上传流量确实和下载流量确实的数据,并仅保留手机号 上传流量 下载流量。(效果如下)
    13726230503	2481	24681
    13826544101	264	0
    13926435656	132	1512
    13926251106	240	0
    13726230503	2481	24681
    13826544101	264	0
    13926435656	132	1512
    13926251106	240	0

计数器Counter

用来记录hadoop执行过程的工具,可以理解为hadoop的日志。
形式
group1
name1 数量

代码
context.getCounter(“map阶段”,“map输出”).increment(1L);

  • 效果如下

学新通

排序

  1. 简介
    Shuffle期间,MapReduce会对map输出的数据,对key进行排序。
  2. 时机:
  3. map输出之后,shuffle过程中。
  4. map输出之后的map端。
  5. 规则:
  6. key如果是Text类型按照字典顺序,进行字符串排序。
  7. key如果是IntWritable LongWritable 则按照数字大小进行升序排序。
    学新通
# 测试数据
用户id	观众人数
团团	300
小黑	200
哦吼	400
卢本伟	100
八戒	250
悟空	100
唐僧	100
# 需求:按照观众人数升序排序?
悟空	100
唐僧	100
卢本伟	100
小黑	200
默认排序
# 默认排序
 	默认按照数据类型内置CompareTo方法规则进行排序
# 案例
用户id	观众人数
团团	300
小黑	200
哦吼	400
卢本伟	100
八戒	250
悟空	100
唐僧	100
# 期望结果
卢本伟	100
悟空	100
唐僧	100
小黑	200
八戒	250
团团	300
哦吼	400
学新通

编码:

1:将需要排序的值作为MapTask输出的key

2:默认排序规则,是Hadoop API的内置类型写好的排序规则

自定义排序
# 自定义排序
# 案例
团团	300
小黑	200
哦吼	400
卢本伟	100
八戒	250
悟空	100
唐僧	100
# 期望
哦吼	400
团团	300
八戒	250
小黑	200
卢本伟	100
悟空	100
唐僧	100
学新通

学新通

学新通

二次排序(二排)

本质上在排序时候,调用了排序key的两个属性参数排序

二次排序
核心:排序所依据的字段作为map输出的key。

# 测试数据
用户id	观众人数	直播时长
团团	300	1000
小黑	200	2000
哦吼	400	7000
卢本伟	100	6000
八戒	250	5000
悟空	100	4000
唐僧	100	3000
# 需求:按照观众人数降序排序,如果观众人数相同,按照直播时长降序。

核心思路:

  1. 排序所依据的字段,要作为key。
  2. 实现hadoop的序列化。
    重写WritableComparable的compareTo方法
/**
 * 1 可序列化(write readFiled)
 * 2 可排序(compareTo)
 */
public class LivePlayLog implements WritableComparable<LivePlayLog> {
    private int viewer;//观众人数
    private long length;//直播时长
    public LivePlayLog(){}

    /**
     * 作用:将该对象作为map输出的key
     *   1. mapreduce执行过程中,排序时候会调用该方法
     *   2. 默认在合并操作时候,会将key相同的value合并在一起。
     * 返回值:
     *      1 升序排序(this-o)
     *      -1 降序排序
     *      0 key相同的。
     * @param o
     * @return
     */
    @Override
    public int compareTo(LivePlayLog o) {
        if(this.viewer != o.viewer){
            return -(this.viewer-o.viewer);
        }else if(this.length != o.length){
            return -(int)(this.length-o.length);
        }else{
            return 0;
        }
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeInt(viewer);
            dataOutput.writeLong(length);

    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
            viewer = dataInput.readInt();
            length = dataInput.readLong();
    }
   ......
学新通

MapReduce优化

MapTask并行度

MapTask并行度,是不是越大越好?

  1. MapTask的并行度的产生
  2. inputformat根据配置信息,获得hdfs中文件的split大小和位置。
  3. 每个split就会启动一个MapTask,进行处理。
  4. 总结MapTask并行度决定机制
    split的个数。
  5. 概念:
    block:hdfs文件的最小单元。
    split:文件切分信息,虚拟的文件切片。
  6. 默认:blocksize的大小就是split的大小128M,也就是一个MapTask执行的任务。
    这样能够减少多个节点的MapTask之间的网络IO。
  7. 切片操作是针对1个文件,多个文件的切片不会合并。

学新通

InputFormat(优化1)
  1. 作用
  1. 对hdfs中的文件进行split切片,计算。(逻辑切分 start end)
  2. 读入的结果交给MapTask进行处理。
# 2. TextInputFormat
	接口:org.apache.hadoop.mapreduce.InputFormat
	实现类: 
		org.apache.hadoop.mapreduce.lib.input.TextInputFormat
		特点:逐行读入,并形成key(偏移量)-value(行数据),key是偏移量,value是当前行的数据。
	1. 指定一个输入文件
		TextInputFormat.addInputPath(job,new Path("/hdfs文件"));
	2. 指定一个输入目录
		TextInputFormat.addInputPath(job,new Path("/hdfs目录"));
	3. 指定多个输入文件
		job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job,new Path("/hdfs/文件1.txt"));
        FileInputFormat.addInputPath(job,new Path("/hdfs/文件2.txt"));
        FileInputFormat.addInputPath(job,new Path("/hdfs/文件3.txt"));
# 3. CombineTextInputFormat
	特点: 将多个小block合并成1个split处理,设置切片大小为10M。
	应用: 海量的小数据文件产生海量小block,合并成大的split,减少split数量,减少MapTask数量,提高MapReduce性能。
    代码:
		job.setInputFormatClass(CombineTextInputFormat.class);
		CombineTextInputFormat.setMaxInputSplitSize(job,10485760);//10M,只要加起来不超过10M的block数据,都会合并成1个split处理。
        FileInputFormat.addInputPath(job,new Path("/hdfs/目录"));

Combiner合并(优化2)

# 案例
# 测试案例(消费记录)
姓名	消费金额
张三	100
王五	200
李四	300
张三	400
王五	500
张三	600
# 期望结果
张三	1100
李四	300
王五	700
# 案例实现思路中存在的效率问题
问题:
   累加的计算任务,几乎全部压在Reduce程序,程序只有1个,压力过大,效率太低。
解决方案:
   核心:将任务尽可能前置
   方案:将累加(Reduce)的操作在Map端提前执行好
   优势:
       ① MapTask本地存放执行结果大大减少。
       ② Reduce下载MapTask执行结果,效率提升。
       ③ ReduceTask归并排序和合并数据操作效率提升了。
       ④ ReduceTask执行数据量大大减少,效率提升。
# Combiner说明
   概念:合并汇总
        MapTask端局部Reduce操作(合并merge、执行Reducer.reduce)
   时机:MapTask输出后,排序之后,执行Combiner操作,之后将结果存放在本地。
   代码:
        job.setCombinerClass(Reducer的类.class);
   应用场景:
        适合:累加、统计总数、排名 (支持可迭代性)
        不适合:平均值。

学新通

ReduceTask并行度(优化3)

# 测试案例:商品浏览日志
日期			域名					商品url					  商品名       pid     驻留时间
2020年3月3日	www.baizhiedu.com	/product/detail/10001.html	iphoneSE	10001	30
2020年3月3日	www.baizhiedu.com	/product/detail/10001.html	iphoneSE	10001	60
2020年3月3日	www.baizhiedu.com	/product/detail/10001.html	iphoneSE	10001	100
2020年3月3日	www.baizhiedu.com	/product/detail/10002.html	xps15	10002	10
2020年3月3日	www.baizhiedu.com	/product/detail/10003.html	thinkpadx390	10003	200
2020年3月3日	www.baizhiedu.com	/product/detail/10004.html	iphoneX	10004	100
2020年3月3日	www.baizhiedu.com	/product/detail/10003.html	thinkpadx390	10003	100
2020年3月3日	www.baizhiedu.com	/product/detail/10001.html	iphoneSE	10001	120
2020年3月4日	www.baizhiedu.com	/product/detail/10001.html	iphoneSE	10001	200
2020年3月5日	www.baizhiedu.com	/product/detail/10001.html	iphoneSE	10001	25
2020年3月6日	www.baizhiedu.com	/product/detail/10001.html	iphoneSE	10001	20

# 期望结果
pid    访问次数
10001	7
10002	1
10003	2
10004	1
学新通
# 提高ReduceTask的数量,提高Reduce的并行度,提高效率
	1. 增加ReduceTask的并行度(数量) ,可以启动多个ReduceTask程序处理mapTask的汇总结果,可以提高效率。
	2. 每个ReduceTask输出结果,都会单独的输出到1个文件。(注意)
# ReduceTask的数量是可以在程序中手动指定
		默认数量为:  1个 Reduce
		修改代码:    job.setNumReduceTasks(数字);  0 就是没有   数字是几就是几个
		            job.setNumReduceTasks(0)
		            job.setNumReduceTasks(2)

默认分区partition

  • 默认分区规则
    学新通

学新通

  • 分区流程(发生时机)

MapReduce分区的整个流程

  1. 当MapTask任务中的mapper.map()输出结果后,会先根据map输出的key判断分区。(默认按照key.hashcode%reduceTasks)
    不同的key-value进入不同的分区。(从此分道扬镳)
  2. 对分区后数据各自做排序。(免去了分区之间数据的比较交换排序操作)
  3. 如果设置Combiner,会自动对各自分区做本地reduce汇总操作。
  4. 将结果输出mapTask机器本地。(分区存放:分区0、分区1)
  5. ReduceTask阶段拷贝MapTask输出结果,按照分区拷贝。
    a: ReduceTask0 从所有MapTask阶段拷贝所有的分区0的数据。(n多个分区0数据)
    b: 合并所有远程拷贝到的分区0的文件数据,排序(归并排序)
    c: 合并当前分区0中的key的value。(merge)[k-v1,v2,v3]
    d: 启动1个执行ReduceTask,输出到文件中。
  6. ReduceTask阶段拷贝MapTask输出结果,按照分区拷贝。
    a: ReduceTask1 从所有MapTask阶段拷贝所有的分区1的数据。(n多个分区1数据)
    b: 合并所有远程拷贝到的分区1的文件数据,排序(归并排序)
    c: 合并当前分区1中的key的value。(merge)[k-v1,v2,v3]
    d: 启动1个执行ReduceTask,输出到文件中。
    5和6 reduce阶段各自处理各自分区的数据

学新通

自定义Partition(优化4)

# 自定义partition
将下面数据分区处理:
人名  科目 成绩
张三	语文	10
李四	数学	30
王五	语文	20
赵6	英语	40
张三	数据	50
李四	语文	10
张三	英语	70
李四	英语	80
王五	英语	45
王五	数学	10
赵6	数学	10
赵6	语文	100
思路:
	1:分区依据要作为key
	2:排序字段也要作为key。
	3:避免合并,key要唯一,不重复(所有key都不一样)
学新通

学新通

思路:通过修改Reduce的个数,设置分区的个数。

自定义分区编码

① 定义分区类

执行时机:

Map输出key-value,后,会调用getPartition方法,决定当前key-value进入哪个分区。
注意:
分区号0开始。
当前key属于那个分区,就返回对应分区的编号。

学新通

② 使用分区类

job.setPartitionerClass(自定义Partitioner.class);

③ 设定reducer个数(开启分区)

job.setNumReduceTasks(数字);//reduceTask数量要和分区数量一样。

MapReduce工作原理(终极版)

Spill溢写

环形缓冲区:
1. map输出的结果k-v会存入环形缓冲区(从start下标开始写,写到80%,则触发溢写程序的线程。环形缓冲区继续写入)
溢写过程:
2. 当环形缓冲区中的数据,达到80%,则开始溢写。(每次写够80%/MapTask处理完毕,就开始溢写。)
	mapred-site.xml中修改mapreduce.map.sort.spill.percent的值。
3. 从缓冲区中读取key-value,自带分区号。
4. 每次溢写,对本次溢写范围内的数据做排序。
5. 如果设置combiner,则执行map端的reduce合并处理(可选,未必有)
6. 将本次溢写的数据(key有序)写入到本地的磁盘上,产生一个溢写文件。
7. 循环2~6,将产生多个溢写文件在本地磁盘中。
mapper处理完毕后
8. 将各个分区中,多次溢写的文件,再进行一次合并排序,每个分区,多次溢写产生的多个有序文件,合并成1个整体key有序的文件。

学新通
学新通

MapReduce排序
1:maptask输出环形缓冲区,缓冲区每次溢写,发生一轮排序。--每次溢写
2:Maptask多次溢写产生的多个溢写文件,要做归并排序(整体排序)---map端本地产生多个溢写文件。
3:ReduceTask汇总多个MapTask的结果文件,归并排序(整体排序)--- reduce下载maptask处理结果产生的多个文件,归并排序。

Shuffle(混洗-洗牌)

过程,人为对MapReduce整体中部分过程,做了称呼。

总结:站在数据角度,k-v从Mapper离开,一直到传给Reducer方法,中间过程,叫做shuffle

  • map阶段
  1. mapper输出结果key-value
    ① 获得ko-vo获得分区号。(Partitioner.getpartion())
    ② 将ko-vo写出到环形缓冲区中。
  2. 一旦环形缓冲区中数据达到溢写条件(80%,写完了),触发溢写的线程2。
    ① 读取环形缓冲区中的数据,本次溢写对应的范围内数据80%;
    ② 根据分区号,分区内排序、(Combiner)
    ③ 将分区内,排序后的key-value数据,写入本地磁盘的文件中。(一次溢写产生一个文件)
    ④ 每次达到溢写条件(80%,写完了),①~③,在mapTask本地磁盘形成溢写文件。(多次溢写,产生多个溢写文件)
    ⑤ 最后在本地完成一次分区内多个溢写文件 归并排序,产生1个文件(maptask处理结果)。

reduce阶段

3. 根据分区号,启动ReduceTask,下载多个MapTask处理结果中的对应分区文件
	MapTaskA(分区0)----ReduceTask0
	MapTaskB(分区0)----ReduceTask0
4. 将当前分区中,来自不同MapTask的分区文件,归并排序。(为了reduce的merge操作效率)
	产生1个大的本分区的文件,且内容key有序。
5. merge操作: 将多个maptsk下载文件合并、排序、分组、合并。
6. 逐步读取k-vs,调用reducer.reduce();

源码分析

  • 启动mapreduce任务 yarn jar xx.jar

    1. 对源文件进行逻辑切片,切分多个splits。
    2. 代码执行从job.job.waitForCompletion(true)开始;
  • MapTask阶段

    入口:MapTask.run()开始

    1. 创建MapTask,创建mapper对象,获得inputformat。
    2. 循环读取key-value,交给mapper.map(key,value)
    3. mapper.map(){context.write(key,value)},进入到环形缓冲区。
      缓冲区放入(key,value,分区号)
  • 溢写过程:

    读key-value
    排序
    combiner(可选)
    写入本地磁盘文件中。
    合并分区内的多个溢写文件。(mapper的所有map执行结束后)
    学新通

reduceTask阶段源码

  1. ReduceTask调用Reducer.reduce方法逻辑
    学新通

全工作流程总结

学新通

  • mapreduce工作流程(终极版)

提交job的准备
1. 创建InputFormat,读取数据
① 获得文件split

MapTask过程
1. InputFormat—LineRecordReader.
② 读取split范围内的数据,k-v。
2. 调用Mapper.run(),来对split范围内数据进行处理
Mapper.run(){
setup(context);
while(xxx){
// 每读取一条key-value,调用一次map方法。
map(key,value,context);
}
cleanup(context);
}
3. mapper输出结果
① 获得ko-vo获得分区号。(Partitioner.getpartion(key,value,num))
② 将ko-vo,连带分区号,一块写出到环形缓冲区中。
4. 一旦环形缓冲区中数据达到溢写条件(80%,写完了)—溢写过程。
① 读取环形缓冲区中的数据(key-value-分区号)
② 根据分区号,分区排序、(Combiner)
③ 将处理结果溢写到磁盘中文件中。
④ 每次达到溢写条件(80%,写完了),①~③,在mapTask本地磁盘形成各分区内的一个溢写文件。(多个溢写文件。)
⑤ 最后mapper处理完毕后边,触发一次溢写,产生一个溢写文件。
⑥ 最后,对MapTask本地,分区内,的多个溢写文件,合并成一个大文件。(发生一次归并排序)

ReduceTask过程:
1. 根据分区号,启动ReduceTask,下载多个MapTask处理结果中的对应分区文件
MapTaskA(分区0)----ReduceTask0
MapTaskB(分区0)----ReduceTask0
2. 将当前分区中,来自不同MapTask的分区文件,归并排序。
产生1个大的本分区的文件,且内容key有序。
3. merge操作,将有序的结果,合并key的value。
4. 循环调用reducer的reduce方法,处理汇总的数据
Reducer.run(){
setup(context);
while(xxx){
reduce.reduce(key,values);
context.write(k,v)
}
cleanup(context);
}
5. ReduceTask调用OutputFormat将结果写入到hdfs文件中。

Shuffle:
# map阶段
3. mapper输出结果
① 获得ko-vo获得分区号。(Partitioner.getpartion(key,value,num))
② 将ko-vo,连带分区号,一块写出到环形缓冲区中。
4. 一旦环形缓冲区中数据达到溢写条件(80%,写完了)—溢写过程。
① 读取环形缓冲区中的数据(key-value-分区号)
② 根据分区号,分区排序、(Combiner)
③ 将处理结果溢写到磁盘中文件中。
④ 每次达到溢写条件(80%,写完了),①~③,在mapTask本地磁盘形成各分区内的一个溢写文件。(多个溢写文件。)
⑤ 最后mapper处理完毕后边,触发一次溢写,产生一个溢写文件。
⑥ 最后,对MapTask本地,分区内,的多个溢写文件,合并成一个大文件。(发生一次归并排序)
# reduce阶段
1. 根据分区号,启动ReduceTask,下载多个MapTask处理结果中的对应分区文件
MapTaskA(分区0)----ReduceTask0
MapTaskB(分区0)----ReduceTask0
2. 将当前分区中,来自不同MapTask的分区文件,归并排序。
产生1个大的本分区的文件,且内容key有序。
3. merge操作,将有序的结果,合并key的value。

源码分析

  • MapTask
public class MapTask{
    // 1. 启动一个新的Mapper程序。
	private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(){
        创建inputFormat
        创建split
        创建mapper
        执行mapper处理数据。mapper.run(mapperContext);
            context.write(key,value)---分区--放入环形缓冲区内。
        关闭输出(NewOutputCollector),准备溢写。output.close(mapperContext);
            1. 排序
            2. combiner
            3. 到文件
    }
    //2. 收集key-value进入环形缓冲区
    public void collect(K key, V value)
        先分区再进入缓冲区
    //3. sortAndSpill() 溢写过程
    
}
学新通

学新通

# mapper对象的代码机制
	1. 每启动MapTask,执行一次runNewMapper方法,创建一个mapper类。
	2. 每读取key-value,调用mapper.map();
# 2. InputFormat
public abstract class InputFormat<K, V>
    //切片方法
	public abstract List<InputSplit> getSplits()...
    //根据split信息返回一个RecordReader(用来读取数据)
	public abstract RecordReader<K,V> createRecordReader(InputSplit split...
public abstract class FileInputFormat<K, V> extends InputFormat<K, V>{
    public List<InputSplit> getSplits(JobContext job){
		long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//获得最小值 1
    	long maxSize = getMaxSplitSize(job);//获得最大值 LongMax
        ...
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);//获取split的切片大小。对应配置文件(split.minsize)
        ...
        //当文件剩余大小大于split大小的1.1倍时,进行分片
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            //获取block块的索引位置
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          //分片
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          //源文件减去已经分片大小
          bytesRemaining -= splitSize;
}
学新通

学新通

学新通

# 3. ReduceTask
private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewReducer(JobConf job...{
    创建reducer
        调用reducer的run
        调用setup 循环调用reduce方法,cleanup
}

学新通

# 4. OutputFormat
public class TextOutputFormat{
    // 将key-value写出到文件中。
    public synchronized void write(K key, V value)
}
# 作业
1. 完成分区代码任务:
1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	24	27	2481	24681	200
1363157995052	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4	4	0	264	0	200
1363157991076	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99	2	4	132	1512	200
1363154400022	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4	4	0	240	0	200
	要求:统计每个手机号的总上传流量,总下载流量 总流量。,按照手机区放在不同的文件内。
	(只考虑手机号开头  137 138 139 其他)
2. 整理MapTask ReduceTask Shuffle执行流程===(MapReduce完整流程)
	文字
	图
3. 思考题(非必做)
	张三	10
	李四	20
	王五	30
	悟空	15
	八戒	90
	沙僧	100
	李旭	150
	统计,其中的最大值?
	结果:
	李旭	150
学新通

Yarn分布式集群搭建

# 0-1保证HDFS分布式集群搭建环境确保正确。
1. jps看到如下结果
	NameNode
	DataNode
	SecondaryNameNode
2. 查看hadoop11:50070.
	在datanode标签页看到3个正常的datanode节点信息。
# 0-2关闭所有NameNode节点和DataNode节点
	stop-dfs.sh
# 1:初始化yarn相关配置
1. mapred-site.xml
	<property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
2. yarn-site.xml
	<property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
	<!--配置resourcemanager的主机ip-->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>Hadoop</value>
    </property>
3. slaves
	配置nodemanager(从机)所在的ip。
	配置datanode(从机)所在的ip
# 2:同步该配置到其他节点服务器上。
[root@hadoop11 etc]# scp -r hadoop/ root@hadoop12:/opt/installs/hadoop2.9.2/etc/
[root@hadoop11 etc]# scp -r hadoop/ root@hadoop13:/opt/installs/hadoop2.9.2/etc/
[root@hadoop11 etc]# scp -r hadoop/ root@hadoop14:/opt/installs/hadoop2.9.2/etc/
学新通
# 3:启动yarn集群
# 在namenode所在主机上
1. 启动HDFS集群
	start-dfs.sh
# 在Resourcemanager所在主机上
2. 启动yarn集群
	start-yarn.sh
# 4:验证
1. jps
[root@hadoop11 ~]# jps
    6160 DataNode
    6513 ResourceManager
    6614 NodeManager
    6056 NameNode
    6349 SecondaryNameNode
    6831 Jps
2. 访问yarn的资源调度器web网页。
	http://主节点ResourceManager节点的ip:8088
# 关闭集群
	1. 先关闭yarn
		stop-yarn.sh
	2. 在关闭hdfs
		stop-hdfs.sh

经典案例

TOPN

典型:最火主播、最畅销书、最热门的商品----TopN

需求:获得主播观众人数前3名的信息。
# 原始数据
主播id 观众人数  时长
团团	2345	1000
小黑	67123	2000
哦吼	3456	7000
卢本伟	912345	6000



八戒	1234	5000
悟空	456	4000
唐僧	123345	3000
# 期望结果
卢本伟  912345
唐僧    123345
小黑    67123
学新通
# 方案1 傻×方案
1. 按照观众人数,降序排序。
2. reduce端输出前3个。
# 方案2 牛×方案
1. 在每个MapTask端先各自计算Top3,并只输出top3.
2. Reduce端只需要统计多个MapTask的Top3结果,只输出前3个。

学新通

MapReduce源码 笔记总结

验证MapReduce执行流程

提交job

//4. 启动job
boolean b = job.waitForCompletion(true);
|-
    // 提交job
    submit();
	// 如果参数为true,在监控job的执行,并打印日志。
	monitorAndPrintJob();
|-
    // 提交job
    return submitter.submitJobInternal(Job.this, cluster);
	|- 
        // 为Job 创建Split:对本次job操作的HDFS文件,进行split切片。
     	int maps = writeSplits(job, submitJobDir);
			|- 
                // 使用新API,创建split
      			maps = writeNewSplits(job, jobSubmitDir);
				|-
                    // 反射获得当前job绑定的InputFormat对象:TextInputFormat
    				InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    				// 根据inputformat,获得split切片。(这里进入FileInputFormat类):
    				// InputSplit:offset length  host
    				List<InputSplit> splits = input.getSplits(job);
 					|- 自此,进入split具体切片操作---->FileInputFormat
		// 真正的提交job。
      	status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
	// 修改job的状态为RUNING
    state = JobState.RUNNING;
学新通

FileInputFormat

 /** 
   * 对文件生成逻辑上的切片Split,对应InputSplit,多个split对应List集合。
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException{
	// 最小值  1,本质上就是【mapreduce.input.fileinputformat.split.minsize】
	long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // 最大值 LongMax, 本质上对应:【mapreduce.input.fileinputformat.split.maxsize】
    long maxSize = getMaxSplitSize(job);
      ...
    // 创建空的InputSplit的List,一会切一个,放里面放一个Split信息。
    List<InputSplit> splits = new ArrayList<InputSplit>();
     ...
    // 获得blockSize=128MB
    long blockSize = file.getBlockSize();//128MB
    // 获得splitSize=128MB【计算方式:minSize blockSize maxSize 在三者取其中,可以通过调节参数,修改split的大小】
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    
    // 循环条件:如果剩余的字节大小 > splitSize的1.1倍。
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          // block的序号
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          // 构造一个split(文件路径 start length host 内存host)
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                               blkLocations[blkIndex].getHosts(),
                               blkLocations[blkIndex].getCachedHosts()));
          // 切一刀,减去当前split的字节数。
          bytesRemaining -= splitSize;
      }
  }
学新通
# 计算splitSize

学新通

# split逻辑切片的源码

学新通

MapTask

Map任务的启动
MapTask类

// map程序入口
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical){
    // 使用newMapper的API运行MapTask
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
}
private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ){
    // 创建1一个Mapper对象
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // 创建一个输入工具,InputFormat
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // 获得split信息。
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
     // 创建一个RecordReader,inputformat中负责读取数据的。//LineRecordReader
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);
    
    // 初始化当前split范围的数据读取:开启输入流。
      input.initialize(split, mapperContext);
    
    //启动Mapper的run方法的执行。-------------------------【数据处理核心位置】
    mapper.run(mapperContext);
    // job 状态更新()
    mapPhase.complete();
    
    // 进入 NewOutputCollector?????
    output.close(mapperContext);// 将清空环形缓冲区中的数据,最后溢写一次。
    |-
        // 进入--->MapOutputBuffer:环形缓冲区对象
        collector.flush();
    	|-
            // spill finished
          	resetSpill();
    		// 排序并溢写。下面继续
          	sortAndSpill();
    		// 合并溢写文件。到此MapTask阶段基本结束。
      		mergeParts();
    	// 关闭缓冲区的流
    	collector.close();
}
学新通
  • Mapper输出结果的环形缓冲区
// 一个split切片处理,创建一个Mapper对象
class 自定义Mapper extends Mapper{
	public void run(Context context) throws IOException, InterruptedException{
		//1: 调用setup 一次。:一般用来覆盖后,天加初始化资源操作。---调用1次。
    	setup(context);
    	 // 循环读取 行数据 k(偏移量)-v(行)
          while (context.nextKeyValue()) {
            //2: 每读1行,调用1次map方法。
            map(context.getCurrentKey(), context.getCurrentValue(), context);
          }
          //3: 调用cleanup一次:一般覆盖,重写一些释放资源的代码----调用1次。
     	 cleanup(context);
	}
	
	// 数据处理map方法:被子类覆盖。
	protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
                     }
}

// 自定义Mapper的子类。
static class NameCountMap extends Mapper<LongWritable, Text,Text, IntWritable> {
        /**
         * @param key      输入map数据的key
         * @param value   输入map数据的value
         * @param context map处理完毕后输出的每条数据 包含key-value
         * @throws IOException
         * @throws InterruptedException
         * @do 接收输入的keyvalue转化为输出的keyvalue,交给shuffle分组排序
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException, IOException {
            //局部计算
            String[] names = value.toString().split(" ");
            for (String name : names) {
                context.write(new Text(name),new IntWritable(1));
                // context写出key-value,写入到了环形缓冲区中:NewOutputCollector
            }
        }
    }
//MapTask内部类:环形缓冲区操作类类(从context.write方法进入这里)
private class NewOutputCollector<K,V>
    extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
    // map写出key-value,调用该方法。
    @Override
    public void write(K key, V value){
       // 从Maper写出去:这里用的partition类,默认是HashPartition
      // ----> MapOutputBuffer:缓冲区,暂时存放map的输出key-value
      collector.collect(key, value,
                        partitioner.getPartition(key, value, partitions));
    }
}
//环形缓冲区类
public static class MapOutputBuffer<K extends Object, V extends Object>{
    // 将key value partition 存入环形缓冲区中。
    public synchronized void collect(key,value){
        // bufferRemaining参考:上面 spillper 和 softLimit
        bufferRemaining -= METASIZE;// 环形缓冲区80%容量内,剩余多少。
        // 如果缓冲区80% 用完了,多线程环形溢写线程,启动spill操作。
        if (bufferRemaining <= 0) {
            // spill finished, reclaim space
                resetSpill();
            // 开始溢写。
                startSpill();
            spillLock.unlock();// 唤醒SpillThread.run(),进入SpillTread类的run方法。
        }
        ...
        // 序列化key到缓冲区中。
        int keystart = bufindex;
        keySerializer.serialize(key);
        
        ...
            
        // 序列化value到缓冲区中。
        final int valstart = bufindex;
        valSerializer.serialize(value);
    }
}

学新通
  • 溢写过程
// 溢写操作的线程类:MapTask内部类
protected class SpillThread extends Thread{
    public void run() {
        // 进入一次溢写。
        sortAndSpill();// 一旦唤醒,就开始执行溢写操作。(溢写过程中,进行排序。)
        |--- 进入环形缓冲区操作类的sortAndSpill方法内部
            // 新建一个溢写文件,参数接受了一个分区数。
        	final SpillRecord spillRec = new SpillRecord(partitions);
        	// 快速排序,对环形缓冲区中的元素进行排序
        	sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
        	// 判断是否需要combine
            if (combinerRunner == null) {
                // 直接写入文件
            }else{
                // 先做combine再溢写。
            }
    }
}
学新通

ReduceTask

public class ReduceTask extends Task{
	public void run(JobConf job, final TaskUmbilicalProtocol umbilical){
        runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }
    private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewReducer(JobConf job,
                     final TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator<INKEY> comparator,
                     Class<INKEY> keyClass,
                     Class<INVALUE> valueClass
                     ) {
        // 根据job绑定的Reducer的类,反射创建出Reducer对象。
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
        // 进入Reducer的run方法
        reducer.run(reducerContext);
    }
}
学新通

Reducer的方法和子类覆盖的方法
每个reduce任务,创建1个Reducer对象。

/**
   * Advanced application writers can use the 
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   * ReduceTask处理一个合并结果,调用run
   */
  public void run(Context context) throws IOException, InterruptedException {
    //1. 调用setup方法。 1次。
    setup(context);
    try {
      while (context.nextKey()) {
        // 2:循环读取一组k-vs,调用reduce方法处理:循环调用。
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      // 3:调用cleanup方法。 reduce方法之后调用一次。
      cleanup(context);
    }
  }
学新通

ReduceTask

public class ReduceTask extends Task{
	public void run(JobConf job, final TaskUmbilicalProtocol umbilical){
        runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }
    private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewReducer(JobConf job,
                     final TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator<INKEY> comparator,
                     Class<INKEY> keyClass,
                     Class<INVALUE> valueClass
                     ) {
        // 根据job绑定的Reducer的类,反射创建出Reducer对象。
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
        // 进入Reducer的run方法
        reducer.run(reducerContext);
    }
}
学新通

Reducer的方法和子类覆盖的方法
每个reduce任务,创建1个Reducer对象。

/**
   * Advanced application writers can use the 
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   * ReduceTask处理一个合并结果,调用run
   */
  public void run(Context context) throws IOException, InterruptedException {
    //1. 调用setup方法。 1次。
    setup(context);
    try {
      while (context.nextKey()) {
        // 2:循环读取一组k-vs,调用reduce方法处理:循环调用。
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      // 3:调用cleanup方法。 reduce方法之后调用一次。
      cleanup(context);
    }
  }
学新通

总结:仰天大笑出门去,我辈岂是蓬蒿人

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfehiak
系列文章
更多 icon
同类精品
更多 icon
继续加载