/**
 * Program: HBaseRecord2.java
 * Editor: Waue Chen 
 * From : NCHC, Taiwan
 * Last Update Date: 06/01/2008
 */

/**
 * Purpose : 
 * 	Parse each record of the input file and store it in HBase.
 * 
 * HowToUse:
 * 	Make sure the Hadoop file system and HBase are running correctly, then:
 * 	1. Put test.txt in a local directory named t1; its content is
	---------------
	name:locate:years
	waue:taiwan:1981
	shellon:taiwan:1981
	---------------
 * 	2. Upload the directory to HDFS:
 * 		hadoop_root/$ bin/hadoop dfs -put t1 t1
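 * 	3. Build this program into a jar and run it with Hadoop. The jar name
 * 	   below is only an example; use whatever jar your build produces:
 * 		hadoop_root/$ bin/hadoop jar nchc-examples.jar tw.org.nchc.code.HBaseRecord2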
 * Check Result:
 * 	Go to the HBase console and type:
 * 		hql > select * from t1_table; 
08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key 
+-------------------------+-------------------------+-------------------------+
| Row                     | Column                  | Cell                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | person:locate           | locate                  |
+-------------------------+-------------------------+-------------------------+
| 0                       | person:name             | name                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | person:years            | years                   |
+-------------------------+-------------------------+-------------------------+
| 19                      | person:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 19                      | person:name             | waue                    |
+-------------------------+-------------------------+-------------------------+
| 19                      | person:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 36                      | person:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 36                      | person:name             | shellon                 |
+-------------------------+-------------------------+-------------------------+
| 36                      | person:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
3 row(s) in set. (0.04 sec)
 */




package tw.org.nchc.code;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;


public class HBaseRecord2 {

	/* Define parameters: the HBase columns each input field maps to */
	static String[] bf = {"person:name","person:local","person:birthyear"};
	// file path in the Hadoop file system (HDFS), not the local file system
	String file_path = "/user/waue/t1/test.txt";
	// HBase table name
	String table_name = "testtable";
	
	
	// number of map tasks and reduce tasks
	int mapTasks = 1;
	int reduceTasks = 1;
	
	private static class ReduceClass extends TableReduce<LongWritable, Text> {

		// In this example the mapper is only an identity pass-through;
		// all of the work is done here in reduce().
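		// For example, the input line "waue:taiwan:1981" from test.txt
		// becomes one HBase row, keyed by that line's byte offset, with:
		//   person:name      -> "waue"
		//   person:local     -> "taiwan"
		//   person:birthyear -> "1981"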
		public void reduce(LongWritable key, Iterator<Text> values,
				OutputCollector<Text, MapWritable> output, Reporter reporter)
				throws IOException {
			// this map holds the columns per row
			MapWritable map = new MapWritable();	
			// read the first value for this key as a String and split it into fields;
			// Text.getBytes() may return extra buffer bytes, so use toString() instead
			String stro = values.next().toString();
			String str[] = stro.split(":");
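			// note: this assumes every input line has at least bf.length
			// colon-separated fields; a shorter line would throw an
			// ArrayIndexOutOfBoundsException in the loop below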
			
			int length = bf.length;
			
			// column names are created dynamically from bf
			Text[] col_n = new Text[length];
			byte[][] b_l = new byte[length][];
			// contents must be ImmutableBytesWritable
			ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];
			map.clear();
			for(int i = 0; i < length; i++){
				col_n[i] = new Text(bf[i]);
				b_l[i] = str[i].getBytes();
				w_l[i] = new ImmutableBytesWritable(b_l[i]);
				// populate the current row
				map.put(col_n[i], w_l[i]);
			}
			// add the row with the key as the row id
			output.collect(new Text(key.toString()), map);
		}
	}

	private HBaseRecord2() {
	}

	/**
	 * Runs the demo.
	 */
	public static void main(String[] args) throws IOException {

		
		HBaseRecord2 setup = new HBaseRecord2();
		String[] tmp = bf[0].split(":");
		String[] CF = {tmp[0]};
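		// BuildHTable is a helper class in this package used below to check
		// for and, if needed, create the HBase table with column family CF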
		BuildHTable build_table = new BuildHTable(setup.table_name, CF);
		if (!build_table.checkTableExist(setup.table_name)) {
			if (!build_table.createTable()) {
				System.out.println("create table error!");
			}
		} else {
			System.out.println("Table \"" + setup.table_name
					+ "\" already exists!");
		}
		
		
		JobConf conf = new JobConf(HBaseRecord2.class);

		// job name; change it to anything you like
		conf.setJobName("PersonDataBase");

		// the HBase table name must match the table created above (here, "testtable")
		TableReduce.initJob(setup.table_name, ReduceClass.class, conf);
		
		// MapReduce job configuration
		conf.setNumMapTasks(setup.mapTasks);
		conf.setNumReduceTasks(setup.reduceTasks);
		conf.setInputPath(new Path(setup.file_path));
		conf.setMapperClass(IdentityMapper.class);
		conf.setCombinerClass(IdentityReducer.class);
		conf.setReducerClass(ReduceClass.class);
		JobClient.runJob(conf);
	}
}
