Changes between Initial Version and Version 1 of waue/2010/0418


Ignore:
Timestamp:
Apr 19, 2010, 10:00:46 AM (14 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2010/0418

    v1 v1  
     1 = code v0.20 =
     2
     3{{{
     4#!java
     5import java.io.BufferedReader;
     6import java.io.FileReader;
     7import java.io.IOException;
     8import java.util.ArrayList;
     9import java.util.HashSet;
     10import java.util.Iterator;
     11import java.util.List;
     12import java.util.Set;
     13import java.util.StringTokenizer;
     14
     15import org.apache.hadoop.conf.Configuration;
     16import org.apache.hadoop.conf.Configured;
     17import org.apache.hadoop.filecache.DistributedCache;
     18import org.apache.hadoop.fs.Path;
     19import org.apache.hadoop.io.IntWritable;
     20import org.apache.hadoop.io.LongWritable;
     21import org.apache.hadoop.io.Text;
     22import org.apache.hadoop.mapred.OutputCollector;
     23import org.apache.hadoop.mapred.Reporter;
     24import org.apache.hadoop.mapreduce.Job;
     25import org.apache.hadoop.mapreduce.Mapper;
     26import org.apache.hadoop.mapreduce.Reducer;
     27import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     28import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     29import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     30import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     31import org.apache.hadoop.util.StringUtils;
     32import org.apache.hadoop.util.Tool;
     33import org.apache.hadoop.util.ToolRunner;
     34
     35public class WordCount2for020 extends Configured implements Tool {
     36
     37        public static class Map extends
     38                        Mapper<LongWritable, Text, Text, IntWritable> {
     39
     40                static enum Counters {
     41                        INPUT_WORDS
     42                }
     43
     44                private final static IntWritable one = new IntWritable(1);
     45                private Text word = new Text();
     46
     47                private boolean caseSensitive = true;
     48                private Set<String> patternsToSkip = new HashSet<String>();
     49
     50                private long numRecords = 0;
     51                private String inputFile;
     52
     53                public void setup(Context context) {
     54                        Configuration conf = context.getConfiguration();
     55                        caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
     56                        inputFile = conf.get("map.input.file");
     57
     58                        if (conf.getBoolean("wordcount.skip.patterns", false)) {
     59                                Path[] patternsFiles = new Path[0];
     60                                try {
     61                                        patternsFiles = DistributedCache.getLocalCacheFiles(conf);
     62                                } catch (IOException ioe) {
     63                                        System.err
     64                                                        .println("Caught exception while getting cached files: "
     65                                                                        + StringUtils.stringifyException(ioe));
     66                                }
     67                                for (Path patternsFile : patternsFiles) {
     68                                        parseSkipFile(patternsFile);
     69                                }
     70                        }
     71                }
     72
     73                private void parseSkipFile(Path patternsFile) {
     74                        try {
     75                                BufferedReader fis = new BufferedReader(new FileReader(
     76                                                patternsFile.toString()));
     77                                String pattern = null;
     78                                while ((pattern = fis.readLine()) != null) {
     79                                        patternsToSkip.add(pattern);
     80                                }
     81                        } catch (IOException ioe) {
     82                                System.err
     83                                                .println("Caught exception while parsing the cached file '"
     84                                                                + patternsFile
     85                                                                + "' : "
     86                                                                + StringUtils.stringifyException(ioe));
     87                        }
     88                }
     89
     90                public void map(LongWritable key, Text value,
     91                                OutputCollector<Text, IntWritable> output, Reporter reporter)
     92                                throws IOException {
     93                        String line = (caseSensitive) ? value.toString() : value.toString()
     94                                        .toLowerCase();
     95
     96                        for (String pattern : patternsToSkip) {
     97                                line = line.replaceAll(pattern, "");
     98                        }
     99
     100                        StringTokenizer tokenizer = new StringTokenizer(line);
     101                        while (tokenizer.hasMoreTokens()) {
     102                                word.set(tokenizer.nextToken());
     103                                output.collect(word, one);
     104                                reporter.incrCounter(Counters.INPUT_WORDS, 1);
     105                        }
     106
     107                        if ((++numRecords % 100) == 0) {
     108                                reporter.setStatus("Finished processing " + numRecords
     109                                                + " records " + "from the input file: " + inputFile);
     110                        }
     111                }
     112        }
     113
     114        public static class Reduce extends
     115                        Reducer<Text, IntWritable, Text, IntWritable> {
     116                public void reduce(Text key, Iterator<IntWritable> values,
     117                                OutputCollector<Text, IntWritable> output, Reporter reporter)
     118                                throws IOException {
     119                        int sum = 0;
     120                        while (values.hasNext()) {
     121                                sum += values.next().get();
     122                        }
     123                        output.collect(key, new IntWritable(sum));
     124                }
     125        }
     126
     127        public int run(String[] args) throws Exception {
     128                Configuration conf = new Configuration();
     129
     130                Job job = new Job(conf);
     131                job.setJarByClass(WordCount2for020.class);
     132                job.setJobName("wordcount");
     133               
     134                job.setMapOutputKeyClass(Text.class);
     135                job.setMapOutputValueClass(IntWritable.class);
     136                job.setOutputKeyClass(Text.class);
     137                job.setOutputValueClass(IntWritable.class);
     138
     139                job.setMapperClass(Map.class);
     140                job.setCombinerClass(Reduce.class);
     141                job.setReducerClass(Reduce.class);
     142
     143                job.setInputFormatClass(TextInputFormat.class);
     144                job.setOutputFormatClass(TextOutputFormat.class);
     145
     146                List<String> other_args = new ArrayList<String>();
     147                for (int i = 0; i < args.length; ++i) {
     148                        if ("-skip".equals(args[i])) {
     149                                DistributedCache
     150                                                .addCacheFile(new Path(args[++i]).toUri(), conf);
     151                                conf.setBoolean("wordcount.skip.patterns", true);
     152                        } else {
     153                                other_args.add(args[i]);
     154                        }
     155                }
     156//              conf.set("mapred.job.tracker", "local");
     157//              conf.set("fs.default.name", "file:///");
     158
     159                FileInputFormat.setInputPaths(job, new Path(other_args.get(0)));
     160                FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
     161                job.waitForCompletion(true);
     162
     163                return 0;
     164        }
     165
     166        public static void main(String[] args) throws Exception {
     167                String[] argv = { "-Dwordcount.case.sensitive=false",
     168                                "/user/waue/text_input", "/user/waue/output-v020", "-skip",
     169                                "/user/waue/patterns" };
     170                args = argv;
     171                int res = ToolRunner.run(new Configuration(), new WordCount2for020(),
     172                                args);
     173                System.exit(res);
     174        }
     175}
     176}}}
     177 = error message =
     178{{{
     17910/04/19 09:57:17 INFO input.FileInputFormat: Total input paths to process : 4
     18010/04/19 09:57:17 INFO mapred.JobClient: Running job: job_201004190849_0004
     18110/04/19 09:57:18 INFO mapred.JobClient:  map 0% reduce 0%
     18210/04/19 09:57:28 INFO mapred.JobClient: Task Id : attempt_201004190849_0004_m_000000_0, Status : FAILED
     183java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
     184        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:807)
     185        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:504)
     186        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
     187        at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
     188        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
     189        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:583)
     190        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
     191        at org.apache.hadoop.mapred.Child.main(Child.java:170)
     192}}}