Changes between Initial Version and Version 1 of waue/2010/0416


Ignore:
Timestamp:
Apr 16, 2010, 10:23:08 AM (14 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2010/0416

    v1 v1  
     1
     2{{{
     3#!java
     4import java.io.BufferedReader;
     5import java.io.FileReader;
     6import java.io.IOException;
     7import java.util.ArrayList;
     8import java.util.HashSet;
     9import java.util.Iterator;
     10import java.util.List;
     11import java.util.Set;
     12import java.util.StringTokenizer;
     13
     14import org.apache.hadoop.conf.Configuration;
     15import org.apache.hadoop.conf.Configured;
     16import org.apache.hadoop.filecache.DistributedCache;
     17import org.apache.hadoop.fs.Path;
     18import org.apache.hadoop.io.IntWritable;
     19import org.apache.hadoop.io.LongWritable;
     20import org.apache.hadoop.io.Text;
     21import org.apache.hadoop.mapreduce.Job;
     22import org.apache.hadoop.mapreduce.Mapper;
     23import org.apache.hadoop.mapreduce.Reducer;
     24import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     25import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     26import org.apache.hadoop.util.StringUtils;
     27import org.apache.hadoop.util.Tool;
     28import org.apache.hadoop.util.ToolRunner;
     29
     30public class WordCountV020 extends Configured implements Tool {
     31
     32        static Set<String> patternsToSkip = new HashSet<String>();
     33
     34        static void parseSkipFile(Path patternsFile) {
     35                try {
     36                        BufferedReader fis = new BufferedReader(new FileReader(patternsFile
     37                                        .toString()));
     38                        String pattern = null;
     39                        while ((pattern = fis.readLine()) != null) {
     40                                patternsToSkip.add(pattern);
     41                        }
     42                } catch (IOException ioe) {
     43                        System.err
     44                                        .println("Caught exception while parsing the cached file '"
     45                                                        + patternsFile + "' : "
     46                                                        + StringUtils.stringifyException(ioe));
     47                }
     48        }
     49
     50        public static class Map extends
     51                        Mapper<LongWritable, Text, Text, IntWritable> {
     52
     53                static enum Counters {
     54                        INPUT_WORDS
     55                }
     56
     57                boolean caseSensitive;
     58
     59                private final static IntWritable one = new IntWritable(1);
     60                private Text word = new Text();
     61
     62                public void setup(Context context) {
     63                        Configuration conf = context.getConfiguration();
     64                        caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
     65                }
     66
     67                public void map(LongWritable key, Text value, Context context)
     68                                throws IOException, InterruptedException {
     69
     70                        String line = (caseSensitive) ? value.toString() : value.toString()
     71                                        .toLowerCase();
     72
     73                        for (String pattern : patternsToSkip) {
     74                                line = line.replaceAll(pattern, "");
     75                        }
     76
     77                        StringTokenizer tokenizer = new StringTokenizer(line);
     78                        while (tokenizer.hasMoreTokens()) {
     79                                word.set(tokenizer.nextToken());
     80                                context.write(word, one);
     81
     82                        }
     83
     84                }
     85        }
     86
     87        public static class Reduce extends
     88                        Reducer<Text, IntWritable, Text, IntWritable> {
     89                public void reduce(Text key, Iterator<IntWritable> values,
     90                                Context context) throws IOException, InterruptedException {
     91                        int sum = 0;
     92                        while (values.hasNext()) {
     93                                sum += values.next().get();
     94                        }
     95                        context.write(key, new IntWritable(sum));
     96                }
     97        }
     98
     99        public int run(String[] args) throws Exception {
     100
     101                Configuration conf = new Configuration();
     102                // 宣告job 取得conf 並設定名稱 Hadoop Hello World
     103                Job job = new Job(conf, "Hadoop Hello World");
     104                // 設定此運算的主程式
     105                job.setJarByClass(WordCountV020.class);
     106
     107                job.setOutputKeyClass(Text.class);
     108                job.setOutputValueClass(IntWritable.class);
     109
     110                job.setMapperClass(Map.class);
     111                job.setCombinerClass(Reduce.class);
     112                job.setReducerClass(Reduce.class);
     113
     114                List<String> other_args = new ArrayList<String>();
     115
     116                for (int i = 0; i < args.length; ++i) {
     117                        if ("-skip".equals(args[i])) {
     118                                DistributedCache
     119                                                .addCacheFile(new Path(args[++i]).toUri(), conf);
     120                                conf.setBoolean("wordcount.skip.patterns", true);
     121                        } else {
     122                                other_args.add(args[i]);
     123                        }
     124                }
     125
     126                Path[] patternsFiles = null;
     127                if (conf.getBoolean("wordcount.skip.patterns", false)) {
     128
     129                        try {
     130                                patternsFiles = DistributedCache.getLocalCacheFiles(conf);
     131
     132                        } catch (Exception ioe) {
     133                                System.err
     134                                                .println("Caught exception while getting cached files: "
     135                                                                + StringUtils.stringifyException(ioe));
     136                        }
     137                        if (patternsFiles != null) {
     138
     139                                for (Path patternsFile : patternsFiles) {
     140                                        System.err.println(patternsFile);
     141                                        parseSkipFile(patternsFile);
     142                                }
     143                        } else {
     144                                System.err.print("no path");
     145                        }
     146
     147                }
     148
     149                FileInputFormat.setInputPaths(job, new Path(other_args.get(0)));
     150                // 設定輸出路徑
     151                FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
     152
     153                // CheckAndDelete.checkAndDelete(other_args.get(1), conf);
     154                job.waitForCompletion(true);
     155                return 0;
     156        }
     157
     158        public static void main(String[] args) throws Exception {
     159                String[] argv = { "-Dwordcount.case.sensitive=false",
     160                                "/user/waue/input", "/user/waue/output-v020", "-skip",
     161                                "/user/waue/patterns" };
     162                args = argv;
     163                int res = ToolRunner
     164                                .run(new Configuration(), new WordCountV020(), args);
     165                System.exit(res);
     166        }
     167}
     168
     169
     170}}}