• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

HADOOP集群大数据词频统计和设计比较完整教程

武飞扬头像
鸷鸟之不群
帮助1

###如若发现错误,或代码敲错,望能评论指正!!!

通过百度网盘分享的文件:Hadoop相关需要的软件
链接:https://pan.百度.com/s/1XzDvyhP4_LQzAM1auQCSrg?pwd=tph5 
提取码:tph5
 

学新通

VMware下安装CentOS

一、先安装一个虚拟机

安装好后要右键,找到用管理员的方式打开

也可以设置成每次打开都是以管理员身份运行

学新通

二、安装一个CentOS,这里使用的是7版本的

学新通

三、打开VMware,创建新的虚拟机

学新通

学新通

学新通

学新通

学新通

学新通

学新通

学新通

学新通

学新通

来到这个界面可以等待60秒,也可以按下tab键下一步。

学新通

我这里选择英文,各位可以选择中文。

学新通

学新通

学新通

学新通

学新通

学新通

学新通

继续添加

学新通

学新通

学新通

学新通

学新通

学新通

学新通

学新通

学新通

学新通

设置密码

学新通

学新通

学新通

学新通

学新通

学新通

学新通

学新通

到这里就安装好啦

四、ping本地与百度的设置

1.重启服务

service network restart

2.修改配置文件

vi /etc/sysconfig/network-scripts/ifcfg-eth0

如果进入的是空表,用cd进入这里,然后用“ll”查看那个带ifcfg的文件,进入编辑

我这里是ifcfg-ens33

cd /etc/sysconfig/network-scripts
ll

学新通

如果你这里是$符号请用这个命令,使用超级用户,变成#

学新通

学新通

DEVICE是指设备名

HWADDR是指网卡地址

ONBOOT是设置系统启动时是否激活网卡

BOOTPROTO的值可以设置为dhcp、none、bootp、static

dhcp 设置网卡绑定的时候通过DHCP协议的方法来获得地址

none 设置网卡绑定的时候不使用任何协议

bootp 设置网卡绑定的时候使用BOOTP协议

static 设置网卡绑定的时候使用静态协议

将该文件的ONBOOT=no改为ONBOOT=yes,将BOOTPROTO=dhcp改为BOOTPROTO=static

添加IP地址IPADDR、子网掩码NETMASK、网关GATEWAY以及DNS1等信息

这是我未修改的样子(不会如何修改的跟保存的请百度)

学新通

学新通

点击编辑里面的虚拟网络编辑器

学新通

要用管理员身份使用软件才能更改设置

学新通

当使用管理员身份后这里会有三个,选NAT模式

学新通

这里最后一段要选不一样的,最好是3位数

学新通

学新通

学新通

这里两个要一样

学新通

学新通

修改完保存退出后重启一下服务

service network restart

学新通

查看IP

ifconfig

3.ping一下百度跟本地,看一下连接是否通畅

ping www.百度.com
ping 192.168.235.233

学新通

学新通

ctel+c退出ping

4.安装yum源

执行cd /etc/yum.repos.d 进入etc/yum.repos.d目录

cd /etc/yum.repos.d

查看yum.repos.d目录下的文件

学新通

CentOS-Base.repo 是网络的

CentOS-Media.repo是本地的

5.执行重命名命令

如果要用网络下载,就改名全部(改名是为了禁用,需要用的话就把名字改回去)

CentOS-Base.repo CentOS-Debuginfo.repo CentOS-fasttrack.repo CentOS-Vault.repo CentOS-Media.repo

mv CentOS-Media.repo CentOS-Media.repo.bak
vi CentOS-Media.repo.bak

将baseurl的值修改为:file:///media/ ,将gpgcheck的值改为0 ,将enabled的值修改为 1

修改前:

学新通

修改后

学新通

6.挂载

执行以下命令进行挂载

mount /dev/dvd /media   #6.8版本
mount /dev/cdrom /media #7版本

如果没有第一条的话,修改第二条

学新通

学新通

7.更新yum源。

yum clean all

学新通

8.用阿里云安装软件

打开阿里云登录

学新通

学新通

学新通

学新通

curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo
wget -O /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo

9.安装Java

使用xshell7

使用ctel alt f打开共享文件

学新通

放到opt目录下

学新通

在命令行安装Java

进入opt目录安装

rpm -ivh jdk-7u80-linux-x64.rpm

学新通

五.搭建Hadoop完全分布式集群

1.把Hadoop安装包hadoop-xxxxx / 上传到虚拟机master的/opt目录下

学新通

然后进入opt目录下输入下面语句,将hadoop安装包解压到虚拟机上

tar -zxf hadoop-xxxx -C /usr/local

Hadoop配置涉及的文件都在/usr/local/hadoop-2.6.4/etc/hadoop/目录下

2.修改core-site.xml文件

vi core-site.xml
  1.  
    <configuration>
  2.  
    <property>
  3.  
    <name>fs.defaultFS</name>
  4.  
    <value>hdfs://master:8020</value>
  5.  
    </property>
  6.  
    <property>
  7.  
    <name>hadoop.tmp.dir</name>
  8.  
    <value>/var/log/hadoop/tmp</value>
  9.  
    </property>
  10.  
    </configuration>

学新通

然后在hadoop目录下创建一个tmp文件夹

学新通

mkdir tmp

学新通

3.修改hadoop-env.sh

vi hadoop-env.sh

学新通

4.修改yarn-env.sh文件

vi yarn-env.sh

学新通

5.复制mapred-site.xml.template 并命名为mapred-site.xml并修改

cp mapred-site.xml.template mapred-site.xml

学新通

修改mapred-site.xml文件

vi mapred-site.xml
  1.  
    <configuration>
  2.  
    <property>
  3.  
      <name>mapreduce.framework.name</name>
  4.  
      <value>yarn</value>
  5.  
    </property>
  6.  
    <!-- jobhistory properties -->
  7.  
    <property>
  8.  
      <name>mapreduce.jobhistory.address</name>
  9.  
      <value>master:10020</value>
  10.  
    </property>
  11.  
    <property>
  12.  
      <name>mapreduce.jobhistory.webapp.address</name>
  13.  
      <value>master:19888</value>
  14.  
    </property>
  15.  
    </configuration>
学新通

学新通

6.修改yarn-site.xml文件

