|   | 1 |  = 程式宣告 =  | 
                  
                          |   | 2 |  * Program: HBaseRecordPro.java | 
                  
                          |   | 3 |  * Editor: Waue Chen  | 
                  
                          |   | 4 |  * From :  NCHC. Taiwan | 
                  
                          |   | 5 |  * Last Update Date: 07/01/2008 | 
                  
                          |   | 6 | ----------------- | 
                  
                          |   | 7 |  = 程式功能 =  | 
                  
                          |   | 8 |  * The program parses your records and creates an HBase table. | 
                  
                          |   | 9 |  * Then it uses the first line as the column qualifiers. | 
                  
                          |   | 10 |  * Finally it stores in HBase automatically. | 
                  
                          |   | 11 |   | 
                  
                          |   | 12 |  = 如何使用  =  | 
                  
                          |   | 13 |  *      Make sure of two things: | 
                  
                          |   | 14 |  *      1. source_file must be formatted as follows: | 
                  
                          |   | 15 |  *              first line: qualify1:qualify2:...:qualifyN | 
                  
                          |   | 16 |  *              other line: records1:records2:...:recordsN | 
                  
                          |   | 17 |  *     2. source_file path must be correct. | 
                  
                          |   | 18 |  | 
                  
                          |   | 19 |  = 原始檔內容 =  | 
                  
                          |   | 20 | {{{ | 
                  
                          |   | 21 | name:locate:years | 
                  
                          |   | 22 | waue:taiwan:1981 | 
                  
                          |   | 23 | rock:taiwan:1981 | 
                  
                          |   | 24 | aso:taiwan:1981 | 
                  
                          |   | 25 | jazz:taiwan:1982 | 
                  
                          |   | 26 | }}} | 
                  
                          |   | 27 |  | 
                  
                          |   | 28 |  = 結果 =  | 
                  
                          |   | 29 | {{{ | 
                  
                          |   | 30 |  *      Go to hbase console, type :  | 
                  
                          |   | 31 |  *              hql > select * from t1_table;  | 
                  
                          |   | 32 |  08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key  | 
                  
                          |   | 33 |  | 
                  
                          |   | 34 | +-------------------------+-------------------------+-------------------------+ | 
                  
                          |   | 35 | | Row                     | Column                  | Cell                    | | 
                  
                          |   | 36 | +-------------------------+-------------------------+-------------------------+ | 
                  
                          |   | 37 | | 0                       | member:locate           | taiwan                  | | 
                  
                          |   | 38 | +-------------------------+-------------------------+-------------------------+ | 
                  
                          |   | 39 | | 0                       | member:name             | waue                    | | 
                  
                          |   | 40 | +-------------------------+-------------------------+-------------------------+ | 
                  
                          |   | 41 | | 0                       | member:years            | 1981                    | | 
                  
                          |   | 42 | +-------------------------+-------------------------+-------------------------+ | 
                  
                          |   | 43 | | 17                      | member:locate           | taiwan                  | | 
                  
                          |   | 44 | +-------------------------+-------------------------+-------------------------+ | 
                  
                          |   | 45 | | 17                      | member:name             | rock                    | | 
                  
                          |   | 46 | +-------------------------+-------------------------+-------------------------+ | 
                  
                          |   | 47 | | 17                      | member:years            | 1981                    | | 
                  
                          |   | 48 | +-------------------------+-------------------------+-------------------------+ | 
                  
                          |   | 49 | | 34                      | member:locate           | taiwan                  | | 
                  
                          |   | 50 | +-------------------------+-------------------------+-------------------------+ | 
                  
                          |   | 51 | | 34                      | member:name             | aso                     | | 
                  
                          |   | 52 | +-------------------------+-------------------------+-------------------------+ | 
                  
                          |   | 53 | | 34                      | member:years            | 1981                    | | 
                  
                          |   | 54 | +-------------------------+-------------------------+-------------------------+ | 
                  
                          |   | 55 | | 50                      | member:locate           | taiwan                  | | 
                  
                          |   | 56 | +-------------------------+-------------------------+-------------------------+ | 
                  
                          |   | 57 | | 50                      | member:name             | jazz                    | | 
                  
                          |   | 58 | +-------------------------+-------------------------+-------------------------+ | 
                  
                          |   | 59 | | 50                      | member:years            | 1982                    | | 
                  
                          |   | 60 | +-------------------------+-------------------------+-------------------------+ | 
                  
                          |   | 61 | 4 row(s) in set. (0.31 sec) | 
                  
                          |   | 62 |  | 
                  
                          |   | 63 |  */ | 
                  
                          |   | 64 | }}} | 
                  
                          |   | 65 |  | 
                  
                          |   | 66 |  = 程式碼 =  | 
                  
                          |   | 67 | {{{ | 
                  
                          |   | 68 | package tw.org.nchc.code; | 
                  
                          |   | 69 |  | 
                  
                          |   | 70 | import java.io.BufferedReader; | 
                  
                          |   | 71 | import java.io.BufferedWriter; | 
                  
                          |   | 72 | import java.io.File; | 
                  
                          |   | 73 | import java.io.FileReader; | 
                  
                          |   | 74 | import java.io.FileWriter; | 
                  
                          |   | 75 | import java.io.IOException; | 
                  
                          |   | 76 | import java.util.Iterator; | 
                  
                          |   | 77 |  | 
                  
                          |   | 78 | import org.apache.hadoop.fs.FileSystem; | 
                  
                          |   | 79 | import org.apache.hadoop.fs.Path; | 
                  
                          |   | 80 | import org.apache.hadoop.hbase.io.ImmutableBytesWritable; | 
                  
                          |   | 81 | import org.apache.hadoop.hbase.mapred.TableReduce; | 
                  
                          |   | 82 | import org.apache.hadoop.io.LongWritable; | 
                  
                          |   | 83 | import org.apache.hadoop.io.MapWritable; | 
                  
                          |   | 84 | import org.apache.hadoop.io.Text; | 
                  
                          |   | 85 | import org.apache.hadoop.mapred.JobClient; | 
                  
                          |   | 86 | import org.apache.hadoop.mapred.JobConf; | 
                  
                          |   | 87 | import org.apache.hadoop.mapred.OutputCollector; | 
                  
                          |   | 88 | import org.apache.hadoop.mapred.Reporter; | 
                  
                          |   | 89 | import org.apache.hadoop.mapred.lib.IdentityMapper; | 
                  
                          |   | 90 | import org.apache.hadoop.mapred.lib.IdentityReducer; | 
                  
                          |   | 91 |  | 
                  
                          |   | 92 | public class HBaseRecordPro { | 
                  
                          |   | 93 |         /* Major parameter */ | 
                  
                          |   | 94 |         // it indicates local path, not hadoop file system path | 
                  
                          |   | 95 |         final static String source_file = "/home/waue/test.txt"; | 
                  
                          |   | 96 |  | 
                  
                          |   | 97 |         /* Minor parameter */ | 
                  
                          |   | 98 |         // column family name | 
                  
                          |   | 99 |         final static String column_family = "member:"; | 
                  
                          |   | 100 |  | 
                  
                          |   | 101 |         // table name | 
                  
                          |   | 102 |         final static String table_name = "HBaseRecord"; | 
                  
                          |   | 103 |  | 
                  
                          |   | 104 |         // separate char | 
                  
                          |   | 105 |         final static String sp = ":"; | 
                  
                          |   | 106 |          | 
                  
                          |   | 107 |         // conf tmp with column qualify | 
                  
                          |   | 108 |         final static String conf_tmp = "/tmp/HBaseRecordPro.firstLine.tmp"; | 
                  
                          |   | 109 |          | 
                  
                          |   | 110 |         // data source tmp | 
                  
                          |   | 111 |         final static String text_tmp = "/tmp/HBaseRecord.text.tmp"; | 
                  
                          |   | 112 |  | 
                  
                          |   | 113 |         // on this sample, map is nonuse, we use reduce to handle | 
                  
                          |   | 114 |         private static class ReduceClass extends TableReduce<LongWritable, Text> { | 
                  
                          |   | 115 |                 public void reduce(LongWritable key, Iterator<Text> values, | 
                  
                          |   | 116 |                                 OutputCollector<Text, MapWritable> output, Reporter reporter) | 
                  
                          |   | 117 |                                 throws IOException { | 
                  
                          |   | 118 |  | 
                  
                          |   | 119 |                         // read the configure file | 
                  
                          |   | 120 |                         BufferedReader fconf = new BufferedReader(new FileReader(new File( | 
                  
                          |   | 121 |                                         conf_tmp))); | 
                  
                          |   | 122 |                         String first_line = fconf.readLine(); | 
                  
                          |   | 123 |                         fconf.close(); | 
                  
                          |   | 124 |                         // extract cf data | 
                  
                          |   | 125 |                         String[] cf = first_line.split(sp); | 
                  
                          |   | 126 |                         int length = cf.length; | 
                  
                          |   | 127 |                           | 
                  
                          |   | 128 |                         // values.next().getByte() can get value and transfer to byte form, | 
                  
                          |   | 129 |                         String stro = new String(values.next().getBytes()); | 
                  
                          |   | 130 |                         String str[] = stro.split(sp); | 
                  
                          |   | 131 |  | 
                  
                          |   | 132 |                         // Column id is created dymanically, | 
                  
                          |   | 133 |                         Text[] col_n = new Text[length]; | 
                  
                          |   | 134 |                         byte[][] b_l = new byte[length][]; | 
                  
                          |   | 135 |                         // contents must be ImmutableBytesWritable | 
                  
                          |   | 136 |                         ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length]; | 
                  
                          |   | 137 |  | 
                  
                          |   | 138 |                         // This map connect to hbase table and holds the columns per row | 
                  
                          |   | 139 |                         MapWritable map = new MapWritable(); | 
                  
                          |   | 140 |                         map.clear(); | 
                  
                          |   | 141 |  | 
                  
                          |   | 142 |                         // prepare to write data into map | 
                  
                          |   | 143 |                         for (int i = 0; i < length; i++) { | 
                  
                          |   | 144 |                                 col_n[i] = new Text(column_family + cf[i]); | 
                  
                          |   | 145 |                                 b_l[i] = str[i].getBytes(); | 
                  
                          |   | 146 |                                 w_l[i] = new ImmutableBytesWritable(b_l[i]); | 
                  
                          |   | 147 |                                 // populate the current row | 
                  
                          |   | 148 |                                 map.put(col_n[i], w_l[i]); | 
                  
                          |   | 149 |                         } | 
                  
                          |   | 150 |                         // add the row with the key as the row id | 
                  
                          |   | 151 |                         output.collect(new Text(key.toString()), map); | 
                  
                          |   | 152 |                 } | 
                  
                          |   | 153 |         } | 
                  
                          |   | 154 |  | 
                  
                          |   | 155 |         public HBaseRecordPro() { | 
                  
                          |   | 156 |         } | 
                  
                          |   | 157 |          | 
                  
                          |   | 158 |         // This function can split the source text into two file, \ | 
                  
                          |   | 159 |         //      conf_tmp file recorded first line is used to set column qualify \ | 
                  
                          |   | 160 |         //      text_tmp , ou, recorded data is used to store into table. | 
                  
                          |   | 161 |         public String parseFirstLine(String in, String ou) | 
                  
                          |   | 162 |                         throws IOException { | 
                  
                          |   | 163 |  | 
                  
                          |   | 164 |                 BufferedReader fi = new BufferedReader(new FileReader(new File(in))); | 
                  
                          |   | 165 |                 BufferedWriter ff = new BufferedWriter(new FileWriter(new File(conf_tmp))); | 
                  
                          |   | 166 |                 BufferedWriter fw = new BufferedWriter(new FileWriter(new File(ou))); | 
                  
                          |   | 167 |                 String first_line, data; | 
                  
                          |   | 168 |                 first_line = fi.readLine(); | 
                  
                          |   | 169 |                 ff.write(first_line); | 
                  
                          |   | 170 |                 ff.flush(); | 
                  
                          |   | 171 |                 do { | 
                  
                          |   | 172 |                         data = fi.readLine(); | 
                  
                          |   | 173 |                         if (data == null) { | 
                  
                          |   | 174 |                                 break; | 
                  
                          |   | 175 |                         } else { | 
                  
                          |   | 176 |                                 fw.write(data + "\n"); | 
                  
                          |   | 177 |                                 fw.flush(); | 
                  
                          |   | 178 |                         } | 
                  
                          |   | 179 |                 } while (true); | 
                  
                          |   | 180 |                 fw.close(); | 
                  
                          |   | 181 |                 ff.close(); | 
                  
                          |   | 182 |                 return first_line; | 
                  
                          |   | 183 |         } | 
                  
                          |   | 184 |         // tmp file delete | 
                  
                          |   | 185 |         boolean deleteFile(String str)throws IOException{ | 
                  
                          |   | 186 |                 File df = new File(str); | 
                  
                          |   | 187 |                  | 
                  
                          |   | 188 |                 if(df.exists()){ | 
                  
                          |   | 189 |                         if(!df.delete()){ | 
                  
                          |   | 190 |                                 System.err.print("delete file error !"); | 
                  
                          |   | 191 |                         } | 
                  
                          |   | 192 |                 }else{ | 
                  
                          |   | 193 |                         System.out.println("file not exit!"); | 
                  
                          |   | 194 |                 } | 
                  
                          |   | 195 |                 return true; | 
                  
                          |   | 196 |         } | 
                  
                          |   | 197 |         /** | 
                  
                          |   | 198 |          * Runs the demo. | 
                  
                          |   | 199 |          */ | 
                  
                          |   | 200 |         public static void main(String[] args) throws IOException { | 
                  
                          |   | 201 |  | 
                  
                          |   | 202 |                 HBaseRecordPro setup = new HBaseRecordPro(); | 
                  
                          |   | 203 |                 String[] col_family = {column_family}; | 
                  
                          |   | 204 |                 Path text_path = new Path(text_tmp); | 
                  
                          |   | 205 |                  | 
                  
                          |   | 206 |                 setup.parseFirstLine(source_file, text_tmp); | 
                  
                          |   | 207 | //              System.out.println(first_line); | 
                  
                          |   | 208 |  | 
                  
                          |   | 209 |                 BuildHTable build_table = new BuildHTable(table_name, | 
                  
                          |   | 210 |                                 col_family); | 
                  
                          |   | 211 |                 if (!build_table.checkTableExist(table_name)) { | 
                  
                          |   | 212 |                         if (!build_table.createTable()) { | 
                  
                          |   | 213 |                                 System.out.println("create table error !"); | 
                  
                          |   | 214 |                         } | 
                  
                          |   | 215 |                 } else { | 
                  
                          |   | 216 |                         System.out.println("Table \"" + table_name | 
                  
                          |   | 217 |                                         + "\" has already existed !"); | 
                  
                          |   | 218 |                 } | 
                  
                          |   | 219 |                 JobConf conf = new JobConf(HBaseRecordPro.class); | 
                  
                          |   | 220 |                 FileSystem fileconf = FileSystem.get(conf); | 
                  
                          |   | 221 |                 fileconf.copyFromLocalFile(true, text_path, text_path); | 
                  
                          |   | 222 |                 // Job name; you can modify to any you like | 
                  
                          |   | 223 |                 conf.setJobName("PersonDataBase"); | 
                  
                          |   | 224 |                 final int mapTasks = 1; | 
                  
                          |   | 225 |                 final int reduceTasks = 1; | 
                  
                          |   | 226 |                 // Hbase table name must be correct , in our profile is t1_table | 
                  
                          |   | 227 |                 TableReduce.initJob(table_name, ReduceClass.class, conf); | 
                  
                          |   | 228 |  | 
                  
                          |   | 229 |                 // below are map-reduce profile | 
                  
                          |   | 230 |                 conf.setNumMapTasks(mapTasks); | 
                  
                          |   | 231 |                 conf.setNumReduceTasks(reduceTasks); | 
                  
                          |   | 232 |                 conf.setInputPath(text_path); | 
                  
                          |   | 233 |                 conf.setMapperClass(IdentityMapper.class); | 
                  
                          |   | 234 |                 conf.setCombinerClass(IdentityReducer.class); | 
                  
                          |   | 235 |                 conf.setReducerClass(ReduceClass.class); | 
                  
                          |   | 236 |  | 
                  
                          |   | 237 |                 JobClient.runJob(conf); | 
                  
                          |   | 238 |                  | 
                  
                          |   | 239 |                 // delete tmp file | 
                  
                          |   | 240 |                 FileSystem.get(conf).delete(text_path); | 
                  
                          |   | 241 |                 setup.deleteFile(conf_tmp); | 
                  
                          |   | 242 |         } | 
                  
                          |   | 243 | } | 
                  
                          |   | 244 | }}} |