Index: /sample/HBaseRecord2.java
===================================================================
--- /sample/HBaseRecord2.java	(revision 12)
+++ /sample/HBaseRecord2.java	(revision 13)
@@ -3,11 +3,10 @@
  * Editor: Waue Chen 
  * From :  NCHC. Taiwn
- * Last Update Date: 06/13/2008
+ * Last Update Date: 06/01/2008
  */
 
 /**
  * Purpose : 
- * 	1.Auto generate HTable 
- *  2.Parse your record and then store in HBase.
+ * 	Parse your record and then store in HBase.
  * 
  * HowToUse : 
@@ -20,12 +19,4 @@
 	---------------
  * 	2. hadoop_root/$ bin/hadoop dfs -put t1 t1
- * 	3. hbase_root/$ bin/hbase shell
- * 	4. hql > create table t1_table("person");
- * 	5. Come to Eclipse and run this code, and we will let database as that 
- 	t1_table -> person
-	  ----------------
-	  |  name | locate | years |
-	  | waue  | taiwan | 1981 |
-	  | shellon | taiwan | 1981 |
 	  ----------------
  * Check Result:
@@ -82,33 +73,18 @@
 
 	/* Denify parameter */
-	// one column family: person; three column qualifier: name,locate,years
-	final String colstr;
+	static String[] bf = {"person:name","person:local","person:birthyear"};
+	// file path in hadoop file system (not physical file system)
+	String file_path = "/user/waue/t1/test.txt";
 	// Hbase table name
-	static String[] col;
-	String Table_Name = "Record1";
-	//split character
-	static String sp = ":";
-	// file path in hadoop file system (not phisical file system)
-	String file_path = "/user/waue/t1";
-
-
-
-	public HBaseRecord2(){
-		colstr ="person:name,locate,years";
-	}
-	public HBaseRecord2(String str){
-		colstr = str; 
-	}
-
+	String table_name = "testtable";
+	
+	
+	// set up the number of map tasks and reduce tasks
+	int mapTasks = 1;
+	int reduceTasks = 1;
 	
 	private static class ReduceClass extends TableReduce<LongWritable, Text> {
 
-		// Column id is created dymanically, 
-		private static final Text col_name = new Text(baseId1);
-		private static final Text col_local = new Text(baseId2);
-		private static final Text col_year = new Text(baseId3);
-		
-		// this map holds the columns per row
-		private MapWritable map = new MapWritable();	
+
 		
 		// on this sample, map is nonuse, we use reduce to handle
@@ -116,27 +92,31 @@
 				OutputCollector<Text, MapWritable> output, Reporter reporter)
 				throws IOException {
-
-			// values.next().getByte() can get value and transfer to byte form, there is an other way that let decode()
-			// to substitude getByte() 
+			// this map holds the columns per row
+			MapWritable map = new MapWritable();	
+			// values.next().getBytes() returns the value in byte form,
 			String stro = new String(values.next().getBytes());
-			String str[] = stro.split(sp);
-			byte b_local[] = str[0].getBytes();
-			byte b_name[] = str[1].getBytes();
-			byte b_year[] = str[2].getBytes();
+			String str[] = stro.split(":");
 			
+			int length = bf.length;
+			
+			// Column ids are created dynamically,
+			Text[] col_n = new Text[length];
+			byte[][] b_l = new byte[length][];
 			// contents must be ImmutableBytesWritable
-			ImmutableBytesWritable w_local = new ImmutableBytesWritable( b_local);
-			ImmutableBytesWritable w_name = new ImmutableBytesWritable( b_name );
-			ImmutableBytesWritable w_year = new ImmutableBytesWritable( b_year );
-
-			// populate the current row
+			ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];
 			map.clear();
-			map.put(col_name, w_local);
-			map.put(col_local, w_name);
-			map.put(col_year, w_year);
-
+			for(int i = 0; i < length; i++){
+				col_n[i] = new Text(bf[i]);
+				b_l[i] = str[i].getBytes();
+				w_l[i] = new ImmutableBytesWritable(b_l[i]);
+				// populate the current row
+				map.put(col_n[i], w_l[i]);
+			}
 			// add the row with the key as the row id
 			output.collect(new Text(key.toString()), map);
 		}
+	}
+
+	private HBaseRecord2() {
 	}
 
@@ -145,36 +125,31 @@
 	 */
 	public static void main(String[] args) throws IOException {
-		// parse colstr to split column family and column qualify
-		HBaseRecord2 work = new HBaseRecord2();
+
 		
-		String tmp[] = work.colstr.split(":");
-		String Column_Family = tmp[0]+":";
-		String CF[] = {Column_Family};
-		String CQ[] = tmp[2].split(",");
-		// check whether create table or not , we don't admit \ 
-		// the same name but different structure
-		
-		BuildHTable build_table = new BuildHTable(work.Table_Name,CF);
-		if (!build_table.checkTableExist(work.Table_Name)) {
+		HBaseRecord2 setup = new HBaseRecord2();
+		String[] tmp = bf[0].split(":");
+		String[] CF = {tmp[0]};
+		BuildHTable build_table = new BuildHTable(setup.table_name, CF);
+		if (!build_table.checkTableExist(setup.table_name)) {
 			if (!build_table.createTable()) {
 				System.out.println("create table error !");
 			}
-		}else{
-			System.out.println("Table \"" + work.Table_Name +"\" has already existed !");
-		}		
+		} else {
+			System.out.println("Table \"" + setup.table_name
+					+ "\" has already existed !");
+		}
+		
+		JobConf conf = new JobConf(HBaseRecord2.class);
 
-		JobConf conf = new JobConf(HBaseRecord2.class);
-		int mapTasks = 1;
-		int reduceTasks = 1;
 		//Job name; you can modify to any you like  
-		conf.setJobName("NCHC_PersonDataBase");
+		conf.setJobName("PersonDataBase");
 
 		// Hbase table name must be correct , in our profile is t1_table
-		TableReduce.initJob(work.Table_Name, ReduceClass.class, conf);
+		TableReduce.initJob(setup.table_name, ReduceClass.class, conf);
 		
 		// below are map-reduce profile
-		conf.setNumMapTasks(mapTasks);
-		conf.setNumReduceTasks(reduceTasks);
-		conf.setInputPath(new Path(work.file_path));
+		conf.setNumMapTasks(setup.mapTasks);
+		conf.setNumReduceTasks(setup.reduceTasks);
+		conf.setInputPath(new Path(setup.file_path));
 		conf.setMapperClass(IdentityMapper.class);
 		conf.setCombinerClass(IdentityReducer.class);