vi yarn-site.xml
  1.  
    <configuration>
  2.  
    <!-- Site specific YARN configuration properties -->
  3.  
    <property>
  4.  
      <name>yarn.resourcemanager.hostname</name>
  5.  
      <value>master</value>
  6.  
    </property>
  7.  
    <property>
  8.  
      <name>yarn.resourcemanager.address</name>
  9.  
      <value>${yarn.resourcemanager.hostname}:8032</value>
  10.  
    </property>
  11.  
    <property>
  12.  
      <name>yarn.resourcemanager.scheduler.address</name>
  13.  
      <value>${yarn.resourcemanager.hostname}:8030</value>
  14.  
    </property>
  15.  
    <property>
  16.  
      <name>yarn.resourcemanager.webapp.address</name>
  17.  
      <value>${yarn.resourcemanager.hostname}:8088</value>
  18.  
    </property>
  19.  
    <property>
  20.  
      <name>yarn.resourcemanager.webapp.https.address</name>
  21.  
      <value>${yarn.resourcemanager.hostname}:8090</value>
  22.  
    </property>
  23.  
    <property>
  24.  
      <name>yarn.resourcemanager.resource-tracker.address</name>
  25.  
      <value>${yarn.resourcemanager.hostname}:8031</value>
  26.  
    </property>
  27.  
    <property>
  28.  
      <name>yarn.resourcemanager.admin.address</name>
  29.  
      <value>${yarn.resourcemanager.hostname}:8033</value>
  30.  
    </property>
  31.  
    <property>
  32.  
      <name>yarn.nodemanager.local-dirs</name>
  33.  
      <value>/data/hadoop/yarn/local</value>
  34.  
    </property>
  35.  
    <property>
  36.  
      <name>yarn.log-aggregation-enable</name>
  37.  
      <value>true</value>
  38.  
    </property>
  39.  
    <property>
  40.  
      <name>yarn.nodemanager.remote-app-log-dir</name>
  41.  
      <value>/data/tmp/logs</value>
  42.  
    </property>
  43.  
    <property>
  44.  
    <name>yarn.log.server.url</name>
  45.  
    <value>http://master:19888/jobhistory/logs/</value>
  46.  
    <description>URL for job history server</description>
  47.  
    </property>
  48.  
    <property>
  49.  
      <name>yarn.nodemanager.vmem-check-enabled</name>
  50.  
      <value>false</value>
  51.  
    </property>
  52.  
    <property>
  53.  
    <name>yarn.nodemanager.aux-services</name>
  54.  
    <value>mapreduce_shuffle</value>
  55.  
    </property>
  56.  
    <property>
  57.  
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
  58.  
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  59.  
    </property>
  60.  
    <property>
  61.  
    <name>yarn.nodemanager.resource.memory-mb</name>
  62.  
    <value>2048</value>
  63.  
    </property>
  64.  
    <property>
  65.  
    <name>yarn.scheduler.minimum-allocation-mb</name>
  66.  
    <value>512</value>
  67.  
    </property>
  68.  
    <property>
  69.  
    <name>yarn.scheduler.maximum-allocation-mb</name>
  70.  
    <value>4096</value>
  71.  
    </property>
  72.  
    <property>
  73.  
    <name>mapreduce.map.memory.mb</name>
  74.  
    <value>2048</value>
  75.  
    </property>
  76.  
    <property>
  77.  
    <name>mapreduce.reduce.memory.mb</name>
  78.  
    <value>2048</value>
  79.  
    </property>
  80.  
    <property>
  81.  
    <name>yarn.nodmanager.resource.cpu-vcores</name>
  82.  
    <value>1</value>
  83.  
    </property>
  84.  
    </configuration>
学新通

学新通

7.修改slaves文件

vi slaves
  1.  
    slave1
  2.  
    slave2
  3.  
    slave3

学新通

8.修改hdfs-site.xml文件

vi hdfs-site.xml
  1.  
    <configuration>
  2.  
    <property>
  3.  
    <name>dfs.namenode.name.dir</name>
  4.  
    <value>file:///data/hadoop/hdfs/name</value>
  5.  
    </property>
  6.  
    <property>
  7.  
    <name>dfs.datanode.data.dir</name>
  8.  
    <value>file:///data/hadoop/hdfs/data</value>
  9.  
    </property>
  10.  
    <property>
  11.  
    <name>dfs.namenode.secondary.http-address</name>
  12.  
    <value>master:50090</value>
  13.  
    </property>
  14.  
    <property>
  15.  
    <name>dfs.replication</name>
  16.  
    <value>3</value>
  17.  
    </property>
  18.  
    </configuration>
学新通

学新通

9.返回最初目录,进入/etc,修改hosts文件

在最后面添加以下代码,主要符合自己的ip跟主机名

  1.  
    192.168.235.233 master master.centos.com
  2.  
    192.168.235.234 slave1 slave1.centos.com
  3.  
    192.168.235.235 slave2 slave2.centos.com
  4.  
    192.168.235.236 slave3 slave3.centos.com

在自己的电脑上hosts也添加

学新通

学新通

·

10.修改完后关机,进行克隆。

学新通

右击master,管理,克隆

学新通

学新通

学新通

学新通

学新通

11.打开虚拟机slave1

(1)执行以下命令,删除70-persistent-net.rules

rm -rf /etc/udev/rules.d/70-persistent-net.rules

学新通

(2)执行命令:

ifconfig -a 

查看HWADDR,记录HWADDR(每个机器的此值是不一样的)

找不到另一种方法(如果是可视化的)

学新通

学新通

学新通

学新通

(3)修改/etc/sysconfig/network-scripts/ifcfg-eth0文件,修改HWADDR、IPADDR以及注释掉UUID开头的一行代码,并修改HWADDR的内容为实际的地址

vi /etc/sysconfig/network-scripts/ifcfg-eth0

如果进入的是空表,用cd进入这里,然后用“ll”查看那个带ifcfg的文件,进入编辑

我这里是ifcfg-ens33

cd /etc/sysconfig/network-scripts
ll

学新通

如果你这里是$符号请用这个命令,使用超级用户,变成#

学新通

学新通

学新通

图形化界面也可以在这里修改

学新通

修改后重启服务:

service network restart

(4)修改主机名,执行命令

6版本的:

vi /etc/sysconfig/network

修改机器名为 slave1.centos.com

学新通

7版本的:

vi /etc/hostname

修改为 slave1.centos.com

学新通

修改用户名:

(5)使用reboot命令重启虚拟机。

(6)验证slave1是否配置成功。在master下执行ping slave1 如果ping得通,说明配置成功。

注意要master跟slave1两个虚拟机都打开才能互ping

学新通

12.重复(1)~(5)的相关步骤,克隆master到slave2、slave3,并修改slave2、slave3的相关配置。

13.配置SSH免密登录

(1)在master虚拟机上使用ssh-keygen产生公钥(id_rsa.pub)与私钥(id_rsa)两个文件

输入以下命令执行后,接着按三次‘Enter’

ssh-keygen -t rsa

ssh-keygen用来生成RSA类型的密钥以及管理该密钥,参数“ -t ”用于指定要创建的SSH密钥类型为RSA

学新通

三次回车

1.保存密钥

2.不对私钥设口令

3.确认不设口令

(2)用ssh-copy-id将公钥复制到远程机器中(4个都要)

ssh-copy-id -i /root/.ssh/id_rsa.pub master

学新通

学新通

学新通

(3)验证SSH是否能够无密钥登录

在master下分别输入ssh slave1、ssh slave2、ssh slave3,exit 退出

学新通

14.时间同步

(1)在每个节点输入以下代码安装NTP

yum install -y ntp

学新通

学新通

安装成功

(2)设置master节点为NTP服务主节点,那么配置如下

使用如下命令来打开/etc/ntp.conf文件,注释掉server开头的就行,并添以下内容。

