/*
 *  NCHC HBase with MapReduce sample code
 *  HBaseRecord.java
 */

package tw.org.nchc.code;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

/**
 * This sample code puts the indicated data into HBase.
 * 1. Put test.txt into a directory t1; its content is
 * ---------------
 * name:locate:years 
 * waue:taiwan:1981
 * shellon:taiwan:1981
 * ---------------
 * 2. hadoop_root/$ bin/hadoop dfs -put t1 t1
 * 3. hbase_root/$ bin/hbase shell
 * 4. hql > create table t1_table("person");
 * 5. Return to Eclipse and run this code; the resulting table will look like:
 *  t1_table -> person
 *  -----------------------------
 *  | name    | locate | years  |
 *  -----------------------------
 *  | waue    | taiwan | 1981   |
 *  | shellon | taiwan | 1981   |
 *  -----------------------------
 * 6. Go to the HBase console and type: hql > select * from t1_table;
     
08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key 
+-------------------------+-------------------------+-------------------------+
| Row                     | Column                  | Cell                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | person:locate           | locate                  |
+-------------------------+-------------------------+-------------------------+
| 0                       | person:name             | name                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | person:years            | years                   |
+-------------------------+-------------------------+-------------------------+
| 19                      | person:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 19                      | person:name             | waue                    |
+-------------------------+-------------------------+-------------------------+
| 19                      | person:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 36                      | person:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 36                      | person:name             | shellon                 |
+-------------------------+-------------------------+-------------------------+
| 36                      | person:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
3 row(s) in set. (0.04 sec)
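 *
 * Note: the row keys 0, 19 and 36 are the byte offsets of the three lines in
 * test.txt. The job uses IdentityMapper, so the LongWritable offsets emitted
 * by the default TextInputFormat pass straight through to reduce() and become
 * the row ids; this is also why the header line "name:locate:years" is stored
 * as row 0.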
 **/
public class HBaseRecord {

	/* Define parameters */
	// one column family: person; three column qualifiers: name, locate, years
	static private String baseId1 = "person:name";
	static private String baseId2 = "person:locate";
	static private String baseId3 = "person:years";
	// field separator within each input line
	static private String sp = ":";
	// file path in the Hadoop file system (HDFS), not the local file system
	String file_path = "/user/waue/t1";
	// Hbase table name
	String table_name = "t1_table";
	// number of map and reduce tasks
	int mapTasks = 1;
	int reduceTasks = 1;
	
	private static class ReduceClass extends TableReduce<LongWritable, Text> {

		// column names, built from the qualifiers defined above
		private static final Text col_name = new Text(baseId1);
		private static final Text col_local = new Text(baseId2);
		private static final Text col_year = new Text(baseId3);
		
		// this map holds the columns for the current row; it is reused and
		// cleared on every call to reduce()
		private MapWritable map = new MapWritable();	
		
		// in this sample the map phase is a pass-through (IdentityMapper);
		// all of the work happens here in reduce()
		public void reduce(LongWritable key, Iterator<Text> values,
				OutputCollector<Text, MapWritable> output, Reporter reporter)
				throws IOException {

			// Text.getBytes() returns the backing byte array, which may be
			// longer than the valid data; toString() is the safe way to get
			// the line contents as a String
			String stro = values.next().toString();
			String str[] = stro.split(sp);
			// each input line is laid out as name:locate:years
			byte b_name[] = str[0].getBytes();
			byte b_local[] = str[1].getBytes();
			byte b_year[] = str[2].getBytes();
			
			// cell values must be wrapped as ImmutableBytesWritable
			ImmutableBytesWritable w_local = new ImmutableBytesWritable(b_local);
			ImmutableBytesWritable w_name = new ImmutableBytesWritable(b_name);
			ImmutableBytesWritable w_year = new ImmutableBytesWritable(b_year);

			// populate the current row
			map.clear();
			map.put(col_name, w_name);
			map.put(col_local, w_local);
			map.put(col_year, w_year);

			// add the row with the key as the row id
			output.collect(new Text(key.toString()), map);
		}
	}
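
	/*
	 * A minimal sketch (our addition, not part of the original job) that
	 * factors the line-to-row conversion out of reduce() above, so the
	 * parsing can be sanity-checked without a cluster; the method name
	 * buildRow is hypothetical.
	 */
	static MapWritable buildRow(String line) {
		String[] f = line.split(sp); // f[0]=name, f[1]=locate, f[2]=years
		MapWritable row = new MapWritable();
		row.put(new Text(baseId1), new ImmutableBytesWritable(f[0].getBytes()));
		row.put(new Text(baseId2), new ImmutableBytesWritable(f[1].getBytes()));
		row.put(new Text(baseId3), new ImmutableBytesWritable(f[2].getBytes()));
		return row;
	}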

	private HBaseRecord() {
	}

	/**
	 * Runs the demo.
	 */
	public static void main(String[] args) throws IOException {
		// input files are read from HDFS, from the path given by file_path
		HBaseRecord setup = new HBaseRecord();
		JobConf conf = new JobConf(HBaseRecord.class);

		// Job name; change it to anything you like
		conf.setJobName("NCHC_PersonDataBase");

		// the HBase table name must match the table created in step 4 (t1_table)
		TableReduce.initJob(setup.table_name, ReduceClass.class, conf);
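		// (initJob above wires the job's output side to HBase: to the best of
		// our knowledge it sets the table output format, the Text/MapWritable
		// output key/value classes, and records t1_table as the target table)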
		
		// remaining MapReduce job settings
		conf.setNumMapTasks(setup.mapTasks);
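		// note: setNumMapTasks is only a hint; the actual number of map tasks
		// is driven by the input splits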
		conf.setNumReduceTasks(setup.reduceTasks);
		conf.setInputPath(new Path(setup.file_path));
		conf.setMapperClass(IdentityMapper.class);
		conf.setCombinerClass(IdentityReducer.class);
		conf.setReducerClass(ReduceClass.class);
		JobClient.runJob(conf);
	}
}
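
/*
 * Usage sketch (the jar name below is an assumption): besides running from
 * Eclipse, the job can be launched from the command line once this class is
 * packaged into a jar:
 *
 *   hadoop_root/$ bin/hadoop jar nchc-samples.jar tw.org.nchc.code.HBaseRecord
 */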
