Changes between Version 6 and Version 7 of NCHCCloudCourse100928_4_EXM5


Ignore:
Timestamp:
Aug 2, 2011, 4:36:03 PM (13 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • NCHCCloudCourse100928_4_EXM5

    v6 v7  
    1313{{{
    1414#!text
    15 WordCountV2
     15WordCountV20
    1616說明:
    17         用於字數統計,並且增加略過大小寫辨識、符號篩除等功能
     17        用於字數統計,並且增加略過大小寫辨識、符號篩除等功能 [已全改為 hadoop 0.20 API ]
    1818
    1919測試方法:
    2020        將此程式運作在hadoop 0.20 平台上,執行:
    2121        ---------------------------
    22         hadoop jar WordCountV2.jar -Dwordcount.case.sensitive=false \
    23         <input> <output> -skip patterns/patterns.txt
     22        hadoop jar WordCountV2.jar "/home/nchc/input" "/home/nchc/output-wc2" "-c"  "-skip" "/home/nchc/patterns/patterns.txt"
    2423        ---------------------------
    2524
    2625注意:
    27 1. 在hdfs 上來源檔案的路徑為 你所指定的 <input>
    28    請注意必須先放資料到此hdfs上的資料夾內,且此資料夾內只能放檔案,不可再放資料夾
    29 2. 運算完後,程式將執行結果放在hdfs 的輸出路徑為 你所指定的 <output>
    30 3. 請建立一個資料夾 pattern 並在裡面放置pattern.txt,內容如下(一行一個,前置提示符號\)
     261. 以在程式內設定<input> <output> 路徑為local 的 "/home/nchc/input" "/home/nchc/output-wc2"
     272. 若要測試 skip功能,請建立一個"/home/nchc/patterns/patterns.txt" 檔,內容如下(一行一個,前置提示符號\)
    3128        \.
    3229        \,
    3330        \!
    34 
     313. 若要測試過濾大小寫功能,請加入 -c 參數(有-c 代表 "不考慮大小寫" )
     324. 注意 DistributedCache , setup() , conf 參數傳遞於 main, mapper, setup 中
    3533}}}
    3634
     
    3937{{{
    4038#!java
     39
    4140package org.nchc.hadoop;
     41
    4242import java.io.BufferedReader;
    4343import java.io.FileReader;
    4444import java.io.IOException;
    45 import java.util.ArrayList;
    4645import java.util.HashSet;
    47 import java.util.Iterator;
    48 import java.util.List;
    4946import java.util.Set;
    5047import java.util.StringTokenizer;
    5148
    5249import org.apache.hadoop.conf.Configuration;
    53 import org.apache.hadoop.conf.Configured;
    5450import org.apache.hadoop.filecache.DistributedCache;
    5551import org.apache.hadoop.fs.Path;
     
    5753import org.apache.hadoop.io.LongWritable;
    5854import org.apache.hadoop.io.Text;
    59 import org.apache.hadoop.mapred.FileInputFormat;
    60 import org.apache.hadoop.mapred.FileOutputFormat;
    61 import org.apache.hadoop.mapred.JobClient;
    62 import org.apache.hadoop.mapred.JobConf;
    63 import org.apache.hadoop.mapred.MapReduceBase;
    64 import org.apache.hadoop.mapred.Mapper;
    65 import org.apache.hadoop.mapred.OutputCollector;
    66 import org.apache.hadoop.mapred.Reducer;
    67 import org.apache.hadoop.mapred.Reporter;
    68 import org.apache.hadoop.mapred.TextInputFormat;
    69 import org.apache.hadoop.mapred.TextOutputFormat;
    70 import org.apache.hadoop.util.GenericOptionsParser;
     55import org.apache.hadoop.mapreduce.Job;
     56import org.apache.hadoop.mapreduce.Mapper;
     57import org.apache.hadoop.mapreduce.Reducer;
     58import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     59import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    7160import org.apache.hadoop.util.StringUtils;
    72 import org.apache.hadoop.util.Tool;
    73 import org.apache.hadoop.util.ToolRunner;
    7461
    75 @SuppressWarnings("deprecation")
    76 public class WordCountV2 extends Configured implements Tool {
     62public class WordCountV20 {
    7763
    78         public static class Map extends MapReduceBase implements
     64        public static class Map extends
    7965                        Mapper<LongWritable, Text, Text, IntWritable> {
    80 
    8166                static enum Counters {
    8267                        INPUT_WORDS
     
    8873                private boolean caseSensitive = true;
    8974                private Set<String> patternsToSkip = new HashSet<String>();
    90 
    91                 private long numRecords = 0;
    92                 private String inputFile;
    93 
    94                 public void configure(JobConf job) {
    95                         caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
    96                         inputFile = job.get("map.input.file");
    97 
    98                         if (job.getBoolean("wordcount.skip.patterns", false)) {
    99                                 Path[] patternsFiles = new Path[0];
    100                                 try {
    101                                         patternsFiles = DistributedCache.getLocalCacheFiles(job);
    102                                 } catch (IOException ioe) {
    103                                         System.err
    104                                                         .println("Caught exception while getting cached files: "
    105                                                                         + StringUtils.stringifyException(ioe));
    106                                 }
    107                                 for (Path patternsFile : patternsFiles) {
    108                                         parseSkipFile(patternsFile);
    109                                 }
    110                         }
    111                 }
    11275
    11376                private void parseSkipFile(Path patternsFile) {
     
    12891                }
    12992
    130                 public void map(LongWritable key, Text value,
    131                                 OutputCollector<Text, IntWritable> output, Reporter reporter)
    132                                 throws IOException {
     93                @Override
     94                public void setup(Context context) {
     95                        Configuration conf = context.getConfiguration();
     96                        caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
     97
     98                        if (conf.getBoolean("wordcount.skip.patterns", false)) {
     99                                Path[] patternsFiles = new Path[0];
     100                                try {
     101                                        patternsFiles = DistributedCache.getLocalCacheFiles(conf);
     102                                } catch (IOException ioe) {
     103                                        System.err
     104                                                        .println("Caught exception while getting cached files: "
     105                                                                        + StringUtils.stringifyException(ioe));
     106                                }
     107                                for (Path patternsFile : patternsFiles) {
     108                                        parseSkipFile(patternsFile);
     109                                }
     110                        }
     111                }
     112
     113                @Override
     114                public void map(LongWritable key, Text value, Context context)
     115                                throws IOException, InterruptedException {
     116
    133117                        String line = (caseSensitive) ? value.toString() : value.toString()
    134118                                        .toLowerCase();
     
    141125                        while (tokenizer.hasMoreTokens()) {
    142126                                word.set(tokenizer.nextToken());
    143                                 output.collect(word, one);
    144                                 reporter.incrCounter(Counters.INPUT_WORDS, 1);
     127                                context.write(word, one);
     128
    145129                        }
    146130
    147                         if ((++numRecords % 100) == 0) {
    148                                 reporter.setStatus("Finished processing " + numRecords
    149                                                 + " records " + "from the input file: " + inputFile);
    150                         }
    151131                }
    152132        }
    153133
    154         public static class Reduce extends MapReduceBase implements
     134        public static class Reduce extends
    155135                        Reducer<Text, IntWritable, Text, IntWritable> {
    156                 public void reduce(Text key, Iterator<IntWritable> values,
    157                                 OutputCollector<Text, IntWritable> output, Reporter reporter)
    158                                 throws IOException {
     136                private IntWritable result = new IntWritable();
     137
     138                @Override
     139                public void reduce(Text key, Iterable<IntWritable> values,
     140                                Context context) throws IOException, InterruptedException {
    159141                        int sum = 0;
    160                         while (values.hasNext()) {
    161                                 sum += values.next().get();
     142                        for (IntWritable val : values) {
     143                                sum += val.get();
    162144                        }
    163                         output.collect(key, new IntWritable(sum));
     145                        result.set(sum);
     146                        context.write(key, result);
    164147                }
    165148        }
    166149
    167         public int run(String[] args) throws Exception {
     150        public static void main(String[] args) throws Exception {
     151                String[] argv = { "/home/nchc/input", "/home/nchc/output-wc2", "-c",  "-skip",
     152                                "/home/nchc/patterns/patterns.txt" };
    168153
    169                 JobConf conf = new JobConf(getConf(), WordCount.class);
    170                 conf.setJobName("wordcount");
    171                 String[] otherArgs = new GenericOptionsParser(conf, args)
    172                                 .getRemainingArgs();
    173                 if (otherArgs.length < 2) {
    174                         System.out.println("WordCountV2 [-Dwordcount.case.sensitive=<false|true>] \\ ");
    175                         System.out.println("            <inDir> <outDir> [-skip Pattern_file]");
    176                         return 0;
     154                args = argv;
     155                Configuration conf = new Configuration();
     156                conf.set("mapred.job.tracker", "local"); // for single
     157                conf.set("fs.default.name", "file:///"); // for single
     158
     159                if (args.length < 2) {
     160                        System.err
     161                                        .println("Usage: hadoop jar WordCount.jar <input> <output> [-c] [-skip <path>]");
     162                        System.exit(2);
    177163                }
    178                 conf.setOutputKeyClass(Text.class);
    179                 conf.setOutputValueClass(IntWritable.class);
    180164
    181                 conf.setMapperClass(Map.class);
    182                 conf.setCombinerClass(Reduce.class);
    183                 conf.setReducerClass(Reduce.class);
    184 
    185                 conf.setInputFormat(TextInputFormat.class);
    186                 conf.setOutputFormat(TextOutputFormat.class);
    187 
    188                 List<String> other_args = new ArrayList<String>();
    189165                for (int i = 0; i < args.length; ++i) {
    190166                        if ("-skip".equals(args[i])) {
     
    192168                                                .addCacheFile(new Path(args[++i]).toUri(), conf);
    193169                                conf.setBoolean("wordcount.skip.patterns", true);
    194                         } else {
    195                                 other_args.add(args[i]);
     170                        }
     171                        if ("-c".equals(args[i])){
     172                                conf.setBoolean("wordcount.case.sensitive", false);
    196173                        }
    197174                }
    198175
    199                 FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
    200                 FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
    201                 CheckAndDelete.checkAndDelete(other_args.get(1), conf);
    202                 JobClient.runJob(conf);
    203                 return 0;
    204         }
     176                CheckAndDelete.checkAndDelete(args[1], conf);
     177                Job job = new Job(conf, "Word Count");
     178                job.setJarByClass(WordCountV20.class);
     179                job.setMapperClass(Map.class);
     180                job.setCombinerClass(Reduce.class);
     181                job.setReducerClass(Reduce.class);
     182                job.setOutputKeyClass(Text.class);
     183                job.setOutputValueClass(IntWritable.class);
     184                FileInputFormat.addInputPath(job, new Path(args[0]));
     185                FileOutputFormat.setOutputPath(job, new Path(args[1]));
    205186
    206         public static void main(String[] args) throws Exception {
    207 //              String[] argv = { "-Dwordcount.case.sensitive=false", "/user/hadoop/input",
    208 //                              "/user/hadoop/output-wc2", "-skip", "/user/hadoop/patterns/patterns.txt" };
    209 //              args = argv;
    210 
    211                 int res = ToolRunner.run(new Configuration(), new WordCountV2(), args);
    212                 System.exit(res);
     187                System.exit(job.waitForCompletion(true) ? 0 : 1);
    213188        }
    214189}
    215190
    216191}}}
    217 
    218 
    219 [wiki:wc2 v.0.20]