vim /etc/ntp.conf
  1.  
    restrict 192.168.0.0 mask 255.255.255.0 nomodify notrap
  2.  
    server 127.127.1.0
  3.  
    fudge 127.127.1.0 stratum 10

学新通

学新通

(3)分别在slave1 slave2 slave3 中配置NTP,同样修改/etc/ntp.conf文件,注释掉server开头的,并添加以下内容

server master

学新通

(4)执行命令,永久性关闭防火墙,主从节点都要关闭

6版本:

service iptables stop & chkconfig iptables off

7版本:

查看防火墙状态

systemctl status firewalld.service

绿的running表示防火墙开启

学新通

执行开机禁用防火墙自启命令 :

 systemctl disable firewalld.service 

执行后当前状态下还是启动的,需要重启才永久关闭。

学新通

重启后查看

学新通

注意:主从节点都要关闭

slave1

学新通

关机后重启查看slave1:

学新通

重复上面步骤把slave2、slave3的防火墙也永久关闭

关于防火墙的其他命令:

执行关闭命令: systemctl stop firewalld.service (开机启动)

再次执行查看防火墙命令:systemctl status firewalld.service

执行开机禁用防火墙自启命令 : systemctl disable firewalld.service (开机不启动)

启动:systemctl start firewalld.service

防火墙随系统开启启动 : systemctl enable firewalld.service

(5)启动ntp服务

①.在master结点执行命令

service ntpd start & chkconfig ntpd on

学新通

查看是否启动成功

systemctl status ntpd

学新通

重启后查看

systemctl status ntpd

学新通

②.在slave1、slave2、slave3上分别执行下面代码,即可同步时间

ntpdate master

slave1:

学新通

slave2:

学新通

slave3:

学新通

③.在slave1、slave2、slave3上分别执行下面命令,即可永久启动NTP服务。

service ntpd start & chkconfig ntpd on

学新通

重启后查看

学新通

15.启动关闭集群

做完Hadoop的所有配置后,即可执行格式化NameNode操作,该操作会在NameNode所在机器初始化一些HDFS的相关配置,并且该操作在集群搭建过程中只需执行一次,执行格式化之前可以先配置环境变量。

(1)配置环境变量,在master、slave1、slave2、slave3上修改 文件,添加以下内容,保存退出后执行source /etc/profile来使配置生效

  1.  
    export HADOOP_HOME=/usr/local/hadoop-2.6.4
  2.  
    export PATH=$HADOOP_HOME/bin:$PATH:/usr/java/jdk1.7.0_80/bin

master:

学新通

学新通

slave1:

学新通

slave2:

学新通

slave3:

学新通

格式化只需执行命令 hdfs namenode -format ,若出现 Storage directory /data/hadoop/hdfs/name has been successfully formatted 提示,则格式化成功。

hdfs namenode -format

学新通

格式化完成之后即可启动集群,启动集群只需要在master节点直接进入Hadoop安装目录,分别执行以下命令即可。

(2)启动命令集群

进入Hadoop安装目录

cd $HADOOP_HOME

学新通

启动HDFS相关服务

sbin/start-dfs.sh

学新通

启动YARN相关服务

sbin/start-yarn.sh

学新通

启动日志相关服务

sbin/mr-jobhistory-daemon.sh start historyserver

学新通

集群启动之后,在主节点master,子节点slave1、slave2、slave3分别执行 jps 命令,出现以下信息,表示集群启动成功。

学新通

slave1

学新通

slave2

学新通

slave3

学新通

(3)关闭集群命令

同理,关闭集群只需要在master节点直接进入Hadoop安装目录,分别执行下面命令即可。

进入Hadoop安装目录

cd $HADOOP_HOME

学新通

关闭YARN相关服务

sbin/stop-yarn.sh

学新通

关闭HDFS相关服务

sbin/stop-dfs.sh

学新通

关闭日志相关服务

sbin/mr-jobhistory-deamon.sh stop historyserver

学新通

(4)创建一键启动程序

(hadoop自带的一键启动关闭)

学新通

在/usr/local/hadoop-2.6.4/sbin 目录下创建一个脚本

vi myfb.sh

里面填写

  1.  
    #!/bin/bash
  2.  
    case $1 in
  3.  
    "start"){
  4.  
    echo "-----------start my hdfs---------------"
  5.  
    $HADOOP_HOME/sbin/start-dfs.sh
  6.  
    echo "-----------my hdfs started-------------"
  7.  
    echo "------------start my yarn--------------"
  8.  
    $HADOOP_HOME/sbin/start-yarn.sh
  9.  
    echo "------------my yarn started-------------"
  10.  
    echo "---------------start history server------------"
  11.  
    $HADOOP_HOME/sbin/mr-jobhistory-daemon.sh start historyserver
  12.  
    echo "---------------my history server started------------"
  13.  
    };;
  14.  
    "stop"){
  15.  
    echo "--------------stop my dfs-----------------"
  16.  
    $HADOOP_HOME/sbin/stop-dfs.sh
  17.  
    echo "--------------my dfs stopped-----------------"
  18.  
    echo "-------------stop yarn---------------"
  19.  
    $HADOOP_HOME/sbin/stop-yarn.sh
  20.  
    echo "-------------my yarn stopped---------------"
  21.  
    echo "-------------------stop history server------------- "
  22.  
    $HADOOP_HOME/sbin/mr-jobhistory-daemon.sh stop historyserver
  23.  
    echo "-------------------my history server stopped------------- "
  24.  
    };;
  25.  
    esac
学新通

学新通

学新通

给权限(给到权限后会变颜色)

学新通

一键启动集群

学新通

一键关闭集群

学新通

16.监控集群

服务                           	Web接口                                    默认端口
NameNode                          http://namenode_host:port/                50070
ResourceManager                   http://resourcemanager_host:port/         8088
MapReduce JobHistory Server       http://jobhistoryserver_host:port/        19888


为了能够顺利在浏览器打开Hadoop集群相关服务的监控界面,需要朽败本地hosts文件。hosts是一个没有扩展名的系统文件,其作用就是将一些常用的网址域名与其对应的IP地址建立一个关联“数据库”,当用户在浏览器中输入一个网址时,系统会首先自动从hosts文件中寻找对应的IP地址。一旦找到,系统会立即打开对应网页。添加下面内容。

192.168.235.233 master master.centos.com
192.168.235.234 slave1 slave1.centos.com
192.168.235.235 slave2 slave2.centos.com
192.168.235.236 slave3 slave3.centos.com


学新通

(1)HDFS监控

在浏览器的地址栏输入“http://master:50070”,按回车键即可看到HDFS的监控界面(需要先启动集群)

http://master:50070
#或者192.168.235.233:50070


①Overview记录了NameNode的启动时间、版本号、编译版本等一些信息。

②Summary是集群信息,提供了当前集群环境的一些有用信息,从图中可知所有DataNode节点的基本存储信息,例如硬盘大小以及有多少被HDFS使用等一些数据信息,同时还标注了当前集群环境中DataNode的信息,对活动状态的DataNode也专门做了记录。

