wiki:waue/2009/0729

ICAS version 0.1 is complete.

/**
 * Program SnortProduce.java
 * Editor: Waue Chen 
 * From :  GTD. NCHC. Taiwan
 * Last Update Date: 07/29/2009
 * support version : hadoop 0.18.3, hbase 0.18.1
 */

package tw.org.nchc.code;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ICAS extends Configured implements Tool {

  HBaseConfiguration hbase_conf;
  HBaseAdmin hbase_admin;

  public ICAS() throws IOException {
    hbase_conf = new HBaseConfiguration();
    hbase_admin = new HBaseAdmin(hbase_conf);

  }

  public static class ICAS_M extends MapReduceBase implements
      Mapper<LongWritable, Text, Text, Text> {

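    // strip the ":port" suffix from an "ip:port" string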
    String getip(String str) {
      String[] ret = str.split(":");
      return ret[0];
    }

    public void map(LongWritable key, Text value,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
      // fields: [3] signature, [4] class, [5] year, [6] month, [7] day,
      // [8] hour, [9] minute, [10] second, [11] source, [12] dest, [13] type

      String line = value.toString();
      String[] str = line.split(";");
      String source_ip = getip(str[11]);
      String dest_ip = getip(str[12]);
      String fkey = dest_ip;

      String date = str[5] + "/" + str[6] + "/" + str[7] + "_" + str[8]
          + ":" + str[9] + ":" + str[10];
      // source @ date @ sig @ class @ type
      String fvalue = source_ip + "@" + date + "@" + str[3] + "@" + str[4]
          + "@" + str[13];
      output.collect(new Text(fkey), new Text(fvalue));
    }
  }

  public static class ICAS_R extends TableReduce<Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
        OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
        Reporter reporter) throws IOException {
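      // note: this reducer writes to HBase directly through HTable instead of
      // emitting via output.collect(); it also opens a new HTable on every
      // reduce() call, which is relatively expensive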
      HTable table = new HTable("ICAS");
      String source_ip;
      String date;
      String sig_class;
      String type;
      String signature;
      
      // Text.getBytes() returns the backing array, which can be longer than
      // the valid data; toString() yields exactly the stored string
      String rawstr = values.next().toString();
      String[] str = rawstr.split("@");
      source_ip = str[0];
      date = str[1];
      signature = str[2];
      sig_class = str[3];
      type = str[4];
      // merge the remaining values collected for this destination IP
      while (values.hasNext()) {
        // fvalue layout: source_ip@date@signature@class@type
        rawstr = values.next().toString();
        str = rawstr.split("@");
        source_ip = source_ip + " ; " + str[0];
        date = date + " ; " + str[1];
        signature = signature + ";" + str[2];
      }
      reporter.setStatus("emitting cell for row '" + key.toString()
          + "'");
      BatchUpdate map = new BatchUpdate(key.toString());
      // map.setTimestamp(timestamp);
      map.put("infor:source_ip", Bytes.toBytes(source_ip));
      map.put("infor:signature", Bytes.toBytes(signature));
      map.put("infor:date", Bytes.toBytes(date));
      map.put("infor:class", Bytes.toBytes(sig_class));
      map.put("infor:type", Bytes.toBytes(type));
      table.commit(map);
      // ImmutableBytesWritable Hkey = new ImmutableBytesWritable(Bytes
      // .toBytes(key.toString()));
      // output.collect(Hkey, map);

    }
  }

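  /**
   * Alternative plain-text reducer (not wired into the job in run() below);
   * it concatenates the values of each key, one value per line.
   */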
  public static class ICAS_T extends MapReduceBase implements
      Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {

      StringBuilder ret = new StringBuilder("\n");
      while (values.hasNext()) {
        String v = values.next().toString().trim();
        if (v.length() > 0)
          ret.append(v + "\n");
      }
      output.collect((Text) key, new Text(ret.toString()));
    }
  }

  /**
   * Check whether the given HBase table already exists.
   */

  public boolean checkTableExist(String table_name) throws IOException {
    if (!hbase_admin.tableExists(table_name)) {
      return false;
    }
    return true;
  }

  // create an HBase table: returns true on success, false if it already exists
  public boolean createTable(String table_name, String[] column_family)
      throws IOException {
    // check whether the table already exists
    if (!checkTableExist(table_name)) {
      HTableDescriptor tableDesc = new HTableDescriptor(table_name);
      for (int i = 0; i < column_family.length; i++) {
        String cf = column_family[i];
        // check and correct name format "string:"
        if (!cf.endsWith(":")) {
          column_family[i] = cf + ":";
        }
        // add column family
        tableDesc.addFamily(new HColumnDescriptor(column_family[i]));
      }
      hbase_admin.createTable(tableDesc);
      return true;
    } else {
      return false;
    }
  }

  static boolean hdfsput(String source_file, String text_tmp) {
    try {
      Process p = Runtime.getRuntime().exec(
          "hadoop dfs -put " + source_file + " " + text_tmp);
      BufferedReader in = new BufferedReader(new InputStreamReader(p
          .getErrorStream()));
      String err = null;
      while ((err = in.readLine()) != null) {
        System.err.println(err);
      }
      System.err.println("finish");
      // report failure if the shelled-out command exited non-zero
      return p.waitFor() == 0;
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
  }
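
  // An alternative (a sketch, not part of the original program): the
  // FileSystem Java API uploads without exec'ing the "hadoop dfs" CLI;
  // hdfsPutViaApi is an illustrative name.
  static boolean hdfsPutViaApi(String source_file, String text_tmp) {
    try {
      // copy from the local file system into HDFS
      FileSystem fs = FileSystem.get(new Configuration());
      fs.copyFromLocalFile(new Path(source_file), new Path(text_tmp));
      return true;
    } catch (IOException e) {
      e.printStackTrace();
      return false;
    }
  }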

  int printUsage() {
    System.out
        .println("ICAS <sourceFile> <tempFile> <tableName> <mapNumber> <reduceNumber> ");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }

  public int run(String[] args) throws Exception {

    Path text_path = new Path(args[1]); // text path
    Path output = new Path("/tmp/output"); // /tmp/output

    /* set conf */
    JobConf conf = new JobConf(getConf(), ICAS.class);
    conf.setJobName("SnortProduce");

    /* set output key/value types */
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    /* set mapper and reducer */
    conf.setMapperClass(ICAS_M.class);
    conf.setReducerClass(ICAS_R.class);
    // conf.setMapperClass(IdentityMapper.class);
    // conf.setReducerClass(IdentityReducer.class);

    /* delete previous output dir */
    if (FileSystem.get(conf).exists(output))
      FileSystem.get(conf).delete(output, true);
    conf.setNumMapTasks(Integer.parseInt(args[3]));
    conf.setNumReduceTasks(Integer.parseInt(args[4]));

    /* set input path */
    FileInputFormat.setInputPaths(conf, text_path);
    FileOutputFormat.setOutputPath(conf, output);

    /* run */
    JobClient.runJob(conf);

    return 0;
  }

  public static void main(String[] args) throws Exception {
    /* Major parameter */
    // this is a local file system path, not an HDFS path
    String source_file = "/home/waue/log";
    // data source tmp
    String text_tmp = "/user/waue/parsed";
    /* Minor parameter */
    String table_name = "ICAS";

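    // note: the hard-coded argv below overrides whatever was passed on the
    // command line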
    String[] argv = { source_file, text_tmp, table_name, "1", "1" };
    args = argv;

    /* build table */
    String[] col_family = { "infor:" };
    BuildHTable build_table = new BuildHTable();
    if (!build_table.checkTableExist(args[2])) {
      if (!build_table.createTable(args[2], col_family)) {
        System.err.println("create table error !");
      }
    } else {
      System.err.println("Table \"" + args[2]
          + "\" has already existed !");
    }

    // if (!hdfsput(args[0], args[1])) {
    // System.out.println("directory is existed.");
    // }

    int res = ToolRunner.run(new Configuration(), new ICAS(), args);
    System.exit(res);
  }
}

Test data:

1;2404;5;attack_method_1;Attempted_Privilege;1;08;06;16;09;46;1.2.3.4:3394;140.110.138.22:445;TCP;
1;2404;5;attack_method_1;Attempted_Privilege;1;08;06;16;09;48;5.6.7.8:3394;140.110.138.22:445;TCP;
1;2404;5;attack_method_2;Attempted_Privilege;1;08;06;16;09;48;5.6.7.8:3394;140.110.138.22:445;TCP;
1;2404;5;attack_method_3;Attempted_Privilege;1;08;06;16;09;48;5.6.7.8:3394;140.110.138.22:445;TCP;
1;2404;5;attack_method_4;Attempted_Privilege;1;08;06;16;09;48;5.6.7.8:3394;140.110.138.22:445;TCP;
1;2404;5;attack_method_1;Attempted_Privilege;1;08;06;16;09;48;5.6.7.8:3394;140.110.141.33:445;TCP;

Result:

hbase(main):005:0> scan 'ICAS'
ROW                          COLUMN+CELL                                                                      
 140.110.138.22              column=infor:class, timestamp=1248863177234, value=Attempted_Privilege           
 140.110.138.22              column=infor:date, timestamp=1248863177234, value=1/08/06_16:09:46 ; 1/08/06_16:0
                             9:48 ; 1/08/06_16:09:48 ; 1/08/06_16:09:48 ; 1/08/06_16:09:48                    
 140.110.138.22              column=infor:signature, timestamp=1248863177234, value=attack_method_1;attack_met
                             hod_1;attack_method_2;attack_method_3;attack_method_4                            
 140.110.138.22              column=infor:source_ip, timestamp=1248863177234, value=1.2.3.4 ; 5.6.7.8 ; 5.6.7.
                             8 ; 5.6.7.8 ; 5.6.7.8                                                            
 140.110.138.22              column=infor:type, timestamp=1248863177234, value=TCP                            
 140.110.141.33              column=infor:class, timestamp=1248863177268, value=Attempted_Privilege           
 140.110.141.33              column=infor:date, timestamp=1248863177268, value=1/08/06_16:09:48               
 140.110.141.33              column=infor:signature, timestamp=1248863177268, value=attack_method_1           
 140.110.141.33              column=infor:source_ip, timestamp=1248863177268, value=5.6.7.8                   
 140.110.141.33              column=infor:type, timestamp=1248863177268, value=TCP                            
10 row(s) in 0.1550 seconds
  • Question 1: how can objects declared in the driver (in run()) be made usable from map and also reachable from reduce?
public class ICAS extends Configured implements Tool {

  HBaseConfiguration hbase_conf;
  HBaseAdmin hbase_admin;

  public ICAS() throws IOException {
    hbase_conf = new HBaseConfiguration();
    hbase_admin = new HBaseAdmin(hbase_conf);

  }

  public static class ICAS_M extends MapReduceBase implements
      Mapper<LongWritable, Text, Text, Text> {
        // hbase_admin and hbase_conf cannot be used here
  }

  public static class ICAS_R extends TableReduce<Text, Text> {
        // hbase_admin and hbase_conf cannot be used here
  }

  public static void main(String[] args) throws Exception {
        // hbase_admin and hbase_conf can be used here
  }
}
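One common answer (a sketch, not from the original page): map and reduce run in separate task JVMs, so objects built in the driver never reach them. Pass any settings through the JobConf and rebuild the HBase handles in configure(), which the mapred API calls once per task before the first map():

public static class ICAS_M extends MapReduceBase implements
    Mapper<LongWritable, Text, Text, Text> {

  private HBaseConfiguration hbase_conf;
  private HBaseAdmin hbase_admin;

  // rebuild per-task HBase handles here (same imports as the program above)
  public void configure(JobConf job) {
    hbase_conf = new HBaseConfiguration();
    try {
      hbase_admin = new HBaseAdmin(hbase_conf);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public void map(LongWritable key, Text value,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    // hbase_admin and hbase_conf are usable here
  }
}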
  • Question 2: how to accumulate into the data already stored in HBase instead of overwriting it
    • Fetch the existing data from the table first, merge it with the new data, then write it back, as sketched below.
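A sketch of that read-merge-write pattern, assuming the hbase 0.18 HTable.get(byte[], byte[]) overload that returns a Cell (appendSourceIp is an illustrative name, not part of the program above):

  static void appendSourceIp(String row_key, String new_source_ip)
      throws IOException {
    HTable table = new HTable("ICAS");
    byte[] row = Bytes.toBytes(row_key);
    byte[] column = Bytes.toBytes("infor:source_ip");
    // 1. fetch the current cell (null when nothing is stored yet)
    org.apache.hadoop.hbase.io.Cell old = table.get(row, column);
    // 2. merge the stored value with the new one
    String merged = (old == null) ? new_source_ip
        : new String(old.getValue()) + " ; " + new_source_ip;
    // 3. write the merged value back
    BatchUpdate update = new BatchUpdate(row_key);
    update.put("infor:source_ip", Bytes.toBytes(merged));
    table.commit(update);
  }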
  • Question 3: how to make reduce report its progress; see the sketch below.
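The Reporter handed to reduce() can do this; a minimal sketch of what ICAS_R.reduce() could add while merging:

      int count = 0;
      while (values.hasNext()) {
        String v = values.next().toString(); // ... merge v as before ...
        if (++count % 100 == 0) {
          // shows up as the task status in the web UI
          reporter.setStatus("row '" + key + "': merged " + count + " values");
          reporter.progress(); // keeps a long-running reduce from timing out
        }
      }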
  • Question 4: how to put the value of a <key,value> pair through another <key,value> pass
    • Current:
key = dest ip        value = infor:sip
host M               host1; host2; host1
  • Expected (i.e., the extra host1 should be filtered out; one approach is sketched after the table):
key = dest ip        value = infor:sip
host M               host1; host2
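
One way to get this (not part of the original program) is to deduplicate inside reduce with a LinkedHashSet, which drops repeats while keeping first-seen order (needs java.util.LinkedHashSet and java.util.Set):

    public void reduce(Text key, Iterator<Text> values,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
      Set<String> hosts = new LinkedHashSet<String>();
      while (values.hasNext()) {
        hosts.add(values.next().toString()); // duplicates are ignored here
      }
      StringBuilder sb = new StringBuilder();
      for (String h : hosts) {
        if (sb.length() > 0)
          sb.append("; ");
        sb.append(h);
      }
      // host M now maps to "host1; host2" instead of "host1; host2; host1"
      output.collect(key, new Text(sb.toString()));
    }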