注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

还东国的博客

行之苟有恒,久久自芬芳

 
 
 

日志

 
 

云计算和云存储之十一Hadoop的MapReduce及倒序例程  

2016-09-25 13:47:17|  分类: |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

t云计算和云存储之十一Hadoop的MapReduce及倒序例程


 


继上周在完成初步的HDFS的介绍后,今天介绍一下Hadoop的核心内容MapReduce,那么MapReduce的编程模型是什么样子的呢?


大家都知道在计算机学里有一个非常有名的算法叫“分治法”,它也用得这种方法。说简单一些,就是把复杂的东西拆开成一个个简单的东西,如果还是复杂就再拆,直到拆成非常简单的东西。那么


再让人来做这些东西就不复杂了。


MapReduce抽象出来Map 和reduce。前者负责分解任务,后者负责将分解后多任务处理的结果汇总起来。其它的内部实现,都由HADOOP自己来实现,大大减轻了开发人员的工作强度,提高了开发和工作


的效率。


Map负责将输入数据分割成段(Split)并创建一个Map任务,并创建相应的键值对做为输入<K1,V1>,接着再按一定的要求排序,再产生对应不同的Reducer任务。


而Reducer则为每个输入进行处理得到相应的键值对并输出到HDFS上。这里有一个小问题,Reducer的数量如何确定?一般来说,它的默认值是1,但是在实际情况中不可能将大批量的任务都压到一个


Reducer上,那样程序根本就是什么大数据了。


HADOOP提供了job.setNumReduceTasks()这个方法来重新设置这个值。当然,也可以在Hadoop的配置文件里修改这个值。


知道了它的原理,将这个原理应用到集群上就非常容易理解了。MapReduce任务由一个JobTracker和多个TaskTracker两类节点来控制完成。JobTracker负责将Mappers和Reducers分配给空闲的


TaskTracker,让其把这些任务执行成功。当然,JobTracker也负责监控整个工作状态,一旦有问题,就及时处理。不过,要是它自己挂了,这可就不好说了。


MapReduce会尽量保证那些存储了数据节点的机器来执行计算任务,免得浪费过多的时间在带宽上。因此,split通常情况下应该小于或等于HDFS数据块的大小(默认64M),从而保证数据不会发生跨机


器存储。以方便本地计算。


然后有一个Shuffle过程,MapReduce会将Mapper的输出结果按KEY分成R份,一般来说哈希的效率是最高的。当然如何更高效率的使用哈希并保证相应的扩展性,这也是一个重要的问题,但这个不在这


里讨,有兴趣看“高并发网站”相关书籍。


下来就是对输出结果的合并(Combine)。也就是说把蹭结果相同的KEY的值多组<key,value>合并成一对。记住,Combine过程是Mapper的一 部分,在Map函数后执行。Hadoop并不保证对一个Mapper输


出执行多少次Combine过程,即是说开发人员必须保证无论合并多少次,得到的结果都保持一致。


再下来就是上面说的写到本地磁盘中去,通知JobTracker中间结果的位置,再由它告知Reducer向哪个DataNode上取。上面提到过,它们是按KEY分成区域的。Reducer需要向多个Mapper节点取得其KEY


值区间的中间结果,然后执行reduce函数形成最终的结果。


如果还需要将R个结果统一,就利用任务管道,将其合并即可。


这里面需要注意以下几个类:


InputFormat类,Mapper类,Combiner类,Partioner类和Reducer类、OutputFormat类。当然,其它一些重要的方法和属性也是相当重要的需要大家在开发的时候儿认真对待。


然后这就又引出一个新问题,到底多少个Reducer为最好呢?正如线程的启动需要是CPU*2,一般来说。那么这个呢?


两个原则,一个是小于总的Slots(这个不展开了,大家看书),另外一个是Reducer数量要少于Mappers。


看下面的倒排索引


public class InvertedIndex {


 


         public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text>{


                   private Text keyInfo = new Text();  //存储单词和URI的组合


                   private Text valueInfo = new Text();//存储词频


                   private FileSplit split;            //存储Split对象


                  


                   public void map(Object key, Text value, Context context) throws IOException, InterruptedException{


                            split = (FileSplit)context.getInputSplit();//获得<key,value>对所属的FileSplit对象


                            StringTokenizer itr = new StringTokenizer(value.toString());


                            while(itr.hasMoreTokens()){


                                     //key值由单词和URI组成,如“MapReduce:1.txt”


                                     //keyInfo.set(itr.nextToken() + ":" + split.getPath().toString());


                                     keyInfo.set(itr.nextToken() + ":" + split.getPath().getName());


                                    


                                     //The file containing this split's data.


                                     //System.out.println(split.getPath().toString());//得到的是包含该split的文件路径


                                     //System.out.println(split.getPath().getName());//得到的是包含该split的文件名


                                    


                                     //词频初始为1


                                     valueInfo.set("1");


                                     context.write(keyInfo, valueInfo);


                                     System.out.println("key=" + keyInfo + "\t" + "value=" + valueInfo);


                            }


                   }


         }


        