③NameNode Storage提供了NameNode的信息,最后的State标示此节点为活动节点,可正常提供服务。

http://master:50070 HDFS监控1

学新通

如上图所示,选择Utilities >> Browse the file system 菜单命令可以查看HDFS上的文件信息

学新通

在浏览器中访问“http://master:50070/dfshealth.jsp”得到下图所示的页面,

http://master:50070/dfshealth.jsp


学新通

单击“Browse the filesystem”超链接可以打开文件存储目录,如下图所示。

学新通

“NameNode Logs”为用户提供NameNode节点的log信息,如下图所示。

学新通

(2)YARN监控

在浏览器的地址栏输入“http://master:8088”,即可看到YARN的监控界面,如下图所示。

http://master:8088


学新通

(3)日志监控

在浏览器的地址栏输入“http://master:19888”,即可看到日志的监控界面,如下图所示。

http://master:19888


学新通

17HDFS文件处理

可以通过Hadoop dfs –help命令来查看HDFS Shell命令的说明。大部分的HDFS Shell和Linux的shell相似。

一般的shell命令格式为:

bin/hadoop commandgenericOptions

command 是命令

genericOptions 是一般的参数

commandOptions 是命令参数

在hadoop dfs –ls /input 这条命令中,command是dfs,genericOptions对应-ls等参数,commandOptions对应于/input 这个路径参数。

更多相关命令如下:

1.-appendToFile

(1)将本地文件附加到集群 本地文件一个一个地定要存在 集群上可以没有 集群帮用户创建,在/usr/local/hadoop-2.6.4/etc/hadoop路径下编辑一个文件vi bbb.txt 并写下面内容“this is bbb.txt”,然后使用下面命令上传到集群(前提是已启动了集群)。

hdfs dfs -appendToFile /bbb.txt /bbb.txt

学新通

(2)将多个本地文件附加到集群文件

学新通

最后一个路径 是集群文件的路径 其他前面的都是本地文件

查看结果

学新通

2.-cat

查看刚刚上传到集群的文件

hdfs dfs -cat /bbb.txt

学新通

4.-Chmod

修改权限

学新通

4.-copyFromLocal

从本地复制一份到集群上

学新通

5.-copyFromLocal

从集群复制到本地

学新通

6.-get

从集群上获得到本地

学新通

7.-cp

复制

学新通

8.-count

查看文件数量

学新通

9.-df

查看使用空间

学新通

10.-du

查看文件长度(显示hdfs对应路径下每个文件夹和文件的大小)

学新通

11.-moveFromLocal

从本地移动到集群

学新通

12.-rm /-rmdir(删除空文件夹)

删除集群上的文件

学新通

13.- tail

显示文件最后1k内容

学新通

14.-test -d/-e

查看文件或者目录的反馈(这里能正常执行,说明命令是正确的)

学新通

15.-mkdir

创建文件夹

学新通

16.-mv

移动文件

学新通

17.touchz

创建文件

学新通

18.-text

将文本文件或某些格式的非文本文件通过文本格式输出

学新通

19.-stat

返回对应路径的状态信息

%b(文件大小),%o(Block大小),%n(文件名),%r(副本个数),%y(最后一次修改日期和时间)

学新通

20.-put

将本地文件放到hdfs某个目录

学新通

21.-getmerge

把两个文件的内容合并起来(注:合并后的文件位于当前目录,不在hdfs中,是本地文件)

学新通

22.-grep

从hdfs上过滤包含某个字符的行内容

学新通

六、Hadoop实现MapReduce执行wordcount与使用Eclipse创建MapReduce工程执行wordcount

1.Hadoop实现MapReduce执行wordcount

(1)VMware里面启动集群,在根目录下面创建一个文本,里面填写一些文本

学新通

(2)在集群里面创建一个input文件夹

学新通

(3)然后把本地的a.txt文本上传到集群上的input文件夹里面

hdfs dfs -put /a.txt /input/a.txt

(4)输入命令进行词频统计

hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar wordcount /input /output

学新通

(5)然后就会在集群里面产生一个output文件,里面有两个文本

学新通

(6)执行命令查看词频统计结果

hdfs dfs -cat /output/part-r-00000

注意:误删了output里面的文本的话的不能恢复的,可以删掉原来的output文件,重新创建一个,重新进行词频统计即可。

(7)如果想要做另一个文本的词频统计,需要把后面的output改个名字,就是从新统计的意思

hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar wordcount /input /output1

学新通

学新通

2.使用Eclipse创建MapReduce工程执行wordcount

(1)添加Hadoop插件

学新通

找到eclipse的安装路径,然后将插件移动到这个路径下

学新通

(2)增加Map/Reduce功能区

打开eclipse进行以下操作

学新通

学新通

(3)增加Hadoop集群的连接

单击下图所示界面右下方的蓝色小象图标(其右上方右 号),就会弹出连接Hadoop集群的配置窗口。

学新通

在VMware的Hadoop集群里输入以下代码,查看hdfs端口号(先启动集群)

hdfs getconf -confKey fs.default.name

学新通

配置namenode节点的ip(自己的虚拟机IP)地址及端口。

学新通

相关的Hadoop集群的连接信息有以下各项。

Location name:命名新建的Hadoop连接名称,如Hadoop Cluster。

Map/Reduce Master:填写Hadoop集群的ResourceManager的IP和端口。

DFS Master:填写Hadoop集群的NameNode的IP地址和连接端口。

填写完以上信息后,单击“Finish”按钮。

(4)浏览HDFS上的目录及文件

在配置完Hadoop集群连接后,确认Hadoop集群已经启动,就可以在Eclipse界面浏览HDFS上的目录及文件,如下图所示。还可以通过鼠标来执行文件操作,例如文件的上传和删除等。需要注意的事,每次执行操作后,需要刷新HDFS列表,从而获得文件目录的最新状态。

学新通

3.新建MapReduce工程

(1)导入MapReduce运行依赖的相关JAR包

在主菜单上单击“Window”并选择“Preferences”,例如下图所示Preference界面中,选择“Hadoop Map/Reduce”,单击“Browse...”按钮,再选中Hadoop的安装文件夹路径(相应版本的Hadoop安装包需要预先解压再本地电脑上)。最后单击“Apply”按钮并单击确定。

学新通

学新通

(2)创建MapReduce工程

从菜单栏开始,单击“File”菜单,选择“New”命令,在出现的选项中单击“Project”项,再选择“Map/Reduce Project”选项。

学新通

学新通

在"MapReduce Project"的创建界面中,填写工程名“MemberCount”,然后单击“Finish”按钮。

学新通

在主界面左侧的“Project Explorer”栏,可以看到已经创建好的工程MemberCount,Map Reduce编程所需要的JAR包已经全部自动导入。新工程已创建完成,接下来就可以正式进行MapReduce编程工作了。

学新通

创建完成MapReduce工程

4.编写wordcount的代码 ,描述他是怎么工作的。并且上传到集群完成一次完整的Wordcount

