/**
 * Program: DemoWordCountTuple.java
 * Editor: Waue Chen
 * From: NCHC, Taiwan
 * Last Update Date: 07/02/2008
 */
/*
 * Cloud9: A MapReduce Library for Hadoop
 */

package tw.org.nchc.demo;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;

import tw.org.nchc.tuple.Schema;
import tw.org.nchc.tuple.Tuple;

/**
 * <p>
 * Demo that illustrates the use of the tuple library ({@link Tuple} class).
 * Input comes from the Bible+Shakespeare sample collection, encoded as
 * single-field tuples; see {@link DemoPackRecords}. Sample of final output:
 * </p>
 * 
 * <pre>
 * ...
 * (admirable, 0)    9
 * (admirable, 1)    6
 * (admiral, 0)      2
 * (admiral, 1)      4
 * (admiration, 0)  10
 * (admiration, 1)   6
 * (admire, 0)       5
 * (admire, 1)       3
 * (admired, 0)     12
 * (admired, 1)      7
 * ...
 * </pre>
 * 
 * <p>
 * Output keys are tuples (Token, EvenOrOdd): the first field holds a token, and
 * the second field indicates whether the token was found on a line with an even
 * (0) or an odd (1) number of characters. Values are counts of how many times
 * each tuple occurs in the collection. Expected trace of the demo:
 * </p>
 * 
 * <pre>
 * Map input records=156215
 * Map output records=1734298
 * Map input bytes=13118917
 * Map output bytes=66214039
 * Combine input records=1734298
 * Combine output records=192045
 * Reduce input groups=59225
 * Reduce input records=192045
 * Reduce output records=59225
 * </pre>
 * 
 * <p>
 * Obviously, this isn't a particularly meaningful program, but it does
 * illustrate the use of the {@link Tuple} class.
 * </p>
 */
public class DemoWordCountTuple {

	// create the schema for the tuple that will serve as the key
	private static final Schema KEY_SCHEMA = new Schema();

	// define the schema statically
	static {
		KEY_SCHEMA.addField("Token", String.class, "");
		KEY_SCHEMA.addField("EvenOrOdd", Integer.class, new Integer(1));
	}
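
	// For example, the key behind the sample output line "(admirable, 0)" above
	// holds Token = "admirable" and EvenOrOdd = 0 (the token appeared on an
	// even-length line); it would be built from this schema roughly as follows:
	//
	//   Tuple key = KEY_SCHEMA.instantiate();
	//   key.set("Token", "admirable");
	//   key.set("EvenOrOdd", 0);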

	// mapper that emits tuple as the key, and value '1' for each occurrence
	private static class MapClass extends MapReduceBase implements
			Mapper<LongWritable, Tuple, Tuple, IntWritable> {

		// define value '1' statically so we can reuse the object, i.e., avoid
		// unnecessary object creation
		private final static IntWritable one = new IntWritable(1);

		// once again, reuse tuples if possible
		private Tuple tupleOut = KEY_SCHEMA.instantiate();

		public void map(LongWritable key, Tuple tupleIn,
				OutputCollector<Tuple, IntWritable> output, Reporter reporter)
				throws IOException {

			// the input value is a tuple; get field 0
			// see DemoPackRecords for how the input SequenceFile is generated
			String line = (String) tupleIn.get(0);
			StringTokenizer itr = new StringTokenizer(line);
			while (itr.hasMoreTokens()) {
				String token = itr.nextToken();

				// put new values into the tuple
				tupleOut.set("Token", token);
				tupleOut.set("EvenOrOdd", line.length() % 2);

				// emit key-value pair
				output.collect(tupleOut, one);
			}
		}
	}
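
	// Worked example (illustrative only): the 18-character line
	// "to be or not to be" has even length, so for it the mapper emits
	//   (to, 0)  1
	//   (be, 0)  1
	//   (or, 0)  1
	//   (not, 0) 1
	//   (to, 0)  1
	//   (be, 0)  1
	// The combiner/reducer below then sums these into (to, 0) 2, (be, 0) 2,
	// (or, 0) 1, and (not, 0) 1.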

	// reducer counts up tuple occurrences
	private static class ReduceClass extends MapReduceBase implements
			Reducer<Tuple, IntWritable, Tuple, IntWritable> {
		private final static IntWritable sumValue = new IntWritable();

		public synchronized void reduce(Tuple tupleKey,
				Iterator<IntWritable> values,
				OutputCollector<Tuple, IntWritable> output, Reporter reporter)
				throws IOException {
			// sum values
			int sum = 0;
			while (values.hasNext()) {
				sum += values.next().get();
			}

			// keep the original tuple key, emit the sum of counts as the value
			sumValue.set(sum);
			output.collect(tupleKey, sumValue);
		}
	}
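
	// Note: ReduceClass also doubles as the combiner (see main below). That is
	// safe here because its input and output key/value types match and integer
	// addition is associative and commutative, so partial sums computed on the
	// map side combine correctly into the final per-key totals.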

	// private constructor to prevent instantiation
	private DemoWordCountTuple() {
	}

	/**
	 * Runs the demo.
	 */
	public static void main(String[] args) throws IOException {
		String inPath = "/shared/sample-input/bible+shakes.nopunc.packed";
		String outputPath = "word-counts-tuple";
		int numMapTasks = 20;
		int numReduceTasks = 20;

		JobConf conf = new JobConf(DemoWordCountTuple.class);
		conf.setJobName("wordcount");

		conf.setNumMapTasks(numMapTasks);
		conf.setNumReduceTasks(numReduceTasks);
		// Hadoop 0.16 API: set the input and output paths directly on the JobConf
		conf.setInputPath(new Path(inPath));
		conf.setInputFormat(SequenceFileInputFormat.class);

		conf.setOutputPath(new Path(outputPath));
		conf.setOutputKeyClass(Tuple.class);
		conf.setOutputValueClass(IntWritable.class);
		conf.setOutputFormat(SequenceFileOutputFormat.class);

		conf.setMapperClass(MapClass.class);
		conf.setCombinerClass(ReduceClass.class);
		conf.setReducerClass(ReduceClass.class);

		JobClient.runJob(conf);
	}
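
	/**
	 * Illustrative sketch, not invoked by the job above: reads back one output
	 * partition and prints lines in the "(Token, EvenOrOdd)  count" form shown
	 * in the class comment, e.g. printOutputSketch("word-counts-tuple/part-00000").
	 * The method name and the part-file argument are hypothetical examples; with
	 * 20 reducers the output is split across part-00000 ... part-00019. Assumes
	 * {@link Tuple} deserializes through its Writable interface, as it must in
	 * order to serve as the SequenceFile output key above.
	 */
	private static void printOutputSketch(String partFile) throws IOException {
		JobConf conf = new JobConf(DemoWordCountTuple.class);
		org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem
				.get(conf);

		// reuse a single key/value pair while iterating over the records
		Tuple key = KEY_SCHEMA.instantiate();
		IntWritable value = new IntWritable();

		org.apache.hadoop.io.SequenceFile.Reader reader = new org.apache.hadoop.io.SequenceFile.Reader(
				fs, new Path(partFile), conf);
		try {
			while (reader.next(key, value)) {
				// field 0 is the token, field 1 is the even/odd indicator
				System.out.println("(" + key.get(0) + ", " + key.get(1) + ")\t"
						+ value.get());
			}
		} finally {
			reader.close();
		}
	}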
}