         public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>{


                   private Text info = new Text();


                  


                   public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{


                            int sum = 0;// 统计词频


                            for(Text value:values){


                                     sum += Integer.parseInt(value.toString());


                            }


                           


                            int splitIndex = key.toString().indexOf(":");


                            //重新设置value值由RUI和词频组成


                            info.set(key.toString().substring(splitIndex + 1) + ":" + sum);


                            //重新设置key值为单词


                            key.set(key.toString().substring(0, splitIndex));


                            context.write(key, info);


                   }


         }


        


         public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>{


                   private Text result = new Text();


                  


                   public void reduce(Text key, Iterable<Text>values, Context context) throws IOException, InterruptedException{


                            //生成文档列表


                            String fileList = new String();


                            for(Text value:values){


                                     fileList += value.toString() + ";";


                            }


                            result.set(fileList);


                            context.write(key, result);


                   }


         }


        


         public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {


                   Configuration conf = new Configuration();


                   String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();


                   if(otherArgs.length != 2){


                            System.out.println("Usage:invertedIndex<in><out>");


                            System.exit(2);


                   }


                  


                    /*判断输出目录文件是否存在,如果存在则先删除它*/


                   FileSystem hdfs = FileSystem.get(conf);


                   Path path = new Path(otherArgs[1]);


                   if(hdfs.exists(path))


                            hdfs.delete(path, true);//不递归删除


                  


                   Job job = new Job(conf, "invertedIndex");


                   job.setJarByClass(InvertedIndex.class);


                  


                   job.setMapperClass(InvertedIndexMapper.class);


                   job.setMapOutputKeyClass(Text.class);


                   job.setMapOutputValueClass(Text.class);


                  


                   job.setCombinerClass(InvertedIndexCombiner.class);


                   job.setReducerClass(InvertedIndexReducer.class);


                  


                   job.setOutputKeyClass(Text.class);


                   job.setMapOutputValueClass(Text.class);


                  


                   FileInputFormat.addInputPath(job, new Path(otherArgs[0]));


                   FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));


                  


                   System.exit(job.waitForCompletion(true)?0:1);


         }


}


这里第一次少了“System.exit(job.waitForCompletion(true)?0:1);”,结果报一个简单的错误“息: Initializing JVM Metrics with processName=JobTracker, sessionId=”就啥信息都没有了。


然后其它几个错误都是缺少相关的JAR包如下面:


hadoop  java.lang.NoClassDefFoundError: org/apache/commons/httpclient/HttpMethod  添加HTTPCLINET3.1JAR包就OK了。


另外还少了CLI和日志包。


还犯了一个超低级错误:路径写错了写了int


在解决问题时,网上有好多的版本解决方法,它们是不同的,一定要加以区分。


比如0.2.0和1.1.20HADOOP和使用就会有不同的解决方法。


然后就是成功运行代码:


运行作业所得到的输出提供了一些有用的信息。无法找到作业JAR 文件的警告信息(No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).)


是意料之中的,因为我们没有使用JAR文件在本地模式下运行。在集群上运行时,将不会看到这个警告。例如,我们可以看到,这个作业有指定的标识,即job_local_0001,并且执行了一个map 任务和


一个reduce 任务(使用attempt_local_0001_m_000000_0和 attempt_local_0001_r_000000_0两个ID)。在调试MapReduce作业时,知道作业和任务的ID 是非常有用的。


输出的最后一部分,以Counters为标题,显示在Hadoop 上运行的每个作业的一些统计信息。这些信息对检查这些大量的数据是否按照预期进行处理非常有用。例如,我们查看系统输出的记录信息可知


:5 个map 输入产生了5 个map 的输出,然后5 个reduce 输入产生2个reduce 输出。


输出数据写入output目录,其中每个reducer都有一个输出文件。我们的例子中包含一个 reducer,所以我们只能找到一个文件,名为part-00000:


fjf      test2.txt:1;test1.txt:1;


hadoop     test2.txt:1;


hello test2.txt:2;test1.txt:2;


inverted   test1.txt:1;test2.txt:1;


is      test1.txt:1;


world        test1.txt:2;


不知道什么原因,这次的Eclipse生成后,竟然没有运行选项,只好自己全新的配置,然后在“运行配置”里的“Java Application”里NEW 了一个新的。配置好就可以了。


这一上午,全把时间浪费“2016-9-24 21:09:13 org.apache.hadoop.metrics.jvm.JvmMetrics init


信息: Initializing JVM Metrics with processName=JobTracker, sessionId=


2016-9-24 21:09:13 org.apache.hadoop.mapred.JobClient configureCommandLineOptions


警告: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).”,亏得最后找到了上面的说明。

  评论这张
 
阅读(127)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017