|   | 1 |  = 將 wordcount2 改成 0.20 版 = | 
                  
                          |   | 2 |  | 
                  
                          |   | 3 |  == 前言 == | 
                  
                          |   | 4 | 按照hadoop 0.20 官方網頁的 wordcount v2 . | 
                  
                          |   | 5 | [[http://hadoop.apache.org/common/docs/r0.20.1/mapred_tutorial.html#Example%3A+WordCount+v1.0 ]] | 
                  
                          |   | 6 |  | 
                  
                          |   | 7 | 最需要給的地方是 ''' " extends MapReduceBase implements Mapper" ''' 原因是在hadoop 0.20時,mapreducebase 此class已經被deprecated, | 
                  
                          |   | 8 |  | 
                  
                          |   | 9 | 因此應改寫如 ''' " extends Mapper" ''' | 
                  
                          |   | 10 |  | 
                  
                          |   | 11 | 然而最主要不能改變的原因是,程式中很重要的功能 [http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/filecache/DistributedCache.html DistributedCache ]  以及 -Dwordcount.skip.patterns 等功能寫於 configure() 函數內。 此configure() 繼承自 MapReduceBase, | 
                  
                          |   | 12 |  | 
                  
                          |   | 13 | 因此若整個程式改成hadoop 0.20 的 " extends Mapper" ''',則有些功能將不知是否能使用 | 
                  
                          |   | 14 |  | 
                  
                          |   | 15 |  | 
                  
                          |   | 16 |  == 原始程式碼 == | 
                  
                          |   | 17 |  | 
                  
                          |   | 18 | {{{ | 
                  
                          |   | 19 | #!java | 
                  
                          |   | 20 | public class WordCountV2 extends Configured implements Tool { | 
                  
                          |   | 21 |  | 
                  
                          |   | 22 |         public static class Map extends MapReduceBase implements | 
                  
                          |   | 23 |                         Mapper<LongWritable, Text, Text, IntWritable> { | 
                  
                          |   | 24 |  | 
                  
                          |   | 25 |                 static enum Counters { | 
                  
                          |   | 26 |                         INPUT_WORDS | 
                  
                          |   | 27 |                 } | 
                  
                          |   | 28 |  | 
                  
                          |   | 29 |                 private final static IntWritable one = new IntWritable(1); | 
                  
                          |   | 30 |                 private Text word = new Text(); | 
                  
                          |   | 31 |  | 
                  
                          |   | 32 |                 private boolean caseSensitive = true; | 
                  
                          |   | 33 |                 private Set<String> patternsToSkip = new HashSet<String>(); | 
                  
                          |   | 34 |  | 
                  
                          |   | 35 |                 private long numRecords = 0; | 
                  
                          |   | 36 |                 private String inputFile; | 
                  
                          |   | 37 |  | 
                  
                          |   | 38 |                 public void configure(JobConf job) { | 
                  
                          |   | 39 |                         caseSensitive = job.getBoolean("wordcount.case.sensitive", true); | 
                  
                          |   | 40 |                         inputFile = job.get("map.input.file"); | 
                  
                          |   | 41 |  | 
                  
                          |   | 42 |                         if (job.getBoolean("wordcount.skip.patterns", false)) { | 
                  
                          |   | 43 |                                 Path[] patternsFiles = new Path[0]; | 
                  
                          |   | 44 |                                 try { | 
                  
                          |   | 45 |                                         patternsFiles = DistributedCache.getLocalCacheFiles(job); | 
                  
                          |   | 46 |                                 } catch (IOException ioe) { | 
                  
                          |   | 47 |                                         System.err | 
                  
                          |   | 48 |                                                         .println("Caught exception while getting cached files: " | 
                  
                          |   | 49 |                                                                         + StringUtils.stringifyException(ioe)); | 
                  
                          |   | 50 |                                 } | 
                  
                          |   | 51 |                                 for (Path patternsFile : patternsFiles) { | 
                  
                          |   | 52 |                                         parseSkipFile(patternsFile); | 
                  
                          |   | 53 |                                 } | 
                  
                          |   | 54 |                         } | 
                  
                          |   | 55 |                 } | 
                  
                          |   | 56 |  | 
                  
                          |   | 57 |                 private void parseSkipFile(Path patternsFile) { | 
                  
                          |   | 58 |                         try { | 
                  
                          |   | 59 |                                 BufferedReader fis = new BufferedReader(new FileReader( | 
                  
                          |   | 60 |                                                 patternsFile.toString())); | 
                  
                          |   | 61 |                                 String pattern = null; | 
                  
                          |   | 62 |                                 while ((pattern = fis.readLine()) != null) { | 
                  
                          |   | 63 |                                         patternsToSkip.add(pattern); | 
                  
                          |   | 64 |                                 } | 
                  
                          |   | 65 |                         } catch (IOException ioe) { | 
                  
                          |   | 66 |                                 System.err | 
                  
                          |   | 67 |                                                 .println("Caught exception while parsing the cached file '" | 
                  
                          |   | 68 |                                                                 + patternsFile | 
                  
                          |   | 69 |                                                                 + "' : " | 
                  
                          |   | 70 |                                                                 + StringUtils.stringifyException(ioe)); | 
                  
                          |   | 71 |                         } | 
                  
                          |   | 72 |                 } | 
                  
                          |   | 73 |  | 
                  
                          |   | 74 |                 public void map(LongWritable key, Text value, | 
                  
                          |   | 75 |                                 OutputCollector<Text, IntWritable> output, Reporter reporter) | 
                  
                          |   | 76 |                                 throws IOException { | 
                  
                          |   | 77 |                         String line = (caseSensitive) ? value.toString() : value.toString() | 
                  
                          |   | 78 |                                         .toLowerCase(); | 
                  
                          |   | 79 |  | 
                  
                          |   | 80 |                         for (String pattern : patternsToSkip) { | 
                  
                          |   | 81 |                                 line = line.replaceAll(pattern, ""); | 
                  
                          |   | 82 |                         } | 
                  
                          |   | 83 |  | 
                  
                          |   | 84 |                         StringTokenizer tokenizer = new StringTokenizer(line); | 
                  
                          |   | 85 |                         while (tokenizer.hasMoreTokens()) { | 
                  
                          |   | 86 |                                 word.set(tokenizer.nextToken()); | 
                  
                          |   | 87 |                                 output.collect(word, one); | 
                  
                          |   | 88 |                                 reporter.incrCounter(Counters.INPUT_WORDS, 1); | 
                  
                          |   | 89 |                         } | 
                  
                          |   | 90 |  | 
                  
                          |   | 91 |                         if ((++numRecords % 100) == 0) { | 
                  
                          |   | 92 |                                 reporter.setStatus("Finished processing " + numRecords | 
                  
                          |   | 93 |                                                 + " records " + "from the input file: " + inputFile); | 
                  
                          |   | 94 |                         } | 
                  
                          |   | 95 |                 } | 
                  
                          |   | 96 |         } | 
                  
                          |   | 97 |  | 
                  
                          |   | 98 |         public static class Reduce extends MapReduceBase implements | 
                  
                          |   | 99 |                         Reducer<Text, IntWritable, Text, IntWritable> { | 
                  
                          |   | 100 |                 public void reduce(Text key, Iterator<IntWritable> values, | 
                  
                          |   | 101 |                                 OutputCollector<Text, IntWritable> output, Reporter reporter) | 
                  
                          |   | 102 |                                 throws IOException { | 
                  
                          |   | 103 |                         int sum = 0; | 
                  
                          |   | 104 |                         while (values.hasNext()) { | 
                  
                          |   | 105 |                                 sum += values.next().get(); | 
                  
                          |   | 106 |                         } | 
                  
                          |   | 107 |                         output.collect(key, new IntWritable(sum)); | 
                  
                          |   | 108 |                 } | 
                  
                          |   | 109 |         } | 
                  
                          |   | 110 |  | 
                  
                          |   | 111 |         public int run(String[] args) throws Exception { | 
                  
                          |   | 112 |  | 
                  
                          |   | 113 |  | 
                  
                          |   | 114 |                 JobConf conf = new JobConf(getConf(), WordCount.class); | 
                  
                          |   | 115 |                 conf.setJobName("wordcount"); | 
                  
                          |   | 116 |  | 
                  
                          |   | 117 |                 conf.setOutputKeyClass(Text.class); | 
                  
                          |   | 118 |                 conf.setOutputValueClass(IntWritable.class); | 
                  
                          |   | 119 |  | 
                  
                          |   | 120 |                 conf.setMapperClass(Map.class); | 
                  
                          |   | 121 |                 conf.setCombinerClass(Reduce.class); | 
                  
                          |   | 122 |                 conf.setReducerClass(Reduce.class); | 
                  
                          |   | 123 |  | 
                  
                          |   | 124 |                 conf.setInputFormat(TextInputFormat.class); | 
                  
                          |   | 125 |                 conf.setOutputFormat(TextOutputFormat.class); | 
                  
                          |   | 126 |  | 
                  
                          |   | 127 |                 List<String> other_args = new ArrayList<String>(); | 
                  
                          |   | 128 |                 for (int i = 0; i < args.length; ++i) { | 
                  
                          |   | 129 |                         if ("-skip".equals(args[i])) { | 
                  
                          |   | 130 |                                 DistributedCache | 
                  
                          |   | 131 |                                                 .addCacheFile(new Path(args[++i]).toUri(), conf); | 
                  
                          |   | 132 |                                 conf.setBoolean("wordcount.skip.patterns", true); | 
                  
                          |   | 133 |                         } else { | 
                  
                          |   | 134 |                                 other_args.add(args[i]); | 
                  
                          |   | 135 |                         } | 
                  
                          |   | 136 |                 } | 
                  
                          |   | 137 |  | 
                  
                          |   | 138 |                 FileInputFormat.setInputPaths(conf, new Path(other_args.get(0))); | 
                  
                          |   | 139 |                 FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1))); | 
                  
                          |   | 140 |                 CheckAndDelete.checkAndDelete(other_args.get(1), conf); | 
                  
                          |   | 141 |                 JobClient.runJob(conf); | 
                  
                          |   | 142 |                 return 0; | 
                  
                          |   | 143 |         } | 
                  
                          |   | 144 |  | 
                  
                          |   | 145 |         public static void main(String[] args) throws Exception { | 
                  
                          |   | 146 |                 String[] argv = { "-Dwordcount.case.sensitive=false", | 
                  
                          |   | 147 |                                 "/user/waue/input", "/user/waue/output-wc2", "-skip", | 
                  
                          |   | 148 |                                 "/user/waue/patterns/patterns.txt" }; | 
                  
                          |   | 149 |                 args = argv; | 
                  
                          |   | 150 |                 int res = ToolRunner.run(new Configuration(), new WordCountV2(), args); | 
                  
                          |   | 151 |                 System.exit(res); | 
                  
                          |   | 152 |         } | 
                  
                          |   | 153 | } | 
                  
                          |   | 154 | }}} | 
                  
                          |   | 155 |  | 
                  
                          |   | 156 |  == 原始程式碼 == |