source: sample/WordCountFromHBase.java @ 18

Last change on this file since 18 was 18, checked in by waue, 16 years ago

upgrade 0.16 to 0.17

File size: 6.3 KB
/**
 * Program: WordCountFromHBase.java
 * Editor: Waue Chen
 * From: NCHC, Taiwan
 * Last Update Date: 07/02/2008
 * Upgrade to 0.17
 */

/**
 * Purpose :
 *  Count the words stored in HBase and write the result to the Hadoop file system.
 *
 * HowToUse :
 *  Make sure the Hadoop file system is running and HBase holds the expected data.
 *  It is suggested to run WordCountIntoHBase first.
 *  Finally, adjust the setup parameters below and run.
 *
 * Check Result:
 *
 *  Inspect http://localhost:50070 with a web browser.
 */
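/*
 * A minimal sketch of how this job might be launched from the Hadoop home
 * directory. The jar name "nchc.jar" is only illustrative; package the
 * compiled classes however your build does it:
 *
 *   bin/hadoop jar nchc.jar tw.org.nchc.code.WordCountIntoHBase   # load the source data first
 *   bin/hadoop jar nchc.jar tw.org.nchc.code.WordCountFromHBase   # then run the word count
 *   bin/hadoop fs -cat counts2/part-00000                         # inspect the counted words
 */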

package tw.org.nchc.code;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableInputFormat;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;

@SuppressWarnings("unused")
public class WordCountFromHBase {
  /* setup parameters */
  // set the output path
  static String outputPath = "counts2";
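  // (a relative path like "counts2" normally resolves under the user's HDFS home directory)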

  // org.apache.hadoop.hbase.mapred.TableMap<K,V>:
  // TableMap<K extends org.apache.hadoop.io.WritableComparable,
  //          V extends org.apache.hadoop.io.Writable>
  // scans an HBase table and sorts by a specified sort column.
  // If the column does not exist, the record is not passed to Reduce.
  private static class MapClass extends TableMap<Text, IntWritable> {

    // the constant 1, reused for every emitted word
    private final static IntWritable one = new IntWritable(1);
    // the HBase column to read from
    private final static Text textcol = new Text(WordCountIntoHBase.colstr);
    private Text word = new Text();
    // TableMap declares map() as an abstract method, so it is implemented here.
    // Its signature is:
    //   map(HStoreKey key, MapWritable value,
    //       OutputCollector<K,V> output, Reporter reporter)
    // It is called once for each HBase record, represented by a key and its
    // associated record value.
    public void map(HStoreKey key, MapWritable cols,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      // The first get() is Map<Writable,Writable>.get(Object key), which
      // returns a Writable; it is cast down to ImmutableBytesWritable.
      // The second get() returns the raw byte[] held by that value.
      // Text.decode() parses the UTF-8 bytes into a String.
      // Each "line" is one row of the HTable.
      String line = Text.decode(((ImmutableBytesWritable) cols.get(textcol))
          .get());

      // Uncomment the block below to dump each "line" for inspection;
      // the dumped text is the content of the files that were merged into HBase.
      /*
      RandomAccessFile raf =
        new RandomAccessFile("/home/waue/mr-result.txt","rw");
      raf.seek(raf.length()); // move the pointer to the end
      raf.write(("\n"+line).getBytes());
      raf.close();
      */

      // StringTokenizer splits the line into words
      StringTokenizer itr = new StringTokenizer(line);
      // emit (word, 1) for every word
      while (itr.hasMoreTokens()) {
        // nextToken() returns the current token as a String and advances;
        // Text.set() sets this Text to the contents of that string
        word.set(itr.nextToken());
        // OutputCollector.collect(K key, V value) adds a key/value pair
        // to the output
        output.collect(word, one);
      }
    }
  }

  // reducer: sums up all the counts
  private static class ReduceClass extends MapReduceBase implements
      Reducer<Text, IntWritable, Text, IntWritable> {

    // reuse objects
    private final static IntWritable SumValue = new IntWritable();
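    // (reusing a single IntWritable avoids allocating a new object per output record)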

    // Like map(), reduce() is a method that must be implemented here.
    // Its four parameter types in this sample are (Text, Iterator<IntWritable>,
    //   OutputCollector<Text, IntWritable>, Reporter).
    public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      // sum up the values
      int sum = 0;
      // "key" is the word; "values" holds all the counts collected for it
      while (values.hasNext()) {
        // next() returns the current value and advances the iterator;
        // IntWritable.get() converts the IntWritable to an int
        sum += values.next().get();
      }
      // IntWritable.set(int) converts the int back into an IntWritable
      SumValue.set(sum);
      // since outputPath is set in main(), output.collect() writes the
      // result to the Hadoop file system
      output.collect(key, SumValue);
    }
  }

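  // Private constructor: this class is only meant to be run via main().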
  private WordCountFromHBase() {
  }

  /**
   * Runs the demo.
   */
  public static void main(String[] args) throws IOException {

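    // a single map task and a single reduce task are enough for this small demo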
    int mapTasks = 1;
    int reduceTasks = 1;
    // initialize the job configuration
    JobConf conf = new JobConf(WordCountFromHBase.class);
    // TableMap.initJob configures the job to read from HBase:
    //   initJob(table_name, column_string, mapper_class, job_conf)
    TableMap.initJob(WordCountIntoHBase.Table_Name,
        WordCountIntoHBase.colstr, MapClass.class, conf);
    conf.setJobName(WordCountIntoHBase.Table_Name + "store");
    conf.setNumMapTasks(mapTasks);
    conf.setNumReduceTasks(reduceTasks);

    // Set the key class for the job output data.
    conf.setOutputKeyClass(Text.class);
    // Set the value class for the job output data.
    conf.setOutputValueClass(IntWritable.class);
    // set the mapper, combiner, and reducer classes
    conf.setMapperClass(MapClass.class);
    conf.setCombinerClass(ReduceClass.class);
    conf.setReducerClass(ReduceClass.class);
    // the input comes from HBase, so use TableInputFormat
    conf.setInputFormat(TableInputFormat.class);
    // 0.16 API:
//    conf.setOutputPath(new Path(outputPath));
    Convert.setOutputPath(conf, new Path(outputPath));
    // delete any old output path with the same name
    FileSystem.get(conf).delete(new Path(outputPath), true);
    JobClient.runJob(conf);
  }
}