Changes between Initial Version and Version 1 of wc2


Ignore:
Timestamp:
Jul 31, 2011, 7:20:16 AM (13 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • wc2

    v1 v1  
     1
     2{{{
     3#!java
     4package waue;
     5
     6import java.io.BufferedReader;
     7import java.io.FileReader;
     8import java.io.IOException;
     9import java.util.ArrayList;
     10import java.util.HashSet;
     11import java.util.Iterator;
     12import java.util.List;
     13import java.util.Set;
     14import java.util.StringTokenizer;
     15
     16import org.apache.hadoop.conf.Configuration;
     17import org.apache.hadoop.conf.Configured;
     18import org.apache.hadoop.filecache.DistributedCache;
     19import org.apache.hadoop.fs.Path;
     20import org.apache.hadoop.io.IntWritable;
     21import org.apache.hadoop.io.LongWritable;
     22import org.apache.hadoop.io.Text;
     23import org.apache.hadoop.mapred.OutputCollector;
     24import org.apache.hadoop.mapred.Reporter;
     25import org.apache.hadoop.mapreduce.Job;
     26import org.apache.hadoop.mapreduce.Mapper;
     27import org.apache.hadoop.mapreduce.Reducer;
     28import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     29import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     30import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     31import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     32import org.apache.hadoop.util.GenericOptionsParser;
     33import org.apache.hadoop.util.StringUtils;
     34import org.apache.hadoop.util.Tool;
     35import org.apache.hadoop.util.ToolRunner;
     36
     37public class WordCountV2 extends Configured implements Tool {
     38
     39        public static class Map extends
     40                        Mapper<LongWritable, Text, Text, IntWritable> {
     41
     42                static enum Counters {
     43                        INPUT_WORDS
     44                }
     45
     46                private final static IntWritable one = new IntWritable(1);
     47                private Text word = new Text();
     48
     49                private boolean caseSensitive = true;
     50                private Set<String> patternsToSkip = new HashSet<String>();
     51
     52                private long numRecords = 0;
     53                private String inputFile;
     54
     55                public void setup(Mapper.Context context) {
     56                        Configuration conf = context.getConfiguration();
     57                        caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
     58                        inputFile = conf.get("map.input.file");
     59
     60                        if (conf.getBoolean("wordcount.skip.patterns", false)) {
     61                                Path[] patternsFiles = new Path[0];
     62                                try {
     63                                        patternsFiles = DistributedCache.getLocalCacheFiles(conf);
     64                                } catch (IOException ioe) {
     65                                        System.err
     66                                                        .println("Caught exception while getting cached files: "
     67                                                                        + StringUtils.stringifyException(ioe));
     68                                }
     69                                for (Path patternsFile : patternsFiles) {
     70                                        parseSkipFile(patternsFile);
     71                                }
     72                        }
     73                }
     74
     75                private void parseSkipFile(Path patternsFile) {
     76                        try {
     77                                BufferedReader fis = new BufferedReader(new FileReader(
     78                                                patternsFile.toString()));
     79                                String pattern = null;
     80                                while ((pattern = fis.readLine()) != null) {
     81                                        patternsToSkip.add(pattern);
     82                                }
     83                        } catch (IOException ioe) {
     84                                System.err
     85                                                .println("Caught exception while parsing the cached file '"
     86                                                                + patternsFile
     87                                                                + "' : "
     88                                                                + StringUtils.stringifyException(ioe));
     89                        }
     90                }
     91
     92                public void map(LongWritable key, Text value,
     93                                OutputCollector<Text, IntWritable> output, Reporter reporter)
     94                                throws IOException {
     95                        String line = (caseSensitive) ? value.toString() : value.toString()
     96                                        .toLowerCase();
     97
     98                        for (String pattern : patternsToSkip) {
     99                                line = line.replaceAll(pattern, "");
     100                        }
     101
     102                        StringTokenizer tokenizer = new StringTokenizer(line);
     103                        while (tokenizer.hasMoreTokens()) {
     104                                word.set(tokenizer.nextToken());
     105                                output.collect(word, one);
     106                                reporter.incrCounter(Counters.INPUT_WORDS, 1);
     107                        }
     108
     109                        if ((++numRecords % 100) == 0) {
     110                                reporter.setStatus("Finished processing " + numRecords
     111                                                + " records " + "from the input file: " + inputFile);
     112                        }
     113                }
     114        }
     115
     116        public static class Reduce extends
     117                        Reducer<Text, IntWritable, Text, IntWritable> {
     118                public void reduce(Text key, Iterator<IntWritable> values,
     119                                OutputCollector<Text, IntWritable> output, Reporter reporter)
     120                                throws IOException {
     121                        int sum = 0;
     122                        while (values.hasNext()) {
     123                                sum += values.next().get();
     124                        }
     125                        output.collect(key, new IntWritable(sum));
     126                }
     127        }
     128
     129        public int run(String[] args) throws Exception {
     130
     131                // JobConf conf = new JobConf(getConf(), WordCount.class);
     132                Configuration conf = new Configuration();
     133                Job job = new Job();
     134                job.setJobName("wordcount");
     135                String[] otherArgs = new GenericOptionsParser(conf, args)
     136                                .getRemainingArgs();
     137                if (otherArgs.length < 2) {
     138                        System.out
     139                                        .println("WordCountV2 [-Dwordcount.case.sensitive=<false|true>] \\ ");
     140                        System.out
     141                                        .println("            <inDir> <outDir> [-skip Pattern_file]");
     142                        return 0;
     143                }
     144                job.setOutputKeyClass(Text.class);
     145                job.setOutputValueClass(IntWritable.class);
     146
     147                job.setMapperClass(Map.class);
     148                job.setCombinerClass(Reduce.class);
     149                job.setReducerClass(Reduce.class);
     150
     151                job.setInputFormatClass(TextInputFormat.class);
     152                job.setOutputFormatClass(TextOutputFormat.class);
     153
     154                List<String> other_args = new ArrayList<String>();
     155                for (int i = 0; i < args.length; ++i) {
     156                        if ("-skip".equals(args[i])) {
     157                                DistributedCache
     158                                                .addCacheFile(new Path(args[++i]).toUri(), conf);
     159                                conf.setBoolean("wordcount.skip.patterns", true);
     160                        } else {
     161                                other_args.add(args[i]);
     162                        }
     163                }
     164
     165                FileInputFormat.setInputPaths(job, new Path(other_args.get(0)));
     166                FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
     167                job.waitForCompletion(true);
     168                return 0;
     169        }
     170
     171        public static void main(String[] args) throws Exception {
     172                // String[] argv = { "-Dwordcount.case.sensitive=false",
     173                // "/user/hadoop/input",
     174                // "/user/hadoop/output-wc2", "-skip",
     175                // "/user/hadoop/patterns/patterns.txt" };
     176                // args = argv;
     177
     178                int res = ToolRunner.run(new Configuration(), new WordCountV2(), args);
     179                System.exit(res);
     180        }
     181}
     182}}}