/**
 * Program: WordCountIntoHBase.java
 * Editor: Waue Chen 
 * From :  NCHC. Taiwan
 * Last Update Date: 07/02/2008
 */

/**
 * Purpose : 
 * 	Store every line from $Input_Path into HBase.
 * 
 * HowToUse : 
 * 	Make sure the Hadoop file system and HBase are running correctly.
 * 	Use the Hadoop shell to upload input text files to $Input_Path.
 * 	($ bin/hadoop dfs -put local_dir hdfs_dir)
 * 	Then run the program together with BuildHTable.java after
 * 	modifying the setup parameters below.
 * 
 * Check Result : 
 * 	View the result with the HBase shell (hql> select * from $Table_Name),
 * 	or run WordCountFromHBase.java and inspect http://localhost:60070 in a
 * 	web browser.
 */
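
/*
 * A typical session, as a non-authoritative sketch; the jar name and the
 * local input directory below are illustrative, not part of this project:
 *
 *   $ bin/hadoop dfs -put my_local_input /user/waue/input
 *   $ bin/hadoop jar nchc-code.jar tw.org.nchc.code.WordCountIntoHBase
 *   hql> select * from word_count5;
 */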

package tw.org.nchc.code;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class WordCountIntoHBase {

	/* setup parameters */
	// $Input_Path. Make sure the path is correct and contains the input files
	static final String Input_Path = "/user/waue/input";

	// HBase table name; the program will create it
	static final String Table_Name = "word_count5";

	// column, as family:qualifier; the program will create the column family
	static final String colstr = "word:text";
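
	// Resulting layout, as a sketch (the row id and value are illustrative):
	// each input line becomes one HBase row keyed by the line's byte offset,
	// e.g. row "0", column "word:text" -> the first line of the input file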

	// private constructor; this class is only used through its main() method
	private WordCountIntoHBase() {
	}

	private static class ReduceClass extends TableReduce<LongWritable, Text> {
		// the target column, as (column_family:column_qualifier)
		private static final Text col = new Text(WordCountIntoHBase.colstr);

		// this map holds the columns of one row; it is reused across calls
		private MapWritable map = new MapWritable();

		public void reduce(LongWritable key, Iterator<Text> values,
				OutputCollector<Text, MapWritable> output, Reporter reporter)
				throws IOException {
			// cell contents must be ImmutableBytesWritable. Text.getBytes()
			// returns the backing array, which may be longer than the valid
			// data, so copy exactly getLength() bytes
			Text value = values.next();
			byte[] bytes = new byte[value.getLength()];
			System.arraycopy(value.getBytes(), 0, bytes, 0, value.getLength());
			map.clear();
			// store the line under the configured column
			map.put(col, new ImmutableBytesWritable(bytes));
			// emit the row, using the record key (byte offset) as the row id
			output.collect(new Text(key.toString()), map);
		}
	}

	/**
	 * Runs the demo.
	 */
	public static void main(String[] args) throws IOException {
		// split colstr into the column family and the column qualifier
		String tmp[] = colstr.split(":");
		String Column_Family = tmp[0] + ":";
		String CF[] = { Column_Family };
		// create the table only if it does not exist yet; a table with the
		// same name but a different structure is not allowed
		BuildHTable build_table = new BuildHTable(Table_Name, CF);
		if (!build_table.checkTableExist(Table_Name)) {
			if (!build_table.createTable()) {
				System.out.println("create table error !");
			}
		} else {
			System.out.println("Table \"" + Table_Name
					+ "\" has already existed !");
		}
		int mapTasks = 1;
		int reduceTasks = 1;
		JobConf conf = new JobConf(WordCountIntoHBase.class);
		conf.setJobName(Table_Name);

		// TableReduce.initJob must be called before the job is submitted; it
		// directs the job's output to the HBase table Table_Name
		TableReduce.initJob(Table_Name, ReduceClass.class, conf);
		conf.setNumMapTasks(mapTasks);
		conf.setNumReduceTasks(reduceTasks);
		conf.setInputPath(new Path(Input_Path));		
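		// IdentityMapper forwards each (byte offset, line) record unchanged,
		// so every reduce key is a unique offset carrying a single value; the
		// identity combiner below is therefore effectively a no-op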
		conf.setMapperClass(IdentityMapper.class);
		conf.setCombinerClass(IdentityReducer.class);
		conf.setReducerClass(ReduceClass.class);

		JobClient.runJob(conf);
	}
}
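
/*
 * A minimal read-back sketch, assuming the Text-based HBase 0.1-era client
 * API that the imports above belong to; treat the calls as illustrative
 * rather than a verified recipe:
 *
 *   HTable table = new HTable(new HBaseConfiguration(), new Text("word_count5"));
 *   byte[] cell = table.get(new Text("0"), new Text("word:text"));
 *   System.out.println(new String(cell));
 */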