(1)创建一个class文件并命名为WordCount

学新通

(2)内容代码与解析如下

①应用程序Driver分析

这里的Driver程序主要指的是main函数,在main函数里面进行MapReduce程序的一些初始化设置,并提交任务,等待程序运行完成。总结为MapReduce任务初始化的通用代码。

  1.  
    public static void main(String[] args) throws Exception {
  2.  
    Configuration conf = new Configuration();
  3.  
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  4.  
    if (otherArgs.length < 2) {
  5.  
    System.err.println("Usage: wordcount <in> [<in>...] <out>");
  6.  
    System.exit(2);
  7.  
    }
  8.  
    Job job = new Job(conf, "word count");
  9.  
    job.setJarByClass(WordCount.class);
  10.  
    job.setMapperClass(TokenizerMapper.class);
  11.  
    job.setCombinerClass(IntSumReducer.class);
  12.  
    job.setReducerClass(IntSumReducer.class);
  13.  
    job.setOutputKeyClass(Text.class);
  14.  
    job.setOutputValueClass(IntWritable.class);
  15.  
    for (int i = 0; i < otherArgs.length - 1; i) {
  16.  
    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  17.  
    }
  18.  
    FileOutputFormat.setOutputPath(job,
  19.  
    new Path(otherArgs[otherArgs.length - 1]));
  20.  
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  21.  
    }
学新通

②Mapper模式分析

在MapReduce程序中,最重要的代码实现就是Mapper模块中的map函数以及Reducer模块中的reduce函数。这里先看Mapper,也就是源码中的TokenizerMapper

  1.  
    public static class TokenizerMapper
  2.  
    extends Mapper<Object, Text, Text, IntWritable>{
  3.  
     
  4.  
    private final static IntWritable one = new IntWritable(1);
  5.  
    private Text word = new Text();
  6.  
     
  7.  
    public void map(Object key, Text value, Context context
  8.  
    ) throws IOException, InterruptedException {
  9.  
    StringTokenizer itr = new StringTokenizer(value.toString());
  10.  
    while (itr.hasMoreTokens()) {
  11.  
    word.set(itr.nextToken());
  12.  
    context.write(word, one);
  13.  
    }
  14.  
    }
  15.  
    }
学新通

③Reducer模式分析

继续分析Reduce,即源码中的IntSumReducer

  1.  
    public static class IntSumReducer
  2.  
    extends Reducer<Text,IntWritable,Text,IntWritable> {
  3.  
    private IntWritable result = new IntWritable();
  4.  
     
  5.  
    public void reduce(Text key, Iterable<IntWritable> values,
  6.  
    Context context
  7.  
    ) throws IOException, InterruptedException {
  8.  
    int sum = 0;
  9.  
    for (IntWritable val : values) {
  10.  
    sum = val.get();
  11.  
    }
  12.  
    result.set(sum);
  13.  
    context.write(key, result);
  14.  
    }
  15.  
    }
学新通

这是wordcount的完整代码

  1.  
    /**
  2.  
    * Licensed to the Apache Software Foundation (ASF) under one
  3.  
    * or more contributor license agreements. See the NOTICE file
  4.  
    * distributed with this work for additional information
  5.  
    * regarding copyright ownership. The ASF licenses this file
  6.  
    * to you under the Apache License, Version 2.0 (the
  7.  
    * "License"); you may not use this file except in compliance
  8.  
    * with the License. You may obtain a copy of the License at
  9.  
    *
  10.  
    * http://www.apache.org/licenses/LICENSE-2.0
  11.  
    *
  12.  
    * Unless required by applicable law or agreed to in writing, software
  13.  
    * distributed under the License is distributed on an "AS IS" BASIS,
  14.  
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  
    * See the License for the specific language governing permissions and
  16.  
    * limitations under the License.
  17.  
    */
  18.  
     
  19.  
    import java.io.IOException;
  20.  
    import java.util.StringTokenizer;
  21.  
    import org.apache.hadoop.conf.Configuration;
  22.  
    import org.apache.hadoop.fs.Path;
  23.  
    import org.apache.hadoop.io.IntWritable;
  24.  
    import org.apache.hadoop.io.Text;
  25.  
    import org.apache.hadoop.mapreduce.Job;
  26.  
    import org.apache.hadoop.mapreduce.Mapper;
  27.  
    import org.apache.hadoop.mapreduce.Reducer;
  28.  
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  29.  
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  30.  
    import org.apache.hadoop.util.GenericOptionsParser;
  31.  
     
  32.  
    public class WordCount {
  33.  
     
  34.  
    public static class TokenizerMapper
  35.  
    extends Mapper<Object, Text, Text, IntWritable>{
  36.  
     
  37.  
    private final static IntWritable one = new IntWritable(1);
  38.  
    private Text word = new Text();
  39.  
     
  40.  
    public void map(Object key, Text value, Context context
  41.  
    ) throws IOException, InterruptedException {
  42.  
    StringTokenizer itr = new StringTokenizer(value.toString());
  43.  
    while (itr.hasMoreTokens()) {
  44.  
    word.set(itr.nextToken());
  45.  
    context.write(word, one);
  46.  
    }
  47.  
    }
  48.  
    }
  49.  
     
  50.  
    public static class IntSumReducer
  51.  
    extends Reducer<Text,IntWritable,Text,IntWritable> {
  52.  
    private IntWritable result = new IntWritable();
  53.  
     
  54.  
    public void reduce(Text key, Iterable<IntWritable> values,
  55.  
    Context context
  56.  
    ) throws IOException, InterruptedException {
  57.  
    int sum = 0;
  58.  
    for (IntWritable val : values) {
  59.  
    sum = val.get();
  60.  
    }
  61.  
    result.set(sum);
  62.  
    context.write(key, result);
  63.  
    }
  64.  
    }
  65.  
     
  66.  
    public static void main(String[] args) throws Exception {
  67.  
    Configuration conf = new Configuration();
  68.  
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  69.  
    if (otherArgs.length < 2) {
  70.  
    System.err.println("Usage: wordcount <in> [<in>...] <out>");
  71.  
    System.exit(2);
  72.  
    }
  73.  
    Job job = new Job(conf, "word count");
  74.  
    job.setJarByClass(WordCount.class);
  75.  
    job.setMapperClass(TokenizerMapper.class);
  76.  
    job.setCombinerClass(IntSumReducer.class);
  77.  
    job.setReducerClass(IntSumReducer.class);
  78.  
    job.setOutputKeyClass(Text.class);
  79.  
    job.setOutputValueClass(IntWritable.class);
  80.  
    for (int i = 0; i < otherArgs.length - 1; i) {
  81.  
    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  82.  
    }
  83.  
    FileOutputFormat.setOutputPath(job,
  84.  
    new Path(otherArgs[otherArgs.length - 1]));
  85.  
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  86.  
    }
  87.  
    }
学新通

④以下为待会任务需要修改后的完整代码

