ICAS version 0.1 completed
/**
* Program: ICAS.java
* Editor: Waue Chen
* From: GTD, NCHC, Taiwan
* Last Update Date: 07/29/2009
* Supported versions: Hadoop 0.18.3, HBase 0.18.1
*/
package tw.org.nchc.code;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ICAS extends Configured implements Tool {
HBaseConfiguration hbase_conf;
HBaseAdmin hbase_admin;
public ICAS() throws IOException {
hbase_conf = new HBaseConfiguration();
hbase_admin = new HBaseAdmin(hbase_conf);
}
public static class ICAS_M extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text> {
String getip(String str) {
String[] ret = str.split(":");
return ret[0];
}
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
// field layout: [3] sig, [4] class, [5] year, [6] month, [7] day,
// [8] hour, [9] minute, [10] second, [11] source, [12] dest, [13] type
String line = value.toString();
String[] str = line.split(";");
String source_ip = getip(str[11]);
String dest_ip = getip(str[12]);
String fkey = dest_ip;
String date = str[5] + "/" + str[6] + "/" + str[7] + "_" + str[8]
+ ":" + str[9] + ":" + str[10];
// value layout: source @ date @ sig @ class @ type
String fvalue = source_ip + "@" + date + "@" + str[3] + "@" + str[4]
+ "@" + str[13];
output.collect(new Text(fkey), new Text(fvalue));
}
}
public static class ICAS_R extends TableReduce<Text, Text> {
public void reduce(Text key, Iterator<Text> values,
OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
Reporter reporter) throws IOException {
HTable table = new HTable("ICAS");
String source_ip;
String date;
String sig_class;
String type;
String signature;
// Text.getBytes() may return a padded buffer, so use toString()
// to get exactly the stored string
String rawstr = values.next().toString();
String[] str = rawstr.split("@");
source_ip = str[0];
date = str[1];
signature = str[2];
sig_class = str[3];
type = str[4];
while (values.hasNext()) {
// value layout: source_ip @ date @ sig @ class @ type
rawstr = values.next().toString();
str = rawstr.split("@");
source_ip = source_ip + " ; " + str[0];
date = date + " ; " + str[1];
signature = signature + ";" + str[2];
}
reporter.setStatus("emitting cell for row '" + key.toString() + "'");
BatchUpdate map = new BatchUpdate(key.toString());
// map.setTimestamp(timestamp);
map.put("infor:source_ip", Bytes.toBytes(source_ip));
map.put("infor:signature", Bytes.toBytes(signature));
map.put("infor:date", Bytes.toBytes(date));
map.put("infor:class", Bytes.toBytes(sig_class));
map.put("infor:type", Bytes.toBytes(type));
table.commit(map);
// ImmutableBytesWritable Hkey = new ImmutableBytesWritable(Bytes
// .toBytes(key.toString()));
// output.collect(Hkey, map);
}
}
/**
* An alternative plain-text reducer (not wired into the job) that
* concatenates all values of a key, one per line.
*/
public static class ICAS_T extends MapReduceBase implements
Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
StringBuilder ret = new StringBuilder("\n");
while (values.hasNext()) {
String v = values.next().toString().trim();
if (v.length() > 0)
ret.append(v + "\n");
}
output.collect((Text) key, new Text(ret.toString()));
}
}
/**
* Check whether the given HBase table already exists.
*/
public boolean checkTableExist(String table_name) throws IOException {
return hbase_admin.tableExists(table_name);
}
// create an HBase table; returns true on success, false if it already exists
public boolean createTable(String table_name, String[] column_family)
throws IOException {
// check whether the table already exists
if (!checkTableExist(table_name)) {
HTableDescriptor tableDesc = new HTableDescriptor(table_name);
for (int i = 0; i < column_family.length; i++) {
String cf = column_family[i];
// check and correct name format "string:"
if (!cf.endsWith(":")) {
column_family[i] = cf + ":";
}
// add column family
tableDesc.addFamily(new HColumnDescriptor(column_family[i]));
}
hbase_admin.createTable(tableDesc);
return true;
} else {
return false;
}
}
// upload a local file into HDFS by shelling out to "hadoop dfs -put"
static boolean hdfsput(String source_file, String text_tmp) {
try {
Process p = Runtime.getRuntime().exec(
"hadoop dfs -put " + source_file + " " + text_tmp);
BufferedReader in = new BufferedReader(new InputStreamReader(p
.getErrorStream()));
String err = null;
while ((err = in.readLine()) != null) {
System.err.println(err);
}
System.err.println("finish");
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
int printUsage() {
System.out
.println("ICAS <sourceFile> <tempFile> <tableName> <mapNumber> <reduceNumber> ");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
public int run(String[] args) throws Exception {
Path text_path = new Path(args[1]); // text path
Path output = new Path("/tmp/output"); // /tmp/output
/* set conf */
JobConf conf = new JobConf(getConf(), ICAS.class);
conf.setJobName("SnortProduce");
// key point
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
/* set mapper and reducer */
conf.setMapperClass(ICAS_M.class);
conf.setReducerClass(ICAS_R.class);
// conf.setMapperClass(IdentityMapper.class);
// conf.setReducerClass(IdentityReducer.class);
/* delete previous output dir */
if (FileSystem.get(conf).exists(output))
FileSystem.get(conf).delete(output, true);
conf.setNumMapTasks(Integer.parseInt(args[3]));
conf.setNumReduceTasks(Integer.parseInt(args[4]));
/* set input path */
FileInputFormat.setInputPaths(conf, text_path);
FileOutputFormat.setOutputPath(conf, output);
/* run */
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
/* Major parameter */
// a local filesystem path, not an HDFS path
String source_file = "/home/waue/log";
// data source tmp
String text_tmp = "/user/waue/parsed";
/* Minor parameter */
String table_name = "ICAS";
// these hard-coded parameters override any command-line arguments
String[] argv = { source_file, text_tmp, table_name, "1", "1" };
args = argv;
/* build table */
String[] col_family = { "infor:" };
ICAS build_table = new ICAS();
if (!build_table.checkTableExist(args[2])) {
if (!build_table.createTable(args[2], col_family)) {
System.err.println("create table error!");
}
} else {
System.err.println("Table \"" + args[2]
+ "\" already exists!");
}
// if (!hdfsput(args[0], args[1])) {
// System.out.println("directory is existed.");
// }
int res = ToolRunner.run(new Configuration(), new ICAS(), args);
System.exit(res);
}
}
Test data:
1;2404;5;attack_method_1;Attempted_Privilege;1;08;06;16;09;46;1.2.3.4:3394;140.110.138.22:445;TCP;
1;2404;5;attack_method_1;Attempted_Privilege;1;08;06;16;09;48;5.6.7.8:3394;140.110.138.22:445;TCP;
1;2404;5;attack_method_2;Attempted_Privilege;1;08;06;16;09;48;5.6.7.8:3394;140.110.138.22:445;TCP;
1;2404;5;attack_method_3;Attempted_Privilege;1;08;06;16;09;48;5.6.7.8:3394;140.110.138.22:445;TCP;
1;2404;5;attack_method_4;Attempted_Privilege;1;08;06;16;09;48;5.6.7.8:3394;140.110.138.22:445;TCP;
1;2404;5;attack_method_1;Attempted_Privilege;1;08;06;16;09;48;5.6.7.8:3394;140.110.141.33:445;TCP;
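As a quick sanity check of the field layout ICAS_M assumes, the following standalone snippet (the FieldCheck class name is illustrative, not part of ICAS) splits the first test line and prints the fields the mapper extracts:
public class FieldCheck {
public static void main(String[] args) {
String line = "1;2404;5;attack_method_1;Attempted_Privilege;"
+ "1;08;06;16;09;46;1.2.3.4:3394;140.110.138.22:445;TCP;";
String[] str = line.split(";");
// same indexes as ICAS_M: [3] sig, [4] class, [5..10] date parts,
// [11] source ip:port, [12] dest ip:port, [13] type
System.out.println("sig = " + str[3]); // attack_method_1
System.out.println("class = " + str[4]); // Attempted_Privilege
System.out.println("date = " + str[5] + "/" + str[6] + "/" + str[7]
+ "_" + str[8] + ":" + str[9] + ":" + str[10]); // 1/08/06_16:09:46
System.out.println("src = " + str[11].split(":")[0]); // 1.2.3.4
System.out.println("dst = " + str[12].split(":")[0]); // 140.110.138.22
}
}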
Result:
hbase(main):005:0> scan 'ICAS'
ROW COLUMN+CELL
140.110.138.22 column=infor:class, timestamp=1248863177234, value=Attempted_Privilege
140.110.138.22 column=infor:date, timestamp=1248863177234, value=1/08/06_16:09:46 ; 1/08/06_16:0
9:48 ; 1/08/06_16:09:48 ; 1/08/06_16:09:48 ; 1/08/06_16:09:48
140.110.138.22 column=infor:signature, timestamp=1248863177234, value=attack_method_1;attack_met
hod_1;attack_method_2;attack_method_3;attack_method_4
140.110.138.22 column=infor:source_ip, timestamp=1248863177234, value=1.2.3.4 ; 5.6.7.8 ; 5.6.7.
8 ; 5.6.7.8 ; 5.6.7.8
140.110.138.22 column=infor:type, timestamp=1248863177234, value=TCP
140.110.141.33 column=infor:class, timestamp=1248863177268, value=Attempted_Privilege
140.110.141.33 column=infor:date, timestamp=1248863177268, value=1/08/06_16:09:48
140.110.141.33 column=infor:signature, timestamp=1248863177268, value=attack_method_1
140.110.141.33 column=infor:source_ip, timestamp=1248863177268, value=5.6.7.8
140.110.141.33 column=infor:type, timestamp=1248863177268, value=TCP
10 row(s) in 0.1550 seconds
- Question 1: How can objects declared in run() (or in the enclosing class) be made usable inside both map and reduce? (A sketch of the usual workaround follows the snippet below.)
public class ICAS extends Configured implements Tool {
HBaseConfiguration hbase_conf;
HBaseAdmin hbase_admin;
public ICAS() throws IOException {
hbase_conf = new HBaseConfiguration();
hbase_admin = new HBaseAdmin(hbase_conf);
}
public static class ICAS_M extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text> {
// hbase_admin and hbase_conf cannot be used here
}
public static class ICAS_R extends TableReduce<Text, Text> {
// hbase_admin and hbase_conf cannot be used here
}
public static void main(String[] args) throws Exception {
// hbase_admin and hbase_conf can be used here
}
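The short answer: map and reduce execute in separate task JVMs, so objects built in the driver or as outer-class fields are never shared with them. The usual pattern in this API version is to ship plain settings through the JobConf and rebuild heavyweight handles such as HBaseAdmin inside each task's configure(). A minimal sketch, reusing the imports from the listing above (the ICAS_M2 name and the "icas.table.name" key are illustrative, not part of ICAS):
public static class ICAS_M2 extends MapReduceBase implements
Mapper<LongWritable, Text, Text, Text> {
HBaseConfiguration hbase_conf;
HBaseAdmin hbase_admin;
String table_name;
// configure() runs once in each task JVM before any map() call
public void configure(JobConf job) {
try {
// plain values travel through the JobConf ...
table_name = job.get("icas.table.name", "ICAS");
// ... heavyweight objects are rebuilt locally in the task
hbase_conf = new HBaseConfiguration();
hbase_admin = new HBaseAdmin(hbase_conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
// hbase_admin and hbase_conf are usable here
}
}
In run(), the driver would set the value before submitting the job: conf.set("icas.table.name", "ICAS");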
- Question 2: How can new data be accumulated into the existing HBase rows instead of overwriting them?
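One common answer is a read-modify-write: fetch the cell's current value, concatenate the new data onto it, and commit the merged result. A minimal sketch, assuming this HBase version's HTable.get(String, String) returning a Cell (the AppendCell class name is illustrative):
import java.io.IOException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.util.Bytes;
public class AppendCell {
// append new_value to the stored cell instead of replacing it
static void append(String row, String column, String new_value)
throws IOException {
HTable table = new HTable("ICAS");
Cell old = table.get(row, column);
String merged = (old == null) ? new_value
: Bytes.toString(old.getValue()) + ";" + new_value;
BatchUpdate update = new BatchUpdate(row);
update.put(column, Bytes.toBytes(merged));
table.commit(update);
}
}
Note this is not atomic: two tasks appending to the same row concurrently can still lose one update.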
- Question 4: How can the value of an emitted <key, value> pair be run through a second <key, value> pass?
key = dest ip | value = infor:sip
host M        | host1; host2; host1
        ↓ (second pass)
key = dest ip | value = infor:sip
host M        | host1; host2
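As the diagram suggests, the goal is to de-duplicate the collected values. That can happen either at the end of ICAS_R before committing, or in a second chained job whose FileInputFormat.setInputPaths points at the first job's output directory. A minimal sketch of the de-duplication step itself (the DedupUtil class name is illustrative):
import java.util.LinkedHashSet;
import java.util.Set;
public class DedupUtil {
// collapse a ";"-separated list into unique entries, keeping first-seen order
public static String dedup(String joined) {
Set<String> seen = new LinkedHashSet<String>();
for (String item : joined.split(";")) {
String v = item.trim();
if (v.length() > 0)
seen.add(v);
}
StringBuilder sb = new StringBuilder();
for (String v : seen) {
if (sb.length() > 0)
sb.append(" ; ");
sb.append(v);
}
return sb.toString();
}
public static void main(String[] args) {
System.out.println(dedup("host1; host2; host1")); // prints "host1 ; host2"
}
}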