Changes between Initial Version and Version 1 of waue/2010/0417


Ignore:
Timestamp:
Apr 16, 2010, 5:41:14 PM (14 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2010/0417

    v1 v1  
     1{{{
     2#!java
     3import java.io.BufferedReader;
     4import java.io.FileReader;
     5import java.io.IOException;
     6import java.util.ArrayList;
     7import java.util.HashSet;
     8import java.util.Iterator;
     9import java.util.List;
     10import java.util.Set;
     11import java.util.StringTokenizer;
     12
     13import org.apache.hadoop.conf.Configuration;
     14import org.apache.hadoop.conf.Configured;
     15import org.apache.hadoop.filecache.DistributedCache;
     16import org.apache.hadoop.fs.Path;
     17import org.apache.hadoop.io.IntWritable;
     18import org.apache.hadoop.io.LongWritable;
     19import org.apache.hadoop.io.Text;
     20import org.apache.hadoop.mapred.OutputCollector;
     21import org.apache.hadoop.mapred.Reporter;
     22import org.apache.hadoop.mapreduce.Job;
     23import org.apache.hadoop.mapreduce.Mapper;
     24import org.apache.hadoop.mapreduce.Reducer;
     25import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     26import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     27import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     28import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     29import org.apache.hadoop.util.StringUtils;
     30import org.apache.hadoop.util.Tool;
     31import org.apache.hadoop.util.ToolRunner;
     32
     33public class WordCount2for020 extends Configured implements Tool {
     34
     35        public static class Map extends
     36                        Mapper<LongWritable, Text, Text, IntWritable> {
     37
     38                static enum Counters {
     39                        INPUT_WORDS
     40                }
     41
     42                private final static IntWritable one = new IntWritable(1);
     43                private Text word = new Text();
     44
     45                private boolean caseSensitive = true;
     46                private Set<String> patternsToSkip = new HashSet<String>();
     47
     48                private long numRecords = 0;
     49                private String inputFile;
     50
     51                public void setup(Context context) {
     52                        Configuration conf = context.getConfiguration();
     53                        caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
     54                        inputFile = conf.get("map.input.file");
     55
     56                        if (conf.getBoolean("wordcount.skip.patterns", false)) {
     57                                Path[] patternsFiles = new Path[0];
     58                                try {
     59                                        patternsFiles = DistributedCache.getLocalCacheFiles(conf);
     60                                } catch (IOException ioe) {
     61                                        System.err
     62                                                        .println("Caught exception while getting cached files: "
     63                                                                        + StringUtils.stringifyException(ioe));
     64                                }
     65                                for (Path patternsFile : patternsFiles) {
     66                                        parseSkipFile(patternsFile);
     67                                }
     68                        }
     69                }
     70
     71                private void parseSkipFile(Path patternsFile) {
     72                        try {
     73                                BufferedReader fis = new BufferedReader(new FileReader(
     74                                                patternsFile.toString()));
     75                                String pattern = null;
     76                                while ((pattern = fis.readLine()) != null) {
     77                                        patternsToSkip.add(pattern);
     78                                }
     79                        } catch (IOException ioe) {
     80                                System.err
     81                                                .println("Caught exception while parsing the cached file '"
     82                                                                + patternsFile
     83                                                                + "' : "
     84                                                                + StringUtils.stringifyException(ioe));
     85                        }
     86                }
     87
     88                public void map(LongWritable key, Text value,
     89                                OutputCollector<Text, IntWritable> output, Reporter reporter)
     90                                throws IOException {
     91                        String line = (caseSensitive) ? value.toString() : value.toString()
     92                                        .toLowerCase();
     93
     94                        for (String pattern : patternsToSkip) {
     95                                line = line.replaceAll(pattern, "");
     96                        }
     97
     98                        StringTokenizer tokenizer = new StringTokenizer(line);
     99                        while (tokenizer.hasMoreTokens()) {
     100                                word.set(tokenizer.nextToken());
     101                                output.collect(word, one);
     102                                reporter.incrCounter(Counters.INPUT_WORDS, 1);
     103                        }
     104
     105                        if ((++numRecords % 100) == 0) {
     106                                reporter.setStatus("Finished processing " + numRecords
     107                                                + " records " + "from the input file: " + inputFile);
     108                        }
     109                }
     110        }
     111
     112        public static class Reduce extends
     113                        Reducer<Text, IntWritable, Text, IntWritable> {
     114                public void reduce(Text key, Iterator<IntWritable> values,
     115                                OutputCollector<Text, IntWritable> output, Reporter reporter)
     116                                throws IOException {
     117                        int sum = 0;
     118                        while (values.hasNext()) {
     119                                sum += values.next().get();
     120                        }
     121                        output.collect(key, new IntWritable(sum));
     122                }
     123        }
     124
     125        public int run(String[] args) throws Exception {
     126                Configuration conf = new Configuration();
     127
     128                Job job = new Job(conf);
     129                job.setJarByClass(WordCount2for020.class);
     130                job.setJobName("wordcount");
     131
     132                job.setOutputKeyClass(Text.class);
     133                job.setOutputValueClass(IntWritable.class);
     134
     135                job.setMapperClass(Map.class);
     136                job.setCombinerClass(Reduce.class);
     137                job.setReducerClass(Reduce.class);
     138
     139                job.setInputFormatClass(TextInputFormat.class);
     140                job.setOutputFormatClass(TextOutputFormat.class);
     141
     142                List<String> other_args = new ArrayList<String>();
     143                for (int i = 0; i < args.length; ++i) {
     144                        if ("-skip".equals(args[i])) {
     145                                DistributedCache
     146                                                .addCacheFile(new Path(args[++i]).toUri(), conf);
     147                                conf.setBoolean("wordcount.skip.patterns", true);
     148                        } else {
     149                                other_args.add(args[i]);
     150                        }
     151                }
     152//              conf.set("mapred.job.tracker", "local");
     153//              conf.set("fs.default.name", "file:///");
     154
     155                FileInputFormat.setInputPaths(job, new Path(other_args.get(0)));
     156                FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
     157                job.waitForCompletion(true);
     158
     159                return 0;
     160        }
     161
     162        public static void main(String[] args) throws Exception {
     163//              String[] argv = { "-Dwordcount.case.sensitive=false",
     164//                              "/user/waue/text_input", "/user/waue/output-v020", "-skip",
     165//                              "/user/waue/patterns" };
     166//              args = argv;
     167                int res = ToolRunner.run(new Configuration(), new WordCount2for020(),
     168                                args);
     169                System.exit(res);
     170        }
     171}
     172
     173}}}
     174
     175{{{
     17610/04/16 17:32:35 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
     17710/04/16 17:32:36 INFO input.FileInputFormat: Total input paths to process : 4
     17810/04/16 17:32:36 INFO mapred.JobClient: Running job: job_201003231850_0021
     17910/04/16 17:32:37 INFO mapred.JobClient:  map 0% reduce 0%
     18010/04/16 17:32:44 INFO mapred.JobClient: Task Id : attempt_201003231850_0021_m_000005_0, Status : FAILED
     181java.io.IOException: Task process exit with nonzero status of 126.
     182        at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:418)
     183
     18410/04/16 17:32:44 WARN mapred.JobClient: Error reading task outputhttp://vpro:50060/tasklog?plaintext=true&taskid=attempt_201003231850_0021_m_000005_0&filter=stdout
     185}}}