注意class名称与包名是否一致

  1.  
    import java.io.IOException;
  2.  
    import org.apache.hadoop.conf.Configuration;
  3.  
    import org.apache.hadoop.fs.Path;
  4.  
    import org.apache.hadoop.io.IntWritable;
  5.  
    import org.apache.hadoop.io.Text;
  6.  
    import org.apache.hadoop.mapreduce.Job;
  7.  
    import org.apache.hadoop.mapreduce.Mapper;
  8.  
    import org.apache.hadoop.mapreduce.Reducer;
  9.  
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  10.  
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  11.  
    import org.apache.hadoop.util.GenericOptionsParser;
  12.  
     
  13.  
    public class dailyAccessCount {
  14.  
     
  15.  
    public static class MyMapper
  16.  
    extends Mapper<Object, Text, Text, IntWritable>{
  17.  
     
  18.  
    private final static IntWritable one = new IntWritable(1);
  19.  
     
  20.  
    public void map(Object key, Text value, Context context
  21.  
    ) throws IOException, InterruptedException {
  22.  
    String line = value.toString();
  23.  
    String array[] = line.split(","); //指定空格为分隔符,组成数组
  24.  
    String keyOutput = array[1]; //提取数组中的访问日期做为Key
  25.  
    context.write(new Text(keyOutput), one); //组成键值对
  26.  
    }
  27.  
    }
  28.  
     
  29.  
    public static class MyReducer
  30.  
    extends Reducer<Text,IntWritable,Text,IntWritable> {
  31.  
    private IntWritable result = new IntWritable();
  32.  
     
  33.  
    public void reduce(Text key, Iterable<IntWritable> values,
  34.  
    Context context
  35.  
    ) throws IOException, InterruptedException {
  36.  
    int sum = 0; //定义累加器,初始值为0
  37.  
    for (IntWritable val : values) {
  38.  
    sum = val.get(); // 将相同键的所有值进行累加
  39.  
    }
  40.  
    result.set(sum);
  41.  
    context.write(key,result);
  42.  
    }
  43.  
    }
  44.  
     
  45.  
    public static void main(String[] args) throws Exception {
  46.  
    Configuration conf = new Configuration();
  47.  
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  48.  
    if (otherArgs.length < 2) {
  49.  
    System.err.println("Usage: wordcount <in> [<in>...] <out>");
  50.  
    System.exit(2);
  51.  
    }
  52.  
    Job job = new Job(conf, "Daily Access Count");
  53.  
    job.setJarByClass(dailyAccessCount.class);
  54.  
    job.setMapperClass(MyMapper.class);
  55.  
    job.setReducerClass(MyReducer.class);
  56.  
    job.setMapOutputKeyClass(Text.class);
  57.  
    job.setMapOutputValueClass(IntWritable.class);
  58.  
     
  59.  
    job.setOutputKeyClass(Text.class);
  60.  
    job.setOutputValueClass(IntWritable.class);
  61.  
    for (int i = 0; i < otherArgs.length - 1; i) {
  62.  
    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  63.  
    }
  64.  
    FileOutputFormat.setOutputPath(job,
  65.  
    new Path(otherArgs[otherArgs.length - 1]));
  66.  
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  67.  
    }
  68.  
    }
学新通

⑤注意:确保自己电脑上的java版本跟Hadoop上的版本一致,我这里的是jdk-7u80-windows-x64(1.7)

写完代码后,把项目代码调整为1.7版本

学新通

学新通

⑥编译生成JAR包文件,并提交Hadoop集群执行

学新通

学新通

学新通

学新通

学新通

学新通

学新通

学新通

学新通

上传到Hadoop集群服务器节点

这里我放到这个路径 /usr/local/hadoop-2.6.4

学新通

需要做统计的文本也上传到Hadoop,并上传到集群(130万行数据的文本)

学新通

学新通

在Hadoop集群服务器的终端,以hadoop jar命令提交任务

注意输出的文件名每次不能相同

hadoop jar $HADOOP_HOME/wordcount1.jar /user/root/user_login.txt /user/root/AccessCount

学新通

学新通

可以看到数据量的达到130万行的量级

做10次

学新通

学新通

学新通

⑦Combiner

Combiner是一个运行在Map端的“迷你Reduce”过程,它只处理单台机器生成的数据。声明的Combiner继承的是Reduce,其方法实现原理和Reduce的实现原理基本相同,不同的是,Combiner操作发生在Map端,或者说Combiner运行在每一个运行Map任务的节点上。它会接收特定节点上的Map输出作为输入,对Map输出的数据先做一次合并,再把结果发送到Reducer。需要注意的是,Combiner的加入不影响原逻辑,即Combiner不影响最终运行结果,影响的只是效率。

值得一提的是,并非所有的MapReduce程序都可以加入Combiner,仅当Reduce输入的键值对类型与Reduce输出的键值对类型一样,并且计算逻辑不影响最终计算结果时,才可以在MapReduce程序中加入Combiner。

前面讲过Combiner继承的是Reducer,所以声明Combiner的时候必须继承Reducer,在Combiner类里面重写Reduce方法。

除了声明Combiner类外,还需要在驱动类里面配置Combiner类

job.setCombinerClass(LogCountCombiner.class);


有时候甚至不必特意声明一个Combiner类。当Combiner和Reduce的实现逻辑相同的时候,可以不用声明Combiner类,而在驱动类里面添加代码

job.setCombinerClass(IntSumReducer.class);


完整代码如下:(注意修改包名和类名是否相同)

  1.  
    package test;
  2.  
    import java.io.IOException;
  3.  
    import org.apache.hadoop.conf.Configuration;
  4.  
    import org.apache.hadoop.fs.Path;
  5.  
    import org.apache.hadoop.io.IntWritable;
  6.  
    import org.apache.hadoop.io.Text;
  7.  
    import org.apache.hadoop.mapreduce.Job;
  8.  
    import org.apache.hadoop.mapreduce.Mapper;
  9.  
    import org.apache.hadoop.mapreduce.Reducer;
  10.  
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11.  
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  12.  
    import org.apache.hadoop.util.GenericOptionsParser;
  13.  
     
  14.  
    public class dailyAccessCount {
  15.  
     
  16.  
    public static class MyMapper
  17.  
    extends Mapper<Object, Text, Text, IntWritable>{
  18.  
     
  19.  
    private final static IntWritable one = new IntWritable(1);
  20.  
     
  21.  
    public void map(Object key, Text value, Context context
  22.  
    ) throws IOException, InterruptedException {
  23.  
    String line = value.toString();
  24.  
    String array[] = line.split(","); //指定空格为分隔符,组成数组
  25.  
    String keyOutput = array[1]; //提取数组中的访问日期做为Key
  26.  
    context.write(new Text(keyOutput), one); //组成键值对
  27.  
    }
  28.  
    }
  29.  
     
  30.  
    public static class MyReducer
  31.  
    extends Reducer<Text,IntWritable,Text,IntWritable> {
  32.  
    private IntWritable result = new IntWritable();
  33.  
     
  34.  
    public void reduce(Text key, Iterable<IntWritable> values,
  35.  
    Context context
  36.  
    ) throws IOException, InterruptedException {
  37.  
    int sum = 0; //定义累加器,初始值为0
  38.  
    for (IntWritable val : values) {
  39.  
    sum = val.get(); // 将相同键的所有值进行累加
  40.  
    }
  41.  
    result.set(sum);
  42.  
    context.write(key,result);
  43.  
    }
  44.  
    }
  45.  
     
  46.  
    public static void main(String[] args) throws Exception {
  47.  
    Configuration conf = new Configuration();
  48.  
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  49.  
    if (otherArgs.length < 2) {
  50.  
    System.err.println("Usage: wordcount <in> [<in>...] <out>");
  51.  
    System.exit(2);
  52.  
    }
  53.  
    Job job = new Job(conf, "Daily Access Count");
  54.  
    job.setJarByClass(dailyAccessCount.class);
  55.  
    job.setMapperClass(MyMapper.class);
  56.  
    job.setReducerClass(MyReducer.class);
  57.  
    job.setMapOutputKeyClass(Text.class);
  58.  
    job.setMapOutputValueClass(IntWritable.class);
  59.  
    job.setCombinerClass(MyReducer.class); //Combiner逻辑与Reducer相同时配置Combiner类
  60.  
    job.setOutputKeyClass(Text.class);
  61.  
    job.setOutputValueClass(IntWritable.class);
  62.  
    for (int i = 0; i < otherArgs.length - 1; i) {
  63.  
    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  64.  
    }
  65.  
    FileOutputFormat.setOutputPath(job,
  66.  
    new Path(otherArgs[otherArgs.length - 1]));
  67.  
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  68.  
    }
  69.  
    }
  70.  
     
