/**
 * Program: LogParserGo.java
 * Editor: Waue Chen 
 * From : NCHC, Taiwan
 * Last Update Date: 07/02/2008
 */
/**
 * Purpose : 
 * 	This program parses your Apache access log and stores it into HBase.
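 * 	An illustrative combined-format line (assembled from the sample \ 
 * 		values shown elsewhere in this header):
 * 	118.170.101.250 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" \ 
 * 		200 729 "-" "Mozilla/4.0 (compatible; MSIE 4.01; Windows 95)"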
 * 
 * HowToUse : 
 * 	Make sure of three things :
 * 	1. Upload your Apache logs ( /var/log/apache2/access.log* ) to \ 
 * 		HDFS (default: /user/waue/apache-log) \ 
 * 	 $ bin/hadoop dfs -put /var/log/apache2/ apache-log
 * 	2. The "dir" parameter in main() must point to the uploaded logs.
 * 	3. Filter out exceptional lines manually, \ 
 * 		e.g.  ::1 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" 200 729 "...
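 * 	A possible one-liner for step 3, assuming the IPv6 loopback ( ::1 ) \ 
 * 		entries are the only malformed lines (adjust the pattern to your data):
 * 	 $ grep -v '^::1 ' access.log > access.filtered.log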
 *  
 * Check Result:
 * 	Go to the HBase console and type : 
 * 		hql > select * from apache-log;

+-------------------------+-------------------------+-------------------------+
| Row                     | Column                  | Cell                    |
+-------------------------+-------------------------+-------------------------+
| 118.170.101.250         | http:agent              | Mozilla/4.0 (compatible;|
|                         |                         |  MSIE 4.01; Windows 95) |
+-------------------------+-------------------------+-------------------------+
| 118.170.101.250         | http:bytesize           | 318                     |
+-------------------------+-------------------------+-------------------------+
..........(skip)........
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | http:method             | OPTIONS                 |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | http:protocol           | HTTP/1.1                |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | referrer:-              | *                       |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | url:*                   | -                       |
+-------------------------+-------------------------+-------------------------+
31 row(s) in set. (0.58 sec)
 */
package tw.org.nchc.code;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Relies on the LogParser class (in this package) to split each access-log line.
/**
 * Access_log fetcher. TODO: FgnStatLog, Error_log, Access_log (Default,
 * W3CExtended, IISw3cExtended)
 */
public class LogParserGo {
	static HBaseConfiguration conf = new HBaseConfiguration();

	public static final String TABLE = "table.name";

	static String tableName;

	static HTable table = null;
	
	// Simple stdout helper used to trace map input while debugging.
	static void print(String str) {
		System.out.println("STR = " + str);
	}
	public static class MapClass extends MapReduceBase implements
			Mapper<WritableComparable, Text, Text, Writable> {

		@Override
		public void configure(JobConf job) {
			// Overrides MapReduceBase's no-op configure() to read the table
			// name set by runMapReduce(); job.get(name, defaultValue)
			// returns defaultValue if the property is unset.
			tableName = job.get(TABLE, "");
		}

		public void map(WritableComparable key, Text value,
				OutputCollector<Text, Writable> output, Reporter reporter)
				throws IOException {
			
			try {
				LogParser log = new LogParser(value.toString());
				print(value.toString());
				// Debug only: dump each record to a local file on the task
				// node. Opened in append mode so earlier records are kept
				// (the original re-created the file for every record).
				FileWriter out = new FileWriter(new File(
						"/home/waue/Desktop/mr-result.txt"), true);
				out.write(value.toString() + "\n");
				out.flush();
				out.close();

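				// One batch update per log line: the row key is the client
				// IP, and each field becomes a column in the http:, url: or
				// referrer: family. commit() stamps the cells with the
				// request timestamp, so repeat visits from the same IP
				// become older cell versions. (startUpdate/put/commit is
				// the old, pre-0.20 HBase batch-update API.)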
				if (table == null)
					table = new HTable(conf, new Text(tableName));
				long lockId = table.startUpdate(new Text(log.getIp()));
				table.put(lockId, new Text("http:protocol"), log.getProtocol()
						.getBytes());
				table.put(lockId, new Text("http:method"), log.getMethod()
						.getBytes());
				table.put(lockId, new Text("http:code"), log.getCode()
						.getBytes());
				table.put(lockId, new Text("http:bytesize"), log.getByteSize()
						.getBytes());
				table.put(lockId, new Text("http:agent"), log.getAgent()
						.getBytes());
				table.put(lockId, new Text("url:" + log.getUrl()), log
						.getReferrer().getBytes());
				table.put(lockId, new Text("referrer:" + log.getReferrer()),
						log.getUrl().getBytes());
				table.commit(lockId, log.getTimestamp());
				
			} catch (Exception e) {
				// Skip lines that fail to parse, but report them.
				e.printStackTrace();
			}
			
		}
	}

	// Replacement for the deprecated FileSystem.listPaths(): return the
	// entries under 'path' as an array of Paths.
	static public Path[] listPaths(FileSystem fsm, Path path)
			throws IOException {
		FileStatus[] fss = fsm.listStatus(path);
		int length = fss.length;
		Path[] pi = new Path[length];
		for (int i = 0; i < length; i++) {
			pi[i] = fss[i].getPath();
		}
		return pi;
	}

	public static void runMapReduce(String table, String dir)
			throws IOException {
		Path tempDir = new Path("/tmp/Mylog/");
		Path inputDir = new Path(dir);
		FileSystem fs = FileSystem.get(conf);
		JobConf jobConf = new JobConf(conf, LogParserGo.class);
		jobConf.setJobName("apache log fetcher");
		jobConf.set(TABLE, table);
		if (fs.isFile(inputDir)) {
			jobConf.setInputPath(inputDir);
		} else {
			// Add every file directly under inputDir and descend one level
			// into sub-directories; deeper nesting is ignored.
			Path[] in = listPaths(fs, inputDir);
			for (int i = 0; i < in.length; i++) {
				if (fs.isFile(in[i])) {
					jobConf.addInputPath(in[i]);
				} else {
					Path[] sub = listPaths(fs, in[i]);
					for (int j = 0; j < sub.length; j++) {
						if (fs.isFile(sub[j])) {
							jobConf.addInputPath(sub[j]);
						}
					}
				}
			}
		}
		jobConf.setOutputPath(tempDir);
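		// The collector output is never used (rows go straight to HBase);
		// tempDir merely satisfies the framework and is deleted below.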

		jobConf.setMapperClass(MapClass.class);

		JobClient client = new JobClient(jobConf);
		ClusterStatus cluster = client.getClusterStatus();
		jobConf.setNumMapTasks(cluster.getMapTasks());
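		// Map-only job: every row is written to HBase inside map(), so no
		// reduce phase is needed.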
		jobConf.setNumReduceTasks(0);

		JobClient.runJob(jobConf);

		fs.delete(tempDir);
		fs.close();
	}

	public static void createTable(String table) throws IOException {
		HBaseAdmin admin = new HBaseAdmin(conf);
		if (!admin.tableExists(new Text(table))) {
			System.out.println("1. Creating table " + table
					+ " ... please wait");
			HTableDescriptor tableDesc = new HTableDescriptor(table);
			// Column families must exist before the mapper writes to them.
			tableDesc.addFamily(new HColumnDescriptor("http:"));
			tableDesc.addFamily(new HColumnDescriptor("url:"));
			tableDesc.addFamily(new HColumnDescriptor("referrer:"));
			admin.createTable(tableDesc);
		} else {
			System.out.println("1. Table " + table + " already exists.");
		}
		System.out.println("2. access_log files fetching using map/reduce");
	}

	public static void main(String[] args) throws IOException {
		String table_name = "apache-log2";
		String dir = "/user/waue/apache-log";
		
		// if (eclipseRun) {
		// 	table_name = "log";
		// 	dir = "apache-log";
		// } else if (args.length < 2) {
		// 	System.out.println(
		// 			"Usage: logfetcher <access_log file or directory> <table_name>");
		// 	System.exit(1);
		// } else {
		// 	table_name = args[1];
		// 	dir = args[0];
		// }

		createTable(table_name);
		runMapReduce(table_name, dir);

	}

}
