/**
 * Program: DemoWordCountTuple2.java
 * Editor: Waue Chen 
 * From :  NCHC. Taiwan
 * Last Update Date: 07/02/2008
 * Upgrade to 0.17
 */
/*
 * Cloud9: A MapReduce Library for Hadoop
 */

package tw.org.nchc.demo;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;

import tw.org.nchc.code.Convert;
import tw.org.nchc.tuple.ListWritable;
import tw.org.nchc.tuple.Schema;
import tw.org.nchc.tuple.Tuple;

/**
 * <p>
 * Demo that illustrates the use of the tuple library ({@link Tuple} and
 * {@link ListWritable} class). Input comes from Bible+Shakespeare sample
 * collection, encoded with {@link DemoPackRecords2}. Otherwise, this demo is
 * exactly the same as {@link DemoWordCountTuple}.
 * </p>
 */
public class DemoWordCountTuple2 {

	// create the schema for the tuple that will serve as the key
	private static final Schema KEY_SCHEMA = new Schema();

	// define the schema statically
	static {
		KEY_SCHEMA.addField("Token", String.class, "");
		KEY_SCHEMA.addField("EvenOrOdd", Integer.class, new Integer(1));
	}

	// mapper that emits tuple as the key, and value '1' for each occurrence
	private static class MapClass extends MapReduceBase implements
			Mapper<LongWritable, Tuple, Tuple, IntWritable> {

		// define value '1' statically so we can reuse the object, i.e., avoid
		// unnecessary object creation
		private final static IntWritable one = new IntWritable(1);

		// once again, reuse tuples if possible
		private Tuple tupleOut = KEY_SCHEMA.instantiate();

		public void map(LongWritable key, Tuple tupleIn,
				OutputCollector<Tuple, IntWritable> output, Reporter reporter)
				throws IOException {

			@SuppressWarnings("unchecked")
			ListWritable<Text> list = (ListWritable<Text>) tupleIn.get(1);

			for (int i = 0; i < list.size(); i++) {
				Text t = (Text) list.get(i);

				String token = t.toString();

				// put new values into the tuple
				tupleOut.set("Token", token);
				tupleOut.set("EvenOrOdd", ((Integer) tupleIn.get(0)) % 2);

				// emit key-value pair
				output.collect(tupleOut, one);
			}
		}
	}

	// reducer counts up tuple occurrences
	private static class ReduceClass extends MapReduceBase implements
			Reducer<Tuple, IntWritable, Tuple, IntWritable> {
		private final static IntWritable SumValue = new IntWritable();

		public synchronized void reduce(Tuple tupleKey,
				Iterator<IntWritable> values,
				OutputCollector<Tuple, IntWritable> output, Reporter reporter)
				throws IOException {
			// sum values
			int sum = 0;
			while (values.hasNext()) {
				sum += values.next().get();
			}

			// keep original tuple key, emit sum of counts as value
			SumValue.set(sum);
			output.collect(tupleKey, SumValue);
		}
	}

	// dummy constructor
	private DemoWordCountTuple2() {
	}

	/**
	 * Runs the demo.
	 */
	public static void main(String[] args) throws IOException {
		String inPath = "/shared/sample-input/bible+shakes.nopunc.packed2";
		String outputPath = "word-counts2-tuple";
		int numMapTasks = 20;
		int numReduceTasks = 20;

		JobConf conf = new JobConf(DemoWordCountTuple2.class);
		conf.setJobName("wordcount");

		conf.setNumMapTasks(numMapTasks);
		conf.setNumReduceTasks(numReduceTasks);
		
		// 0.16
//		conf.setInputPath(new Path(inPath));
		Convert.setInputPath(conf,new Path(inPath));
		conf.setInputFormat(SequenceFileInputFormat.class);
		// 0.16
//		conf.setOutputPath(new Path(outputPath));
		Convert.setInputPath(conf, new Path(outputPath));
		
		conf.setOutputKeyClass(Tuple.class);
		conf.setOutputValueClass(IntWritable.class);
		conf.setOutputFormat(SequenceFileOutputFormat.class);

		conf.setMapperClass(MapClass.class);
		conf.setCombinerClass(ReduceClass.class);
		conf.setReducerClass(ReduceClass.class);

		JobClient.runJob(conf);
	}
}