学新通

或者

  1.  
    /**
  2.  
    * Licensed to the Apache Software Foundation (ASF) under one
  3.  
    * or more contributor license agreements. See the NOTICE file
  4.  
    * distributed with this work for additional information
  5.  
    * regarding copyright ownership. The ASF licenses this file
  6.  
    * to you under the Apache License, Version 2.0 (the
  7.  
    * "License"); you may not use this file except in compliance
  8.  
    * with the License. You may obtain a copy of the License at
  9.  
    *
  10.  
    * http://www.apache.org/licenses/LICENSE-2.0
  11.  
    *
  12.  
    * Unless required by applicable law or agreed to in writing, software
  13.  
    * distributed under the License is distributed on an "AS IS" BASIS,
  14.  
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  
    * See the License for the specific language governing permissions and
  16.  
    * limitations under the License.
  17.  
    */
  18.  
    package org.apache.hadoop.examples; //
  19.  
     
  20.  
    import java.io.IOException; //数据的输入与输出
  21.  
    import java.util.StringTokenizer; //String字符处理
  22.  
     
  23.  
    import org.apache.hadoop.conf.Configuration; //基础conf
  24.  
    import org.apache.hadoop.fs.Path; //文件系统
  25.  
    import org.apache.hadoop.io.IntWritable;
  26.  
    import org.apache.hadoop.io.Text; //数据输入输出(IO)
  27.  
    import org.apache.hadoop.mapreduce.Job;
  28.  
    import org.apache.hadoop.mapreduce.Mapper;
  29.  
    import org.apache.hadoop.mapreduce.Reducer; //MapReduce(对数据的拆分,对数据的归类)
  30.  
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  31.  
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  32.  
    import org.apache.hadoop.util.GenericOptionsParser;
  33.  
     
  34.  
    public class WordCount {
  35.  
     
  36.  
    public static class TokenizerMapper
  37.  
    extends Mapper /*泛型类*/
  38.  
    <Object, Text, Text, IntWritable>{
  39.  
     
  40.  
    private final static IntWritable one = new IntWritable(1); //写整型的时候,初值为1
  41.  
    private Text word = new Text(); //文本对象word
  42.  
     
  43.  
    public void map(Object key, Text value, Context context
  44.  
    ) throws IOException, InterruptedException {
  45.  
    StringTokenizer itr = new StringTokenizer(value.toString());
  46.  
     
  47.  
    /*字符串分隔,把单词拆出来
  48.  
    Hello Hadoop Hello world
  49.  
     
  50.  
    hello 1 hadoop 1 hello 1 world 1
  51.  
    (用空格替换逗号跟句号)或者重写Tokenizer,把逗号句号替换添加进去
  52.  
    */
  53.  
     
  54.  
    while (itr.hasMoreTokens()) { //做一个循环判断,是否还有Token(分隔符)
  55.  
    word.set(itr.nextToken()); //如果有,就再读一个,从当前分隔符读到下一个分//隔符,实际就是读了一个单词
  56.  
    context.write(word, one); //输出这个单词
  57.  
    }
  58.  
    }
  59.  
    }
  60.  
     
  61.  
    public static class IntSumReducer
  62.  
    extends Reducer<Text,IntWritable,Text,IntWritable> {
  63.  
    private IntWritable result = new IntWritable();
  64.  
     
  65.  
    public void reduce(Text key, Iterable<IntWritable> values,
  66.  
    Context context
  67.  
    ) throws IOException, InterruptedException {
  68.  
    int sum = 0;
  69.  
    for (IntWritable val : values) {
  70.  
    sum = val.get();
  71.  
    }
  72.  
    result.set(sum);
  73.  
    context.write(key, result); //输出结果 hello 2 hadoop 1 world 1
  74.  
    }
  75.  
    }
  76.  
     
  77.  
    public static void main(String[] args) throws Exception {
  78.  
    Configuration conf = new Configuration(); //实例化conf(读取命令行的命令,以数组的方式返回路径)
  79.  
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  80.  
    if (otherArgs.length < 2) {
  81.  
    System.err.println("Usage: wordcount <in> [<in>...] <out>");
  82.  
    System.exit(2);
  83.  
    } //判断IO的路径,如果只有一个路径,长度是小于2的,你的输出(1)包含两//个(输入和输出IO),防止出现读取的异常。
  84.  
    Job job = new Job(conf, "word count"); //所有的工作都是用Job来完成的,实例化Job
  85.  
    job.setJarByClass(WordCount.class); //打jar包
  86.  
    job.setMapperClass(TokenizerMapper.class); //指定Map类型
  87.  
    job.setCombinerClass(IntSumReducer.class); //指定Combiner
  88.  
    job.setReducerClass(IntSumReducer.class); //指定Reduce的类型
  89.  
    job.setOutputKeyClass(Text.class); //设置最终key的类型
  90.  
    job.setOutputValueClass(IntWritable.class); //设置最终value的类型
  91.  
     
  92.  
    for (int i = 0; i < otherArgs.length - 1; i) {
  93.  
    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  94.  
    }
  95.  
    FileOutputFormat.setOutputPath(job,
  96.  
    new Path(otherArgs[otherArgs.length - 1]));
  97.  
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  98.  
    } //任务提交
  99.  
    }
学新通

同样需要进行打包

学新通

设置接口

学新通

学新通

上传到集群

学新通

继续做在Hadoop集群服务器的终端,以hadoop jar命令提交任务(本次添加了Combiner)

注意输出的文件名每次不能相同

做10次

