/**
 * Program: WordCountFromHBase.java
 * Editor: Waue Chen 
 * From :  NCHC, Taiwan
 * Last Update Date: 07/02/2008
 */

/**
 * Purpose : 
 * 	Count words stored in HBase and write the result to the Hadoop file system.
 * 
 * HowToUse : 
 * 	Make sure the Hadoop file system is running and HBase holds the input data.
 * 	It is suggested to run WordCountIntoHBase first.
 * 	Finally, adjust the setup parameters below and run.
 * 
 * Check Result:
 *  
 * 	Inspect http://localhost:50070 with a web browser.
 */
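
/*
 * Quick check from the command line (a hedged sketch, not from the original
 * notes): once the job finishes, the counts can usually also be read directly
 * from the Hadoop file system, e.g.
 *
 *   bin/hadoop fs -cat counts2/part-00000
 *
 * The part-file name depends on the number of reduce tasks (one here).
 */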

package tw.org.nchc.code;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableInputFormat;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

@SuppressWarnings("unused")
public class WordCountFromHBase {
	/* setup parameters */
	// set the output path 
	static String outputPath = "counts2";

	// org.apache.hadoop.hbase.mapred.TableMap<K,V>, where
	// 	K extends org.apache.hadoop.io.WritableComparable and
	// 	V extends org.apache.hadoop.io.Writable.
	// It scans an HBase table, sorting by a specified sort column;
	// 	if the column does not exist, the record is not passed to Reduce.
	private static class MapClass extends TableMap<Text, IntWritable> {

		// reusable IntWritable holding the constant 1
		private final static IntWritable one = new IntWritable(1);
		// the HBase column to read, taken from WordCountIntoHBase
		private final static Text textcol = new Text(WordCountIntoHBase.colstr);
		private Text word = new Text();
		// TableMap declares map() as an abstract method, so we must
		// 	implement it here; its signature is:
		// 	map(HStoreKey key, MapWritable value,
		// 		OutputCollector<K,V> output, Reporter reporter)
		// It is called once per HBase record, represented by the row key
		// 	and its associated record value.
		public void map(HStoreKey key, MapWritable cols,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			// cols.get(textcol) is Map.get(Object key) on the MapWritable and
			// 	returns a Writable, so cast it to ImmutableBytesWritable;
			// 	the second get() then returns the underlying byte[].
			// Text.decode() parses the UTF-8 bytes into a String.
			// Each "line" holds the value of one row in the HTable.
			String line = Text.decode(((ImmutableBytesWritable) cols.get(textcol))
					.get());
			
			// debugging aid: uncomment the block below to append each "line"
			// 	to a local file (it needs an import of java.io.RandomAccessFile)
			/*
			RandomAccessFile raf =
				new RandomAccessFile("/home/waue/mr-result.txt", "rw");
			raf.seek(raf.length()); // move the file pointer to the end
			raf.write(("\n" + line).getBytes());
			raf.close();
			*/
			// the dump shows that "line" is the contents of the merged files
			
			// StringTokenizer splits the line into words
			StringTokenizer itr = new StringTokenizer(line);
			// emit every word with a count of one
			while (itr.hasMoreTokens()) {
				// nextToken() returns the current token as a String and
				// 	advances to the next one; Text.set() stores that String
				word.set(itr.nextToken());
				// OutputCollector.collect(K key, V value) adds a
				// 	key/value pair to the output
				output.collect(word, one);
			}
		}
	}
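
	// Worked example (illustrative, not from the original notes): if one
	// 	HBase row's column value is "hello world hello", map() above emits
	// 	("hello", 1), ("world", 1), ("hello", 1).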

	// reducer: sums up all the counts
	private static class ReduceClass extends MapReduceBase implements
			Reducer<Text, IntWritable, Text, IntWritable> {

		// reuse objects
		private final static IntWritable SumValue = new IntWritable();
		
		// reduce(), like map() above, must be implemented here; its
		// 	parameter types in this sample are (Text, Iterator<IntWritable>,
		// 		OutputCollector<Text, IntWritable>, Reporter)
		public void reduce(Text key, Iterator<IntWritable> values,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			// sum up the counts
			int sum = 0;
			// "key" is the word; "values" iterates over every count collected
			// 	for that word, which is why the loop uses values.hasNext()
			while (values.hasNext()) {
				// next() returns the current value and advances the iterator;
				// 	IntWritable.get() converts the IntWritable to an int
				sum += values.next().get();
			}
			// IntWritable.set(int) converts the int back into an IntWritable
			SumValue.set(sum);
			// since outputPath is set in main(), output.collect() writes the
			// 	result into the Hadoop file system
			output.collect(key, SumValue);
		}
	}
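
	// Worked example (illustrative, not from the original notes): continuing
	// 	the map example above, reduce() receives ("hello", [1, 1]) and
	// 	("world", [1]) and emits ("hello", 2) and ("world", 1). Since
	// 	ReduceClass is also set as the combiner in main(), partial sums may
	// 	already be computed on the map side.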

	private WordCountFromHBase() {
	}

	/**
	 * Runs the demo.
	 */
	public static void main(String[] args) throws IOException {
		

		int mapTasks = 1;
		int reduceTasks = 1;
		// initialize the job configuration
		JobConf conf = new JobConf(WordCountFromHBase.class);
		// TableMap.initJob() wires the HBase table into the job:
		// 	initJob(table_name, column_string, mapper_class, job_conf)
		TableMap.initJob(WordCountIntoHBase.Table_Name,
				WordCountIntoHBase.colstr, MapClass.class, conf);
		conf.setJobName(WordCountIntoHBase.Table_Name + "store");
		conf.setNumMapTasks(mapTasks);
		conf.setNumReduceTasks(reduceTasks);
		
		//Set the key class for the job output data.
		conf.setOutputKeyClass(Text.class);
		//Set the value class for job outputs.
		conf.setOutputValueClass(IntWritable.class);
		// set the mapper, combiner, and reducer classes (all used here)
		conf.setMapperClass(MapClass.class);
		conf.setCombinerClass(ReduceClass.class);
		conf.setReducerClass(ReduceClass.class);
		// the input comes from HBase, so use TableInputFormat
		conf.setInputFormat(TableInputFormat.class);
		conf.setOutputPath(new Path(outputPath));
		// delete the old path with the same name
		FileSystem.get(conf).delete(new Path(outputPath));
		JobClient.runJob(conf);
	}
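
	/*
	 * Hedged sketch (not part of the original demo): a helper that reads the
	 * reducer output back from the Hadoop file system and prints it. It
	 * assumes the default text output with one reduce task, whose part file
	 * is named "part-00000"; it could be called after JobClient.runJob(conf)
	 * in main().
	 */
	private static void printResult(JobConf conf) throws IOException {
		Path resultFile = new Path(outputPath, "part-00000");
		FileSystem fs = FileSystem.get(conf);
		java.io.BufferedReader reader = new java.io.BufferedReader(
				new java.io.InputStreamReader(fs.open(resultFile)));
		String line;
		while ((line = reader.readLine()) != null) {
			// each line is "word<TAB>count" as written by the reducer
			System.out.println(line);
		}
		reader.close();
	}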
}
