/**
 * Program: SnortBase.java
 * Editor: Waue Chen
 * From: NCHC, Taiwan
 * Last Update Date: 07/02/2008
 */
/**
 * Purpose :
 * 	This program parses a pre-processed Snort alert log and stores it into HBase.
 *
 * HowToUse :
 * 	Make sure of three things :
 * 	1. Upload the parsed Snort alert file to \
 * 		hdfs (default: /user/waue/snort-log/alert_flex_parsed.txt) \
 * 	 $ bin/hadoop dfs -put <local alert file> snort-log/alert_flex_parsed.txt
 * 	2. The variable "path" in main() must point to that file.
 *  3. Filter out malformed lines manually beforehand; they are not caught \
 *  	here and will make the map task fail.
 *
 * Check Result:
 * 	Go to the hbase console and type :
 * 		hql > select * from NewTable2;
 *
 * 	Each row key is a destination IP. Each column is named
 * 	"SourceSID:<source ip>(<snort id>)" and its cell holds
 * 	"name=...;priority=...;class=...;dst_port=...;type=...".
 *
 */
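
/*
 * Typical invocation once the class has been packaged into a job jar
 * (the jar name below is illustrative only, not part of this project):
 *
 *   $ bin/hadoop jar nchc-code.jar tw.org.nchc.code.SnortBase
 */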
package tw.org.nchc.code;

import java.io.IOException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
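
/*
 * Input format consumed by the Log constructor below: one alert per line,
 * fields separated by ';' in this order:
 *
 *   gid;sid;version;alert_name;class_type;priority;month;day;hour;minute;
 *   second;src_ip[:port];dst_ip[:port];type
 *
 * A hypothetical example line (values are illustrative only):
 *
 *   1;2003;8;MS-SQL Worm propagation attempt;misc-attack;2;06;29;07;35;15;192.168.1.5:1434;10.0.0.9:1434;UDP
 */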

class Log {
	String gid, sid, version;

	String alert_name, class_type, priority;

	String source, destination, type;

	// String ttl, tos, id, iplen, dgmlen;

	String srcport, dstport, tmp;

	public Log(String data) {

		String[] arr = data.split(";");
		this.gid = arr[0];
		this.sid = arr[1];
		this.version = arr[2];
		this.alert_name = arr[3];
		this.class_type = arr[4];
		this.priority = arr[5];
		// arr[6] is the month and arr[7] the day; reorder them to match the
		// "dd/MM:HH:mm:ss" pattern used by getTime().
		this.timestamp = getTime(arr[7] + "/" + arr[6] + ":" + arr[8] + ":"
				+ arr[9] + ":" + arr[10]);
		this.source = getIP(arr[11]);
		this.srcport = this.tmp;
		this.destination = getIP(arr[12]);
		this.dstport = this.tmp;
		this.type = arr[13];

	}

	long timestamp; // event time, in epoch milliseconds

	// Split an "ip[:port]" string: returns the IP and stores the port
	// (or "0" when no port is present) in tmp as a side effect.
	String getIP(String str) {
		String res;
		int n = str.indexOf(":");
		if (n == -1) {
			res = str;
			this.tmp = "0";
		} else {
			String[] vec = str.split(":");
			res = vec[0];
			this.tmp = vec[1];
		}
		return res;
	}

	// Parse "dd/MM:HH:mm:ss" into epoch milliseconds. parse() returns null on
	// failure and would throw a NullPointerException here, so malformed lines
	// must be filtered out beforehand (see the header notes).
	long getTime(String str) {
		SimpleDateFormat sdf = new SimpleDateFormat("dd/MM:HH:mm:ss",
				Locale.TAIWAN);
		return sdf.parse(str, new ParsePosition(0)).getTime();
	}
}

/**
 * Map-only job that reads pre-processed Snort alert lines from HDFS and
 * writes one HBase row per destination IP.
 */
public class SnortBase {
	static HBaseConfiguration conf = new HBaseConfiguration();

	public static final String TABLE = "table.name";

	static String tableName = "NewTable2";

	static HTable table = null;

	public static class MapClass extends MapReduceBase implements
			Mapper<WritableComparable, Text, Text, Writable> {

		@Override
		// Intentionally left empty: the table name comes from the static
		// field tableName rather than from the job configuration (the TABLE
		// property set in runMapReduce() is never read back here).
		public void configure(JobConf job) {
		}

		public void map(WritableComparable key, Text value,
				OutputCollector<Text, Writable> output, Reporter reporter)
				throws IOException {

			// try {

			Log log = new Log(value.toString());

			// Debug: dump the raw value and the parsed timestamp to a local file
			// FileWriter out = new FileWriter(new File(
			// "/home/waue/Desktop/snort-result.txt"));
			// out.write(value.toString() + "_time=" + log.timestamp + "\n");
			// out.flush();
			// out.close();

			// Lazily open the HTable once per task
			if (table == null)
				table = new HTable(conf, new Text(tableName));
			// Experiment 3: row key = destination IP, one column per source/SID pair

			String property_name = "name=" + log.alert_name + ";priority="
					+ log.priority + ";class=" + log.class_type + ";dst_port="
					+ log.dstport + ";type=" + log.type;
			long lockId = table.startUpdate(new Text(log.destination));
			table.put(lockId, new Text("SourceSID:" + log.source + "("
					+ log.sid + ")"), property_name.getBytes());
			// Experiment 2 (kept for reference): row key = destination:sid
			// long lockId = table.startUpdate(new
			// Text(log.destination+":"+log.sid));
			// String property_name =
			// "priority="+log.priority+
			// ";class="+log.class_type+
			// ";snort_id="+log.sid;
			// String property_source =
			// log.source+":"+log.srcport+" => "
			// +log.destination+":"+log.dstport;
			// String property_payload = log.type;
			// table.put(lockId, new Text("name:"+log.alert_name),
			// property_name.getBytes());
			// table.put(lockId, new Text("from:"+log.source),
			// property_source.getBytes());
			// table.put(lockId, new Text("payload:"+log.type),
			// property_payload.getBytes());
			// Experiment 1 (kept for reference)
			// table.put(lockId, new Text("property:gen_id"),
			// log.gid.getBytes());
			// table.put(lockId, new Text("property:name"), log.sid.getBytes());
			// table.put(lockId, new Text("id:version"),
			// log.version.getBytes());
			// table.put(lockId, new Text("name:name"),
			// log.alert_name.getBytes());
			// table
			// .put(lockId, new Text("name:class"), log.class_type
			// .getBytes());
			// table.put(lockId, new Text("id:priority"), log.priority
			// .getBytes());
			// table.put(lockId, new Text("direction:soure"),
			// log.source.getBytes());
			// table.put(lockId, new Text("direction:destination"),
			// log.destination.getBytes());
			// table.put(lockId, new Text("direction:srcport"),
			// log.srcport.getBytes());
			// table.put(lockId, new Text("direction:dstport"),
			// log.dstport.getBytes());
			// table.put(lockId, new Text("payload:type"), log.type.getBytes());

			table.commit(lockId, log.timestamp);

			// } catch (Exception e) {
			// e.printStackTrace();
			// }

		}
	}

	// Replacement for the deprecated FileSystem.listPaths() (avoids the
	// deprecation warning)
	static public Path[] listPaths(FileSystem fsm, Path path)
			throws IOException {
		FileStatus[] fss = fsm.listStatus(path);
		int length = fss.length;
		Path[] pi = new Path[length];
		for (int i = 0; i < length; i++) {
			pi[i] = fss[i].getPath();
		}
		return pi;
	}

	public static void runMapReduce(String tableName, String inpath)
			throws IOException {
		Path tempDir = new Path("/tmp/Mylog/");
		Path InputPath = new Path(inpath);
		FileSystem fs = FileSystem.get(conf);
		JobConf jobConf = new JobConf(conf, SnortBase.class);
		jobConf.setJobName("Snort Parse");
		jobConf.set(TABLE, tableName);
		// Automatic directory traversal is omitted for now
		// Path InputDir = new Path(inpath);
		// Path[] in = listPaths(fs, InputDir);
		// if (fs.isFile(InputDir))
		// {
		// jobConf.setInputPath(InputDir);
		// }
		// else{
		// for (int i = 0; i < in.length; i++){
		// if (fs.isFile(in[i])){
		// jobConf.addInputPath(in[i]);
		// } else
		// {
		// Path[] sub = listPaths(fs, in[i]);
		// for (int j = 0; j < sub.length; j++)
		// {
		// if (fs.isFile(sub[j]))
		// {
		// jobConf.addInputPath(sub[j]);
		// } } } } }

		jobConf.setInputPath(InputPath);
		jobConf.setOutputPath(tempDir);
		jobConf.setMapperClass(MapClass.class);
		JobClient client = new JobClient(jobConf);
		ClusterStatus cluster = client.getClusterStatus();
		jobConf.setNumMapTasks(cluster.getMapTasks());
		// Map-only job: all HBase writes happen in the mapper, so no reducers
		jobConf.setNumReduceTasks(0);
		fs.delete(tempDir);
		JobClient.runJob(jobConf);
		fs.delete(tempDir);
		fs.close();
	}

	public static void createTable(String table) throws IOException {
		HBaseAdmin admin = new HBaseAdmin(conf);
		if (!admin.tableExists(new Text(table))) {
			System.out.println("1. creating table " + table
					+ " ... please wait");
			HTableDescriptor tableDesc = new HTableDescriptor(table);
			// Experiment 3: a single column family "SourceSID:"
			tableDesc.addFamily(new HColumnDescriptor("SourceSID:"));
			// Experiment 2 (kept for reference)
			// tableDesc.addFamily(new HColumnDescriptor("name:"));
			// tableDesc.addFamily(new HColumnDescriptor("from:"));
			// tableDesc.addFamily(new HColumnDescriptor("payload:"));
			admin.createTable(tableDesc);
		} else {
			System.out.println("1. " + table + " table already exists.");
		}
		System.out.println("2. fetching alert log files using map/reduce");
	}

	public static void main(String[] args) throws Exception {

		String path = "/user/waue/snort-log/alert_flex_parsed.txt";

		// The automatic upload after parsing is skipped for now
		/*
		 * SnortParser sp = new
		 * SnortParser("/tmp/alert","/tmp/alert_SnortBase"); sp.parseToLine();
		 */
		createTable(tableName);
		long start_time = new Date().getTime();
		runMapReduce(tableName, path);
		long end_time = new Date().getTime();
		System.out.println("elapsed time: " + (end_time - start_time) + " ms");
	}

}