hadoop jar $HADOOP_HOME/wordcount2.jar /user/root/user_login.txt /user/root/AccessCount

学新通

这里可以看到所使用的文本跟上面没加Combiner的文本是同一个,所以数据量是130万行

学新通

学新通

学新通

学新通

学新通

总结:wordcount加了combiner的代码后,运行效率提高了。

代码总结:

  1.  
    /**
  2.  
    * Licensed to the Apache Software Foundation (ASF) under one
  3.  
    * or more contributor license agreements. See the NOTICE file
  4.  
    * distributed with this work for additional information
  5.  
    * regarding copyright ownership. The ASF licenses this file
  6.  
    * to you under the Apache License, Version 2.0 (the
  7.  
    * "License"); you may not use this file except in compliance
  8.  
    * with the License. You may obtain a copy of the License at
  9.  
    *
  10.  
    * http://www.apache.org/licenses/LICENSE-2.0
  11.  
    *
  12.  
    * Unless required by applicable law or agreed to in writing, software
  13.  
    * distributed under the License is distributed on an "AS IS" BASIS,
  14.  
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  
    * See the License for the specific language governing permissions and
  16.  
    * limitations under the License.
  17.  
    */
  18.  
    package org.apache.hadoop.examples; //
  19.  
     
  20.  
    import java.io.IOException; //数据的输入与输出
  21.  
    import java.util.StringTokenizer; //String字符处理
  22.  
     
  23.  
    import org.apache.hadoop.conf.Configuration; //基础conf
  24.  
    import org.apache.hadoop.fs.Path; //文件系统
  25.  
    import org.apache.hadoop.io.IntWritable;
  26.  
    import org.apache.hadoop.io.Text; //数据输入输出(IO)
  27.  
    import org.apache.hadoop.mapreduce.Job;
  28.  
    import org.apache.hadoop.mapreduce.Mapper;
  29.  
    import org.apache.hadoop.mapreduce.Reducer; //MapReduce(对数据的拆分,对数据的归类)
  30.  
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  31.  
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  32.  
    import org.apache.hadoop.util.GenericOptionsParser;
  33.  
     
  34.  
    public class WordCount {
  35.  
     
  36.  
    //在MapReduce程序中,最重要的代码实现就是Mapper模块中的map函数以及Reducer模块中的reduce函数。这里先看Mapper,也就是源码中的TokenizerMapper
  37.  
    public static class TokenizerMapper
  38.  
    extends Mapper /*泛型类*/
  39.  
    <Object, Text, Text, IntWritable>{
  40.  
     
  41.  
    private final static IntWritable one = new IntWritable(1); //写整型的时候,初值为1
  42.  
    private Text word = new Text(); //文本对象word
  43.  
     
  44.  
    public void map(Object key, Text value, Context context
  45.  
    ) throws IOException, InterruptedException {
  46.  
    StringTokenizer itr = new StringTokenizer(value.toString());
  47.  
     
  48.  
    /*字符串分隔,把单词拆出来
  49.  
    Hello Hadoop Hello world
  50.  
     
  51.  
    hello 1 hadoop 1 hello 1 world 1
  52.  
    (用空格替换逗号跟句号)或者重写Tokenizer,把逗号句号替换添加进去
  53.  
    */
  54.  
     
  55.  
    while (itr.hasMoreTokens()) { //做一个循环判断,是否还有Token(分隔符)
  56.  
    word.set(itr.nextToken()); //如果有,就再读一个,从当前分隔符读到下一个分//隔符,实际就是读了一个单词
  57.  
    context.write(word, one); //输出这个单词
  58.  
    }
  59.  
    }
  60.  
    }
  61.  
     
  62.  
    //继续分析Reduce,即源码中的IntSumReducer
  63.  
    public static class IntSumReducer
  64.  
    extends Reducer<Text,IntWritable,Text,IntWritable> {
  65.  
    private IntWritable result = new IntWritable();
  66.  
     
  67.  
    public void reduce(Text key, Iterable<IntWritable> values,
  68.  
    Context context
  69.  
    ) throws IOException, InterruptedException {
  70.  
    int sum = 0;
  71.  
    for (IntWritable val : values) {
  72.  
    sum = val.get();
  73.  
    }
  74.  
    result.set(sum);
  75.  
    context.write(key, result); //输出结果 hello 2 hadoop 1 world 1
  76.  
    }
  77.  
    }
  78.  
     
  79.  
    //这里的Driver程序主要指的是main函数,在main函数里面进行MapReduce程序的一些初始化设置,并提交任务,等待程序运行完成。总结为MapReduce任务初始化的通用代码。
  80.  
    public static void main(String[] args) throws Exception {
  81.  
    Configuration conf = new Configuration(); //实例化conf(读取命令行的命令,以数组的方式返回路径)
  82.  
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  83.  
    if (otherArgs.length < 2) {
  84.  
    System.err.println("Usage: wordcount <in> [<in>...] <out>");
  85.  
    System.exit(2);
  86.  
    } //判断IO的路径,如果只有一个路径,长度是小于2的,你的输出(1)包含两//个(输入和输出IO),防止出现读取的异常。
  87.  
    Job job = new Job(conf, "word count"); //所有的工作都是用Job来完成的,实例化Job
  88.  
    job.setJarByClass(WordCount.class); //打jar包
  89.  
    job.setMapperClass(TokenizerMapper.class); //指定Map类型
  90.  
     
  91.  
    //有时候甚至不必特意声明一个Combiner类。当Combiner和Reduce的实现逻辑相同的时候,可以不用声明Combiner类,而在驱动类里面添加代码
  92.  
    job.setCombinerClass(IntSumReducer.class); //指定Combiner
  93.  
     
  94.  
    job.setReducerClass(IntSumReducer.class); //指定Reduce的类型
  95.  
    job.setOutputKeyClass(Text.class); //设置最终key的类型
  96.  
    job.setOutputValueClass(IntWritable.class); //设置最终value的类型
  97.  
     
  98.  
    for (int i = 0; i < otherArgs.length - 1; i) {
  99.  
    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  100.  
    }
  101.  
    FileOutputFormat.setOutputPath(job,
  102.  
    new Path(otherArgs[otherArgs.length - 1]));
  103.  
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  104.  
    } //任务提交
  105.  
    }
学新通
  1.  
    /***
  2.  
    * ,%%%%%%%%,
  3.  
    * ,%%/\%%%%/\%%
  4.  
    * ,%%%\c "" J/%%%
  5.  
    * %. %%%%/ o o \%%%
  6.  
    * `%%. %%%% _ |%%%
  7.  
    * `%% `%%%%(__Y__)%%'
  8.  
    * // ;%%%%`\-/%%%'
  9.  
    * (( / `%%%%%%%'
  10.  
    * \\ .' |
  11.  
    * \\ / \ | |
  12.  
    * \\/ ) | |
  13.  
    * \ /_ | |__
  14.  
    * (___________))))))) 攻城湿
  15.  
    */
  16.  
     
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhghbcbg
系列文章
更多 icon
同类精品
更多 icon
继续加载