/**
 * Program: SnortUploadHbase.java
 * Editor: Waue Chen 
 * From: NCHC, Taiwan
 * Last Update Date: 07/23/2008
 */

/**
 * Purpose :
 * 	The program parses the Snort alert log (/var/log/snort/alert)
 * 		into the HBase table "Snort".
 *
 * How to use :
 * 	Run from Eclipse (depends on SnortParser.java).
 *
 * Check the result :
 * 	Go to the HBase console and type :
 * 		hql > select * from Snort;
 */
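
/*
 * Data flow (a summary of the code below):
 *   /var/log/snort/alert --SnortParser--> /tmp/alert_my (one semicolon-
 *   separated record per alert, on the local filesystem) --copied onto
 *   HDFS--> MapReduce job (IdentityMapper + ReduceClass) --> one row per
 *   alert in the HBase table "Snort", with columns such as "snort:gid",
 *   "snort:sid", ... "snort:dgmlen".
 */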

package tw.org.nchc.code;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

import java.text.ParseException;

public class SnortUploadHbase {
	/* Major parameter */
	// local filesystem path, not an HDFS path
	final static String source_file = "/var/log/snort/alert";

	/* Minor parameter */
	// column family name
	final static String column_family = "snort:";

	// table name
	final static String table_name = "Snort";

	// field separator character
	final static String sp = ";";
	
	// temporary file for the parsed data
	final static String text_tmp = "/tmp/alert_my";

	// in this example the map phase is a pass-through (IdentityMapper); all the work is done in reduce
	private static class ReduceClass extends TableReduce<LongWritable, Text> {
		public void reduce(LongWritable key, Iterator<Text> values,
				OutputCollector<Text, MapWritable> output, Reporter reporter)
				throws IOException {

			String first_line = "gid;sid;version;alert name;" +
					"class;priority;month;day;hour;min;second;source;" +
					"destination;type;ttl;tos;id;iplen;dgmlen;";

			// split the header into the individual column qualifier names
			String[] cf = first_line.split(sp);
			int length = cf.length;
			 
			// take the parsed record for this key as a String
			// (Text.getBytes() returns the backing buffer, which may be longer
			// than the valid length, so toString() is the safe conversion)
			String stro = values.next().toString();
			String str[] = stro.split(sp);

			// column names are created dynamically
			Text[] col_n = new Text[length];
			byte[][] b_l = new byte[length][];
			// cell contents must be wrapped in ImmutableBytesWritable
			ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];

			// this map holds the columns of one HBase row
			MapWritable map = new MapWritable();

			// write each field into the map under its column name
			// (guard against records with fewer fields than the header lists)
			for (int i = 0; i < length && i < str.length; i++) {
				col_n[i] = new Text(column_family + cf[i]);
				b_l[i] = str[i].getBytes();
				w_l[i] = new ImmutableBytesWritable(b_l[i]);
				// populate the current row
				map.put(col_n[i], w_l[i]);
			}
			// emit the row; the row id is the input line's byte offset (the map key)
			output.collect(new Text(key.toString()), map);
		}
	}
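
	/*
	 * Resulting layout: the row id is the byte offset of the record's line in
	 * the tmp file (the LongWritable key produced by TextInputFormat and passed
	 * through by IdentityMapper); the row's cells are snort:gid, snort:sid, ...
	 * snort:dgmlen, holding the corresponding field values as bytes.
	 */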

	public SnortUploadHbase() {
	}
	
	// delete a local temporary file
	boolean deleteFile(String str)throws IOException{
		File df = new File(str);
		
		if (df.exists()) {
			if (!df.delete()) {
				System.err.println("failed to delete file: " + str);
				return false;
			}
		} else {
			System.out.println("file does not exist: " + str);
			return false;
		}
		return true;
	}
	/**
	 * Runs the demo.
	 */
	public static void main(String[] args) throws IOException, ParseException, Exception {
		
		String[] col_family = {column_family};
		Path text_path = new Path(text_tmp);
		
		SnortParser parser = new SnortParser(source_file, text_tmp);
		parser.parseToLine();
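
		// at this point text_tmp (/tmp/alert_my) holds one semicolon-separated
		// record per alert on the local filesystem; next, make sure the target
		// HBase table exists before loading the data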
		
		
		BuildHTable build_table = new BuildHTable(table_name, col_family);
		if (!build_table.checkTableExist(table_name)) {
			if (!build_table.createTable()) {
				System.out.println("create table error!");
			}
		} else {
			System.out.println("Table \"" + table_name + "\" already exists!");
		}
		JobConf conf = new JobConf(SnortUploadHbase.class);
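		// copy the parsed tmp file from the local filesystem onto HDFS at the
		// same path; the first argument (delSrc = true) removes the local copy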
		FileSystem fileconf = FileSystem.get(conf);
		fileconf.copyFromLocalFile(true, text_path, text_path);
		// job name; change it to anything you like
		conf.setJobName("SnortDataBase");
		final int mapTasks = 1;
		final int reduceTasks = 1;
		// wire the job's output to the HBase table; the name must match the table created above
		TableReduce.initJob(table_name, ReduceClass.class, conf);

		// map-reduce job settings
		conf.setNumMapTasks(mapTasks);
		conf.setNumReduceTasks(reduceTasks);
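		// the input is read with Hadoop's default TextInputFormat, so each map
		// record is (byte offset of the line, text of the line)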
		
		conf.setInputPath(text_path);

		
		conf.setMapperClass(IdentityMapper.class);
		conf.setCombinerClass(IdentityReducer.class);
		conf.setReducerClass(ReduceClass.class);

		JobClient.runJob(conf);
		
		// delete the tmp file from HDFS (Hadoop 0.16 API)
		FileSystem.get(conf).delete(text_path);
		
	}
}