Changes between Initial Version and Version 1 of waue/2012/WordCountHBase


Ignore:
Timestamp:
Jan 20, 2012, 6:34:07 PM (12 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2012/WordCountHBase

    v1 v1  
     1{{{
     2#!java
     3// WordCountHBase
     4//說明:
     5//      此程式碼將輸入路徑的檔案內的字串取出做字數統計
     6//      再將結果塞回HTable內
     7//
     8//運算方法:
     9//      將此程式運作在hadoop 1.0.0 + hbase 0.90.5 平台上
     10//
     11//結果:
     12// ---------------------------
     13//      $ hbase shell
     14//      > scan 'wordcount'
     15//      ROW             COLUMN+CELL
     16//      am              column=content:count, timestamp=1264406245488, value=1
     17//  chen        column=content:count, timestamp=1264406245488, value=1
     18//      hi,             column=content:count, timestamp=1264406245488, value=2
     19//  ......(略)
     20// ---------------------------
     21//注意:
     22//1.    在hdfs 上來源檔案的路徑為 "/user/$YOUR_NAME/input"
     23//      請注意必須先放資料到此hdfs上的資料夾內,且此資料夾內只能放檔案,不可再放資料夾
     24//2.    運算完後,程式將執行結果放在hbase的wordcount資料表內
     25//
     26
     27import java.io.IOException;
     28
     29import org.apache.hadoop.conf.Configuration;
     30import org.apache.hadoop.fs.Path;
     31import org.apache.hadoop.hbase.HBaseConfiguration;
     32import org.apache.hadoop.hbase.HColumnDescriptor;
     33import org.apache.hadoop.hbase.HTableDescriptor;
     34import org.apache.hadoop.hbase.MasterNotRunningException;
     35import org.apache.hadoop.hbase.ZooKeeperConnectionException;
     36import org.apache.hadoop.hbase.client.HBaseAdmin;
     37import org.apache.hadoop.hbase.client.Put;
     38import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
     39import org.apache.hadoop.hbase.mapreduce.TableReducer;
     40import org.apache.hadoop.hbase.util.Bytes;
     41import org.apache.hadoop.io.IntWritable;
     42import org.apache.hadoop.io.LongWritable;
     43import org.apache.hadoop.io.Text;
     44import org.apache.hadoop.mapreduce.Job;
     45import org.apache.hadoop.mapreduce.Mapper;
     46import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     47import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     48
     49public class WordCountHBase {
     50       
     51        public Configuration hbase_config ;
     52        public Configuration hadoop_config;
     53        public HBaseAdmin hbase_admin;
     54        public String input ,tablename;
     55       
     56        public WordCountHBase() throws MasterNotRunningException, ZooKeeperConnectionException{
     57                hbase_config = HBaseConfiguration.create();
     58                hadoop_config = new Configuration();
     59                hbase_admin = new HBaseAdmin(hbase_config);
     60        }
     61       
     62        public void createHBaseTable(String family)
     63                        throws IOException {
     64                // HTableDescriptor contains the name of an HTable, and its column
     65                // families
     66                // HTableDescriptor 用來描述table的屬性
     67                HTableDescriptor htd = new HTableDescriptor(tablename);
     68                // HColumnDescriptor HColumnDescriptor contains information about a
     69                // column family such as the number of versions, compression settings,
     70                // etc.
     71                // HTableDescriptor 透過 add() 方法來加入Column family
     72
     73                htd.addFamily(new HColumnDescriptor(family));
     74
     75                // HBaseConfiguration 能接收 hbase-site.xml 的設定值
     76                // HBaseConfiguration config = new HBaseConfiguration();
     77               
     78
     79                // 檔案的操作則使用 HBaseAdmin
     80               
     81                // 檢查
     82                if (hbase_admin.tableExists(tablename)) {
     83                        System.out.println("Table: " + tablename + "Existed.");
     84                } else {
     85                        System.out.println("create new table: " + tablename);
     86                        // 建立
     87                        hbase_admin.createTable(htd);
     88                }
     89        }
     90
     91        public void drop() {
     92                // HBaseConfiguration conf = new HBaseConfiguration(); // deprecated ,
     93                // v0.90
     94               
     95                try {
     96                        hbase_admin = new HBaseAdmin(hbase_config);
     97                        if (hbase_admin.tableExists(tablename)) {
     98                                hbase_admin.disableTable(tablename);
     99                                hbase_admin.deleteTable(tablename);
     100                                System.out.println("Droped the table [" + tablename + "]");
     101                        } else {
     102                                System.out.println("Table [" + tablename + "] was not found!");
     103                        }
     104
     105                } catch (IOException e) {
     106                        e.printStackTrace();
     107                }
     108        }
     109
     110        public static class HtMap extends
     111                        Mapper<LongWritable, Text, Text, IntWritable> {
     112                private IntWritable one = new IntWritable(1);
     113
     114                public void map(LongWritable key, Text value, Context context)
     115                                throws IOException, InterruptedException {
     116                        // 輸入的字串先轉換成小寫再用空白區隔
     117                        String s[] = value.toString().toLowerCase().trim().split(" ");
     118                        for (String m : s) {
     119                                // 寫入到輸出串流
     120                                context.write(new Text(m), one);
     121                        }
     122                }
     123        }
     124
     125        // TableReducer<KEYIN,VALUEIN,KEYOUT>
     126        // 原本為 TableReducer<Text, IntWritable, NullWritable >
     127        // 但在此改成 LongWritable 也可以
     128        // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可
     129        public static class HtReduce extends
     130                        TableReducer<Text, IntWritable, LongWritable> {
     131
     132                public void reduce(Text key, Iterable<IntWritable> values,
     133                                Context context) throws IOException, InterruptedException {
     134                        int sum = 0;
     135                        for (IntWritable i : values) {
     136                                sum += i.get();
     137                        }
     138
     139                        // org.apache.hadoop.hbase.client.Put
     140                        // Used to perform Put operations for a single row.
     141                        // new Put(byte[] row)
     142                        Put put = new Put(Bytes.toBytes(key.toString()));
     143
     144                        // add(byte[] family, byte[] qualifier, byte[] value)
     145                        // 在main設定output format class 為 TableOutputFormat
     146                        // TableReducer 內有定義 output Key class 必須為 Put 或 Delete
     147                        put.add(Bytes.toBytes("content"), Bytes.toBytes("word"),
     148                                        Bytes.toBytes(key.toString()));
     149                        put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
     150                                        Bytes.toBytes(String.valueOf(sum)));
     151
     152                        // NullWritable.get(): Returns the single instance of this class.
     153                        // NullWritable.write(): Serialize the fields of this object to out.
     154                        context.write(new LongWritable(), put);
     155                        // context.write(NullWritable.get(), put)
     156                }
     157        }
     158        public boolean run() throws IOException, InterruptedException, ClassNotFoundException{
     159                Job job = new Job(hadoop_config, "WordCount table with " + input);
     160
     161                job.setJarByClass(WordCountHBase.class);
     162
     163                job.setMapperClass(HtMap.class);
     164                job.setReducerClass(HtReduce.class);
     165                // 此範例的輸出為 <Text,IntWritable> 因此其實可以省略宣告
     166                // set{Map|Reduce}Output{Key|Value}Class()
     167                job.setMapOutputKeyClass(Text.class);
     168                job.setMapOutputValueClass(IntWritable.class);
     169                // InputFormat 只有一個子介面
     170                // FileInputFormat <-(SequenceFileInputFormat,TextInputFormat)
     171                // 其中TextInputFormat 最常用 ,預設輸入為 LongWritable,Text
     172                // 另外HBase 則設計了一個子類別 TableInputFormat
     173                job.setInputFormatClass(TextInputFormat.class);
     174                // TAbleOutputFormat
     175                // 宣告此行則可使 reduce 輸出為 HBase 的table
     176                job.setOutputFormatClass(TableOutputFormat.class);
     177
     178                // 原本設定輸入檔案為 Config.setInputPath(Path) 卻改為
     179                // FileInputFormat.addInputPath(Job, Path()) 的設計,
     180                // 猜測應該是考慮某些檔案操作並不需要跑mapreduce的Job,因此提到外面
     181                FileInputFormat.addInputPath(job, new Path(input));
     182
     183                return job.waitForCompletion(true) ? true : false;
     184        }
     185        public static void main(String argv[]) throws Exception {
     186                // debug
     187                String[] argc = { "/tmp/ProductEmp/input/" };
     188                argv = argc;
     189               
     190                WordCountHBase wchb = new WordCountHBase();
     191               
     192
     193                if (argv.length < 1) {
     194                        System.out.println("CountToHBaseReducer <inHdfsDir> ");
     195                        return;
     196                }
     197
     198                wchb.input = argv[0];
     199
     200                wchb.tablename = "wordcounthbase";
     201
     202                // OUTPUT_TABLE = "hbase.mapred.outputtable"
     203                // conf.set 用於設定 如 core-site.xml 的 name 與 value
     204                // 告訴程式 hbase.mapred.outputtable --> wordcount
     205               
     206                wchb.hadoop_config.set(TableOutputFormat.OUTPUT_TABLE, wchb.tablename);
     207
     208               
     209                // 建立hbase 的table 否則沒先建立會出錯
     210                wchb.drop();
     211                wchb.createHBaseTable( "content");
     212
     213                wchb.run();
     214        }
     215}
     216}}}