/**
 * Program: LogFetcher.java
 * Editor: Waue Chen 
 * From :  NCHC, Taiwan
 * Last Update Date: 07/02/2008
 */
/**
 * Purpose : 
 * 	This program will parse your apache log and store it into HBase.
 * 
 * HowToUse : 
 * 	Make sure of three things :
 * 	1. Upload apache logs ( /var/log/apache2/access.log* ) to \ 
 * 		hdfs (default: /user/waue/apache-log) \
 * 	 $ bin/hadoop dfs -put /var/log/apache2/ apache-log
 * 	2. The parameter "dir" in main names the location that contains the logs.
 * 	3. You should manually filter out log lines that cause exceptions, \ 
 *  	ex:  ::1 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" 200 729 "...
 *  
 * Check Result:
 * 	Go to hbase console, type : 
 * 		hql > select * from apache-log;

+-------------------------+-------------------------+-------------------------+
| Row                     | Column                  | Cell                    |
+-------------------------+-------------------------+-------------------------+
| 118.170.101.250         | http:agent              | Mozilla/4.0 (compatible;|
|                         |                         |  MSIE 4.01; Windows 95) |
+-------------------------+-------------------------+-------------------------+
| 118.170.101.250         | http:bytesize           | 318                     |
+-------------------------+-------------------------+-------------------------+
..........(skip)........
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | http:method             | OPTIONS                 |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | http:protocol           | HTTP/1.1                |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | referrer:-              | *                       |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | url:*                   | -                       |
+-------------------------+-------------------------+-------------------------+
31 row(s) in set. (0.58 sec)

 */


package tw.org.nchc.demo;

import java.io.IOException;
import java.text.ParseException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
// import AccessLogParser
/**
 * Access_log fetcher: runs a map-only MapReduce job that parses Apache access
 * log lines and writes one HBase row update per line, keyed by client IP.
 *
 * TODO: FgnStatLog, Error_log, Access_log (Default, W3CExtended,
 * IISw3cExtended)
 */
public class LogFetcher {
	/** Configuration shared by the job driver and by the mappers. */
	static HBaseConfiguration conf = new HBaseConfiguration();

	/** Job configuration key used to hand the target table name to the mappers. */
	public static final String TABLE = "table.name";

	/** Target table name; set by {@link MapClass#configure(JobConf)}. */
	static String tableName;

	/** Table handle, opened lazily by the first call to {@code map}. */
	static HTable table = null;

	/** When true, {@link #main(String[])} ignores its arguments and uses the built-in defaults. */
	static boolean eclipseRun = false;

	/**
	 * Mapper that turns every access-log line into one HBase row update.
	 * Nothing is emitted through the OutputCollector; the job runs with zero
	 * reducers and HBase is the only sink.
	 */
	public static class MapClass extends MapReduceBase implements
			Mapper<WritableComparable, Text, Text, Writable> {

		/**
		 * Reads the target table name from the job configuration.
		 *
		 * @param job the job configuration; {@link LogFetcher#TABLE} is read,
		 *            defaulting to the empty string
		 */
		@Override
		public void configure(JobConf job) {
			tableName = job.get(TABLE, "");
		}

		/**
		 * Parses one log line and stores its fields in the row named after
		 * the client IP.
		 *
		 * Failures are reported with a stack trace and the line is skipped;
		 * this method never fails the task because of a bad line. Unlike a
		 * plain catch-and-continue, a row update that was started but not
		 * committed is aborted so it does not stay open on the shared table
		 * handle.
		 *
		 * @param key      input key supplied by the input format (unused)
		 * @param value    one line of the access log
		 * @param output   unused; results go to HBase
		 * @param reporter unused
		 * @throws IOException declared by the Mapper interface
		 */
		public void map(WritableComparable key, Text value,
				OutputCollector<Text, Writable> output, Reporter reporter)
				throws IOException {
			long lockId = 0;
			// True while a row update has been started but not yet committed.
			boolean updatePending = false;
			try {
				AccessLogParser log = new AccessLogParser(value.toString());
				if (table == null)
					table = new HTable(conf, new Text(tableName));

				lockId = table.startUpdate(new Text(log.getIp()));
				updatePending = true;

				table.put(lockId, new Text("http:protocol"), log.getProtocol()
						.getBytes());
				table.put(lockId, new Text("http:method"), log.getMethod()
						.getBytes());
				table.put(lockId, new Text("http:code"), log.getCode()
						.getBytes());
				table.put(lockId, new Text("http:bytesize"), log.getByteSize()
						.getBytes());
				table.put(lockId, new Text("http:agent"), log.getAgent()
						.getBytes());
				// The two families cross-reference each other on purpose:
				// "url:<url>" holds the referrer and "referrer:<referrer>"
				// holds the url, as shown in the sample output in the file
				// header.
				table.put(lockId, new Text("url:" + log.getUrl()), log
						.getReferrer().getBytes());
				table.put(lockId, new Text("referrer:" + log.getReferrer()),
						log.getUrl().getBytes());

				table.commit(lockId, log.getTimestamp());
				updatePending = false;
			} catch (ParseException e) {
				// Malformed line: report it and move on to the next one.
				e.printStackTrace();
			} catch (Exception e) {
				// Best effort: one bad record must not kill the whole task.
				e.printStackTrace();
			} finally {
				if (updatePending) {
					try {
						// Drop the half-built update so the next line can
						// start a fresh one.
						table.abort(lockId);
					} catch (Exception ignored) {
						// Nothing left to abort (for example the failed
						// commit already ended the update); safe to ignore.
					}
				}
			}
		}
	}

