Changes between Initial Version and Version 1 of waue/2008/0701


Ignore:
Timestamp:
Jun 29, 2010, 11:27:03 AM (14 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2008/0701

    v1 v1  
     1 = 程式宣告 =
     2 * Program: HBaseRecordPro.java
     3 * Editor: Waue Chen
     4 * From :  NCHC. Taiwn
     5 * Last Update Date: 07/01/2008
     6-----------------
     7 = 程式功能 =
     8 * Program would parse your record and create Hbase
     9 * Then it sets the first line as column qualify
     10 * Finally it stores in HBase automatically.
     11 
     12 = 如何使用  =
     13 *      Make sure two thing :
     14 *      1. source_file must be regular as follow:
     15 *              first line: qualify1:qualify2:...:qualifyN
     16 *              other line: records1:records2:...:recordsN
     17 *     2. source_file path must be correct.
     18
     19 = 原始檔內容 =
     20{{{
     21name:locate:years
     22waue:taiwan:1981
     23rock:taiwan:1981
     24aso:taiwan:1981
     25jazz:taiwan:1982
     26}}}
     27
     28 = 結果 =
     29{{{
     30 *      Go to hbase console, type :
     31 *              hql > select * from t1_table;
     32 08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key
     33
     34+-------------------------+-------------------------+-------------------------+
     35| Row                     | Column                  | Cell                    |
     36+-------------------------+-------------------------+-------------------------+
     37| 0                       | member:locate           | taiwan                  |
     38+-------------------------+-------------------------+-------------------------+
     39| 0                       | member:name             | waue                    |
     40+-------------------------+-------------------------+-------------------------+
     41| 0                       | member:years            | 1981                    |
     42+-------------------------+-------------------------+-------------------------+
     43| 17                      | member:locate           | taiwan                  |
     44+-------------------------+-------------------------+-------------------------+
     45| 17                      | member:name             | rock                    |
     46+-------------------------+-------------------------+-------------------------+
     47| 17                      | member:years            | 1981                    |
     48+-------------------------+-------------------------+-------------------------+
     49| 34                      | member:locate           | taiwan                  |
     50+-------------------------+-------------------------+-------------------------+
     51| 34                      | member:name             | aso                     |
     52+-------------------------+-------------------------+-------------------------+
     53| 34                      | member:years            | 1981                    |
     54+-------------------------+-------------------------+-------------------------+
     55| 50                      | member:locate           | taiwan                  |
     56+-------------------------+-------------------------+-------------------------+
     57| 50                      | member:name             | jazz                    |
     58+-------------------------+-------------------------+-------------------------+
     59| 50                      | member:years            | 1982                    |
     60+-------------------------+-------------------------+-------------------------+
     614 row(s) in set. (0.31 sec)
     62
     63 */
     64}}}
     65
     66 = 程式碼 =
     67{{{
     68package tw.org.nchc.code;
     69
     70import java.io.BufferedReader;
     71import java.io.BufferedWriter;
     72import java.io.File;
     73import java.io.FileReader;
     74import java.io.FileWriter;
     75import java.io.IOException;
     76import java.util.Iterator;
     77
     78import org.apache.hadoop.fs.FileSystem;
     79import org.apache.hadoop.fs.Path;
     80import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
     81import org.apache.hadoop.hbase.mapred.TableReduce;
     82import org.apache.hadoop.io.LongWritable;
     83import org.apache.hadoop.io.MapWritable;
     84import org.apache.hadoop.io.Text;
     85import org.apache.hadoop.mapred.JobClient;
     86import org.apache.hadoop.mapred.JobConf;
     87import org.apache.hadoop.mapred.OutputCollector;
     88import org.apache.hadoop.mapred.Reporter;
     89import org.apache.hadoop.mapred.lib.IdentityMapper;
     90import org.apache.hadoop.mapred.lib.IdentityReducer;
     91
     92public class HBaseRecordPro {
     93        /* Major parameter */
     94        // it indicates local path, not hadoop file system path
     95        final static String source_file = "/home/waue/test.txt";
     96
     97        /* Minor parameter */
     98        // column family name
     99        final static String column_family = "member:";
     100
     101        // table name
     102        final static String table_name = "HBaseRecord";
     103
     104        // separate char
     105        final static String sp = ":";
     106       
     107        // conf tmp with column qualify
     108        final static String conf_tmp = "/tmp/HBaseRecordPro.firstLine.tmp";
     109       
     110        // data source tmp
     111        final static String text_tmp = "/tmp/HBaseRecord.text.tmp";
     112
     113        // on this sample, map is nonuse, we use reduce to handle
     114        private static class ReduceClass extends TableReduce<LongWritable, Text> {
     115                public void reduce(LongWritable key, Iterator<Text> values,
     116                                OutputCollector<Text, MapWritable> output, Reporter reporter)
     117                                throws IOException {
     118
     119                        // read the configure file
     120                        BufferedReader fconf = new BufferedReader(new FileReader(new File(
     121                                        conf_tmp)));
     122                        String first_line = fconf.readLine();
     123                        fconf.close();
     124                        // extract cf data
     125                        String[] cf = first_line.split(sp);
     126                        int length = cf.length;
     127                         
     128                        // values.next().getByte() can get value and transfer to byte form,
     129                        String stro = new String(values.next().getBytes());
     130                        String str[] = stro.split(sp);
     131
     132                        // Column id is created dymanically,
     133                        Text[] col_n = new Text[length];
     134                        byte[][] b_l = new byte[length][];
     135                        // contents must be ImmutableBytesWritable
     136                        ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];
     137
     138                        // This map connect to hbase table and holds the columns per row
     139                        MapWritable map = new MapWritable();
     140                        map.clear();
     141
     142                        // prepare to write data into map
     143                        for (int i = 0; i < length; i++) {
     144                                col_n[i] = new Text(column_family + cf[i]);
     145                                b_l[i] = str[i].getBytes();
     146                                w_l[i] = new ImmutableBytesWritable(b_l[i]);
     147                                // populate the current row
     148                                map.put(col_n[i], w_l[i]);
     149                        }
     150                        // add the row with the key as the row id
     151                        output.collect(new Text(key.toString()), map);
     152                }
     153        }
     154
     155        public HBaseRecordPro() {
     156        }
     157       
     158        // This function can split the source text into two file, \
     159        //      conf_tmp file recorded first line is used to set column qualify \
     160        //      text_tmp , ou, recorded data is used to store into table.
     161        public String parseFirstLine(String in, String ou)
     162                        throws IOException {
     163
     164                BufferedReader fi = new BufferedReader(new FileReader(new File(in)));
     165                BufferedWriter ff = new BufferedWriter(new FileWriter(new File(conf_tmp)));
     166                BufferedWriter fw = new BufferedWriter(new FileWriter(new File(ou)));
     167                String first_line, data;
     168                first_line = fi.readLine();
     169                ff.write(first_line);
     170                ff.flush();
     171                do {
     172                        data = fi.readLine();
     173                        if (data == null) {
     174                                break;
     175                        } else {
     176                                fw.write(data + "\n");
     177                                fw.flush();
     178                        }
     179                } while (true);
     180                fw.close();
     181                ff.close();
     182                return first_line;
     183        }
     184        // tmp file delete
     185        boolean deleteFile(String str)throws IOException{
     186                File df = new File(str);
     187               
     188                if(df.exists()){
     189                        if(!df.delete()){
     190                                System.err.print("delete file error !");
     191                        }
     192                }else{
     193                        System.out.println("file not exit!");
     194                }
     195                return true;
     196        }
     197        /**
     198         * Runs the demo.
     199         */
     200        public static void main(String[] args) throws IOException {
     201
     202                HBaseRecordPro setup = new HBaseRecordPro();
     203                String[] col_family = {column_family};
     204                Path text_path = new Path(text_tmp);
     205               
     206                setup.parseFirstLine(source_file, text_tmp);
     207//              System.out.println(first_line);
     208
     209                BuildHTable build_table = new BuildHTable(table_name,
     210                                col_family);
     211                if (!build_table.checkTableExist(table_name)) {
     212                        if (!build_table.createTable()) {
     213                                System.out.println("create table error !");
     214                        }
     215                } else {
     216                        System.out.println("Table \"" + table_name
     217                                        + "\" has already existed !");
     218                }
     219                JobConf conf = new JobConf(HBaseRecordPro.class);
     220                FileSystem fileconf = FileSystem.get(conf);
     221                fileconf.copyFromLocalFile(true, text_path, text_path);
     222                // Job name; you can modify to any you like
     223                conf.setJobName("PersonDataBase");
     224                final int mapTasks = 1;
     225                final int reduceTasks = 1;
     226                // Hbase table name must be correct , in our profile is t1_table
     227                TableReduce.initJob(table_name, ReduceClass.class, conf);
     228
     229                // below are map-reduce profile
     230                conf.setNumMapTasks(mapTasks);
     231                conf.setNumReduceTasks(reduceTasks);
     232                conf.setInputPath(text_path);
     233                conf.setMapperClass(IdentityMapper.class);
     234                conf.setCombinerClass(IdentityReducer.class);
     235                conf.setReducerClass(ReduceClass.class);
     236
     237                JobClient.runJob(conf);
     238               
     239                // delete tmp file
     240                FileSystem.get(conf).delete(text_path);
     241                setup.deleteFile(conf_tmp);
     242        }
     243}
     244}}}