Changes between Initial Version and Version 1 of waue/wordCountII


Ignore:
Timestamp:
Dec 17, 2011, 12:40:30 PM (12 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/wordCountII

    v1 v1  
     1
     2{{{
     3#!java
     4
     5import java.io.BufferedReader;
     6import java.io.FileReader;
     7import java.io.IOException;
     8import java.util.HashSet;
     9import java.util.Set;
     10import java.util.StringTokenizer;
     11
     12import org.apache.hadoop.conf.Configuration;
     13import org.apache.hadoop.filecache.DistributedCache;
     14import org.apache.hadoop.fs.Path;
     15import org.apache.hadoop.io.IntWritable;
     16import org.apache.hadoop.io.LongWritable;
     17import org.apache.hadoop.io.Text;
     18import org.apache.hadoop.mapreduce.Job;
     19import org.apache.hadoop.mapreduce.Mapper;
     20import org.apache.hadoop.mapreduce.Reducer;
     21import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     22import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     23import org.apache.hadoop.util.StringUtils;
     24
     25public class WordCountV20 {
     26
     27  public static class Map extends
     28      Mapper<LongWritable, Text, Text, IntWritable> {
     29    static enum Counters {
     30      INPUT_WORDS
     31    }
     32
     33    private final static IntWritable one = new IntWritable(1);
     34    private Text word = new Text();
     35
     36    private boolean caseSensitive = true;
     37    private Set<String> patternsToSkip = new HashSet<String>();
     38
     39    private void parseSkipFile(Path patternsFile) {
     40      try {
     41        BufferedReader fis = new BufferedReader(new FileReader(
     42            patternsFile.toString()));
     43        String pattern = null;
     44        while ((pattern = fis.readLine()) != null) {
     45          patternsToSkip.add(pattern);
     46        }
     47      } catch (IOException ioe) {
     48        System.err
     49            .println("Caught exception while parsing the cached file '"
     50                + patternsFile
     51                + "' : "
     52                + StringUtils.stringifyException(ioe));
     53      }
     54    }
     55
     56    @Override
     57    public void setup(Context context) {
     58      Configuration conf = context.getConfiguration();
     59      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
     60
     61      if (conf.getBoolean("wordcount.skip.patterns", false)) {
     62        Path[] patternsFiles = new Path[0];
     63        try {
     64          patternsFiles = DistributedCache.getLocalCacheFiles(conf);
     65        } catch (IOException ioe) {
     66          System.err
     67              .println("Caught exception while getting cached files: "
     68                  + StringUtils.stringifyException(ioe));
     69        }
     70        for (Path patternsFile : patternsFiles) {
     71          parseSkipFile(patternsFile);
     72        }
     73      }
     74    }
     75
     76    @Override
     77    public void map(LongWritable key, Text value, Context context)
     78        throws IOException, InterruptedException {
     79
     80      String line = (caseSensitive) ? value.toString() : value.toString()
     81          .toLowerCase();
     82
     83      for (String pattern : patternsToSkip) {
     84        line = line.replaceAll(pattern, "");
     85      }
     86
     87      StringTokenizer tokenizer = new StringTokenizer(line);
     88      while (tokenizer.hasMoreTokens()) {
     89        word.set(tokenizer.nextToken());
     90        context.write(word, one);
     91
     92      }
     93
     94    }
     95  }
     96
     97  public static class Reduce extends
     98      Reducer<Text, IntWritable, Text, IntWritable> {
     99    private IntWritable result = new IntWritable();
     100
     101    @Override
     102    public void reduce(Text key, Iterable<IntWritable> values,
     103        Context context) throws IOException, InterruptedException {
     104      int sum = 0;
     105      for (IntWritable val : values) {
     106        sum += val.get();
     107      }
     108      result.set(sum);
     109      context.write(key, result);
     110    }
     111  }
     112
     113  public static void main(String[] args) throws Exception {
     114    String[] argv = { "/home/nchc/input", "/home/nchc/output-wc2", "-c",  "-skip",
     115        "/home/nchc/patterns/patterns.txt" };
     116
     117    args = argv;
     118    Configuration conf = new Configuration();
     119    conf.set("mapred.job.tracker", "local"); // for single
     120    conf.set("fs.default.name", "file:///"); // for single
     121
     122    if (args.length < 2) {
     123      System.err
     124          .println("Usage: hadoop jar WordCount.jar <input> <output> [-c] [-skip <path>]");
     125      System.exit(2);
     126    }
     127
     128    for (int i = 0; i < args.length; ++i) {
     129      if ("-skip".equals(args[i])) {
     130        DistributedCache
     131            .addCacheFile(new Path(args[++i]).toUri(), conf);
     132        conf.setBoolean("wordcount.skip.patterns", true);
     133      }
     134      if ("-c".equals(args[i])){
     135        conf.setBoolean("wordcount.case.sensitive", false);
     136      }
     137    }
     138
     139    CheckAndDelete.checkAndDelete(args[1], conf);
     140    Job job = new Job(conf, "Word Count");
     141    job.setJarByClass(WordCountV20.class);
     142    job.setMapperClass(Map.class);
     143    job.setCombinerClass(Reduce.class);
     144    job.setReducerClass(Reduce.class);
     145    job.setOutputKeyClass(Text.class);
     146    job.setOutputValueClass(IntWritable.class);
     147    FileInputFormat.addInputPath(job, new Path(args[0]));
     148    FileOutputFormat.setOutputPath(job, new Path(args[1]));
     149
     150    System.exit(job.waitForCompletion(true) ? 0 : 1);
     151  }
     152}
     153
     154
     155}}}