	/**
	 * Lists the entries found at {@code path}. Replacement for the deprecated
	 * FileSystem.listPaths.
	 *
	 * @param fsm  file system to query
	 * @param path path to list
	 * @return the paths reported by {@code listStatus}; an empty array when
	 *         {@code listStatus} returns null
	 * @throws IOException if the file system cannot be queried
	 */
	static public Path[] listPaths(FileSystem fsm, Path path)
			throws IOException {
		FileStatus[] statuses = fsm.listStatus(path);
		if (statuses == null) {
			// NOTE(review): older Hadoop releases appear to return null for a
			// missing path - treat that as "no entries" instead of an NPE.
			return new Path[0];
		}
		Path[] paths = new Path[statuses.length];
		for (int i = 0; i < statuses.length; i++) {
			paths[i] = statuses[i].getPath();
		}
		return paths;
	}

	/**
	 * Adds every regular file found directly in {@code inputDir}, or one
	 * directory level below it, as a job input.
	 */
	private static void addInputFiles(FileSystem fs, JobConf jobConf,
			Path inputDir) throws IOException {
		Path[] entries = listPaths(fs, inputDir);
		for (int i = 0; i < entries.length; i++) {
			if (fs.isFile(entries[i])) {
				jobConf.addInputPath(entries[i]);
				continue;
			}
			Path[] children = listPaths(fs, entries[i]);
			for (int j = 0; j < children.length; j++) {
				if (fs.isFile(children[j])) {
					jobConf.addInputPath(children[j]);
				}
			}
		}
	}

	/**
	 * Configures and runs the map-only job that loads the logs into HBase.
	 *
	 * @param table name of the HBase table the mappers write to
	 * @param dir   a log file, or a directory holding log files (at most one
	 *              level of sub-directories is scanned)
	 * @throws IOException if the input path does not exist, or if the file
	 *                     system or the job fails
	 */
	public static void runMapReduce(String table, String dir)
			throws IOException {
		Path tempDir = new Path("log/temp");
		Path inputDir = new Path(dir);
		FileSystem fs = FileSystem.get(conf);
		try {
			if (!fs.exists(inputDir)) {
				throw new IOException("Input path does not exist: " + dir);
			}

			JobConf jobConf = new JobConf(conf, LogFetcher.class);
			jobConf.setJobName("apache log fetcher");
			jobConf.set(TABLE, table);

			if (fs.isFile(inputDir)) {
				jobConf.setInputPath(inputDir);
			} else {
				addInputFiles(fs, jobConf, inputDir);
			}

			// A temp dir left over from an earlier failed run would make the
			// job refuse to start, so remove it first.
			if (fs.exists(tempDir)) {
				fs.delete(tempDir);
			}
			jobConf.setOutputPath(tempDir);

			jobConf.setMapperClass(MapClass.class);

			JobClient client = new JobClient(jobConf);
			ClusterStatus cluster = client.getClusterStatus();
			// NOTE(review): confirm getMapTasks() reports what is wanted
			// here (capacity vs. currently running tasks).
			jobConf.setNumMapTasks(cluster.getMapTasks());
			jobConf.setNumReduceTasks(0);

			JobClient.runJob(jobConf);
		} finally {
			// Clean up whether or not the job succeeded.
			try {
				fs.delete(tempDir);
			} finally {
				fs.close();
			}
		}
	}

	/**
	 * Creates the target table with the column families "http:", "url:" and
	 * "referrer:" unless a table of that name already exists, and prints
	 * progress messages to standard output.
	 *
	 * @param table name of the table to create
	 * @throws IOException if HBase cannot be reached or the table cannot be
	 *                     created
	 */
	public static void creatTable(String table) throws IOException {
		HBaseAdmin admin = new HBaseAdmin(conf);
		if (admin.tableExists(new Text(table))) {
			System.out.println("1. " + table + " table already exists.");
		} else {
			System.out.println("1. " + table
					+ " table creating ... please wait");
			HTableDescriptor tableDesc = new HTableDescriptor(table);
			tableDesc.addFamily(new HColumnDescriptor("http:"));
			tableDesc.addFamily(new HColumnDescriptor("url:"));
			tableDesc.addFamily(new HColumnDescriptor("referrer:"));
			admin.createTable(tableDesc);
		}
		System.out.println("2. access_log files fetching using map/reduce");
	}

	/**
	 * Command-line entry point.
	 *
	 * @param args {@code args[0]} is the access log file or directory and
	 *             {@code args[1]} the table name; ignored when
	 *             {@code eclipseRun} is set, in which case "apache-log" and
	 *             "log" are used
	 * @throws IOException if table creation or the job fails
	 */
	public static void main(String[] args) throws IOException {
		String table_name = "log";
		String dir = "apache-log";

		if (!eclipseRun) {
			if (args.length < 2) {
				System.out
						.println("Usage: logfetcher <access_log file or directory> <table_name>");
				System.exit(1);
			}
			dir = args[0];
			table_name = args[1];
		}
		creatTable(table_name);
		runMapReduce(table_name, dir);
	}

}
