Hadoop - MapReduce
通过MapReduce框架,我们可以编写应用程序在商用机器集群上以可靠的方式并行处理大量的数据。
MapReduce是什么?
MapReduce是基于java的分布式计算程序模型和处理技术。 MapReduce算法包含两个重要的任务,即Map和Reduce。 Map接受一组数据并将其转换为另一组数据,这些独立的元素分解成元组(键/值对)。 然后是reduce任务,它接受map的输出作为输入,并将这些数据元组组成一组更小的元组。 就像MapReduce的名字所暗示的那样,reduce任务总是在map之后执行。
MapReduce的主要优势是,它很容易在多个计算节点上作大规模的数据处理。 在MapReduce模式下,数据处理原语被称为mappers和reducers。 将数据处理应用程序分解为mappers和reducers有时是不容易的。 但是,一旦我们以MapReduce形式编写应用程序,那么扩展应用程序,让它运行在成百上千,甚至上万的机器集群中只是一个修改配置的问题。 正是这一点可伸缩性吸引了许多程序员使用MapReduce模型。
算法
-
通常MapReduce范例基于将计算实体发送到数据所在的地方。
-
MapReduce程序执行分三个阶段,即map阶段, shuffle阶段,和reduce阶段。
map阶段 :map或mapper的工作是处理输入数据。 一般输入数据是以文件或目录的形式存在,存储在hadoop文件系统(HDFS)。 输入文件逐行传递给mapper函数。mapper处理数据并创建一些小数据块。
reduce阶段 :这个阶段是Shuffle 阶段和 Reduce阶段的组合。Reducer的工作是处理来自于mapper的数据。 处理完成后,生成一组新的输出存储到HDFS中。
-
MapReduce任务期间,Hadoop 发送Map和Reduce任务给集群中相应的服务器。
-
该框架管理有关数据传递的所有细节,如发布任务,验证任务完成,在集群的节点之间复制数据。
-
大部分的计算发生在本地节点,这些节点在磁盘中存储有数据,这减少了网络流量。
-
给定的任务完成后,由集群归集数据,产生一个适当的结果,并将其发送回Hadoop服务器。
输入和输出(Java视角)
MapReduce框架操作< key,value>对,也就是说,框架将任务的输入视为一组<key,value>对,并生成一组< key,value>对作为任务的输出,只是类型不同。
键和值相关的类应该通过框架进行序列化,需要实现Writable接口。此外,key类必须实现Writable-Comparable接口,使框架方便排序。一个MapReduce任务的输入和输出类型:(Input)< k1,v1 > - >map- > < k2,v2 > - > - >reduce- > < k3,v3 >(Output)。
InputOutputMap<k1, v1>list (<k2, v2>)Reduce<k2, list(v2)>list (<k3, v3>)
术语
-
PayLoad -实现Map和Reduce方法的 应用程序 ,是任务的核心。
-
Mapper - Mapper将输入的键/值对映射为一组中间键/值对。
-
NamedNode -管理Hadoop分布式文件系统(HDFS)的节点。
-
DataNode —在进行任何处理之前提前展示数据的节点。
-
MasterNode —JobTracker运行的节点,并接受来自客户端的任务请求。
-
SlaveNode -Map和Reduce程序运行的节点。
-
JobTracker -调度任务并跟踪分配的任务到任务跟踪器。
-
Task Tracker -跟踪任务,向JobTracker报告任务状态。
-
Job —数据集上执行Mapper和Reducer的程序。
-
Task —在数据块的Mapper或Reducer任务。
-
Task Attempt —尝试在SlaveNode上执行任务的特定实例
示例场景
以下是某机构的用电量的数据。它包含了每月的用电量和几年的平均用电量。
JanFebMarAprMayJunJulAugSepOctNovDecAvg
19792323243242526262626252625
198026272828283031313130303029
198131323232333435363634343434
198439383939394142434039383840
198538393939394141410040393945
如果将上述数据作为输入,我们必须编写应用程序来处理它并产生结果,例如查找最大用电量年、最小用电量年,等等。如果记录数量有限,对程序员来说,这是一个简单的过程。他们将简单地编写逻辑来生成所需的输出,并将数据传递给应用程序。
但是,如果数据展示的是一个特定州的所有大型工业的从开始到现在的电力消耗。
-
当我们编写应用程序来处理这些批量数据时,程序需要大量的时间来执行。
-
当我们将数据从源头转移到网络服务器,将会消耗大量的网络流量。
为了解决这些问题,我们需要MapReduce框架。
输入数据
以下数据保存在sample.txt,作为输入数据:
示例程序
以下的程序使用MapReduce框架处理样本数据
package hadoop; import java.util.*; import java.io.IOException; import java.io.IOException; import org.apache.hadoop.fs.Path;import org.apache.hadoop.conf.*;import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*;import org.apache.hadoop.util.*; public class ProcessUnits { //Mapper class public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable ,/*Input key Type */ Text, /*Input value Type*/ Text, /*Output key Type*/ IntWritable> /*Output value Type*/ { //Map function public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String lasttoken = null; StringTokenizer s = new StringTokenizer(line,"/t"); String year = s.nextToken(); while(s.hasMoreTokens()) { lasttoken=s.nextToken(); } int avgprice = Integer.parseInt(lasttoken); output.collect(new Text(year), new IntWritable(avgprice)); } } //Reducer class public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > { //Reduce function public void reduce( Text key, Iterator <IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxavg=30; int val=Integer.MIN_VALUE; while (values.hasNext()) { if((val=values.next().get())>maxavg) { output.collect(key, new IntWritable(val)); } } } } //Main function public static void main(String args[])throws Exception { JobConf conf = new JobConf(ProcessUnits.class); conf.setJobName("max_eletricityunits"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(E_EMapper.class); conf.setCombinerClass(E_EReduce.class); conf.setReducerClass(E_EReduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
将上面的程序保存为ProcessUnits.java。 程序的编译和执行解释如下
编译和单元程序的执行过程
假设我们进入Hadoop用户的主目录中(例如,/home/Hadoop)。
按照下面的步骤编译并执行上面的程序。
步骤1
下面的命令是创建一个目录来存储已编译的java类 。
$ mkdir units
步骤2
下载Hadoop-core-1.2.1.jar,用于编译和执行MapReduce程序。访问以下链接http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1下载jar。假设下载的文件夹是/home/hadoop/。
步骤3
下面的命令用于编译ProcessUnits.java,为程序创建一个jar。
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
步骤4
下面的命令用于在HDFS中创建一个输入目录。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
步骤5
下面的命令用于复制名为sample的输入文件。txtin是HDFS的输入目录。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
步骤6
下面的命令用于验证输入目录中的文件。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
步骤7
下面的命令是用于从输入的目录获取输入文件,运行eleunit_max应用。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
等待一段时间,直到文件被执行。执行后,如下图所示,输出将包含输入细分的数目、Map任务的数量、reducer任务的数量等。
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=61
FILE: Number of bytes written=279400
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=546
HDFS: Number of bytes written=40
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=2 Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=146137
Total time spent by all reduces in occupied slots (ms)=441
Total time spent by all map tasks (ms)=14613
Total time spent by all reduce tasks (ms)=44120
Total vcore-seconds taken by all map tasks=146137
Total vcore-seconds taken by all reduce tasks=44120
Total megabyte-seconds taken by all map tasks=149644288
Total megabyte-seconds taken by all reduce tasks=45178880
Map-Reduce Framework
Map input records=5
Map output records=5
Map output bytes=45
Map output materialized bytes=67
Input split bytes=208
Combine input records=5
Combine output records=5
Reduce input groups=5
Reduce shuffle bytes=6
Reduce input records=5
Reduce output records=5
Spilled Records=10
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=948
CPU time spent (ms)=5160
Physical memory (bytes) snapshot=47749120
Virtual memory (bytes) snapshot=2899349504
Total committed heap usage (bytes)=277684224
File Output Format Counters
Bytes Written=40
步骤8
下面的命令用于验证输出文件夹中的结果文件。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
步骤9
下面的命令是用来看part- 00000文件输出。这个文件是由HDFS生成。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
下面是MapReduce程序生成的输出。
1981 34
1984 40
1985 45
步骤10
下面的命令用来从HDFS复制输出文件夹到本地文件系统,用作分析。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop
重要命令
所有Hadoop命令都由$HADOOP_HOME/bin/hadoop命令调用。 运行Hadoop脚本不加任何参数会打印所有命令的描述。
Usage : hadoop [--config confdir] COMMAND
下表罗列了可用选项及其描述
OptionsDescriptionnamenode -formatFormats the DFS filesystem.secondarynamenodeRuns the DFS secondary namenode.namenodeRuns the DFS namenode.datanodeRuns a DFS datanode.dfsadminRuns a DFS admin client.mradminRuns a Map-Reduce admin client.fsckRuns a DFS filesystem checking utility.fsRuns a generic filesystem user client.balancerRuns a cluster balancing utility.oivApplies the offline fsimage viewer to an fsimage.fetchdtFetches a delegation token from the NameNode.jobtrackerRuns the MapReduce job Tracker node.pipesRuns a Pipes job.tasktrackerRuns a MapReduce task Tracker node.historyserverRuns job history servers as a standalone daemon.jobManipulates the MapReduce jobs.queueGets information regarding JobQueues.versionPrints the version.jar <jar>Runs a jar file.distcp <srcurl> <desturl>Copies file or directories recursively.distcp2 <srcurl> <desturl>DistCp version 2.archive -archiveName NAME -pCreates a hadoop archive.<parent path> <src>* <dest>classpathPrints the class path needed to get the Hadoop jar and the required libraries.daemonlogGet/Set the log level for each daemon
如何与mapreduce任务交互
以下是Hadoop任务中可用的通用选项。
GENERIC_OPTIONSDescription-submit <job-file>Submits the job.-status <job-id>Prints the map and reduce completion percentage and all job counters.-counter <job-id> <group-name> <countername>Prints the counter value.-kill <job-id>Kills the job.-events <job-id> <fromevent-#> <#-of-events>Prints the events' details received by jobtracker for the given range.-history [all] <jobOutputDir> - history < jobOutputDir>Prints job details, failed and killed tip details. More details about the job such as successful tasks and task attempts made for each task can be viewed by specifying the [all] option.-list[all]Displays all jobs. -list displays only jobs which are yet to complete.-kill-task <task-id>Kills the task. Killed tasks are NOT counted against failed attempts.-fail-task <task-id>Fails the task. Failed tasks are counted against failed attempts.-set-priority <job-id> <priority>Changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
查看job状态
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
查看job输出历史记录
$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> e.g.$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output
终止job
$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004
java达人
ID:java_daren
大数据时代,Hadoop培训、大数据培训、培训班,就选光环大数据!
大数据培训、人工智能培训、Python培训、大数据培训机构、大数据培训班、数据分析培训、大数据可视化培训,就选光环大数据!光环大数据,聘请专业的大数据领域知名讲师,确保教学的整体质量与教学水准。讲师团及时掌握时代潮流技术,将前沿技能融入教学中,确保学生所学知识顺应时代所需。通过深入浅出、通俗易懂的教学方式,指导学生更快的掌握技能知识,成就上万个高薪就业学子。 更多问题咨询,欢迎点击------>>>>在线客服!