source: sample/hadoop-0.16/tw/org/nchc/code/SnortBase.java @ 47

Last change on this file since 47 was 47, checked in by waue, 16 years ago

SnortBase?.java -> tuning some field
SnortParser?.java -> debug and fix

File size: 9.1 KB
Line 
1/**
2 * Program: LogParserGo.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwn
5 * Last Update Date: 07/02/2008
6 */
7/**
8 * Purpose :
9 *  This program will parse your apache log and store it into Hbase.
10 *
11 * HowToUse :
12 *  Make sure two thing :
13 *  1. Upload apache logs ( /var/log/apache2/access.log* ) to \
14 *    hdfs (default: /user/waue/apache-log) \
15 *   $ bin/hadoop dfs -put /var/log/apache2/ apache-log
16 *  2. parameter "dir" in main contains the logs.
17 *  3. you should filter the exception contents manually, \
18 *    ex:  ::1 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" 200 729 "...
19 * 
20 * Check Result:
21 *  Go to hbase console, type :
22 *    hql > select * from apache-log;
23
24 +-------------------------+-------------------------+-------------------------+
25 | Row                     | Column                  | Cell                    |
26 +-------------------------+-------------------------+-------------------------+
27 | 118.170.101.250         | http:agent              | Mozilla/4.0 (compatible;|
28 |                         |                         |  MSIE 4.01; Windows 95) |
29 +-------------------------+-------------------------+-------------------------+
30 | 118.170.101.250         | http:bytesize           | 318                     |
31 +-------------------------+-------------------------+-------------------------+
32 ..........(skip)........
33 +-------------------------+-------------------------+-------------------------+
34 | 87.65.93.58             | http:method             | OPTIONS                 |
35 +-------------------------+-------------------------+-------------------------+
36 | 87.65.93.58             | http:protocol           | HTTP/1.1                |
37 +-------------------------+-------------------------+-------------------------+
38 | 87.65.93.58             | referrer:-              | *                       |
39 +-------------------------+-------------------------+-------------------------+
40 | 87.65.93.58             | url:*                   | -                       |
41 +-------------------------+-------------------------+-------------------------+
42 31 row(s) in set. (0.58 sec)
43
44
45
46 */
47package tw.org.nchc.code;
48
49import java.io.IOException;
50import java.text.ParsePosition;
51import java.text.SimpleDateFormat;
52import java.util.Date;
53import java.util.Locale;
54
55import org.apache.hadoop.fs.FileStatus;
56import org.apache.hadoop.fs.FileSystem;
57import org.apache.hadoop.fs.Path;
58import org.apache.hadoop.hbase.HBaseAdmin;
59import org.apache.hadoop.hbase.HBaseConfiguration;
60import org.apache.hadoop.hbase.HColumnDescriptor;
61import org.apache.hadoop.hbase.HTable;
62import org.apache.hadoop.hbase.HTableDescriptor;
63import org.apache.hadoop.io.Text;
64import org.apache.hadoop.io.Writable;
65import org.apache.hadoop.io.WritableComparable;
66import org.apache.hadoop.mapred.ClusterStatus;
67import org.apache.hadoop.mapred.JobClient;
68import org.apache.hadoop.mapred.JobConf;
69import org.apache.hadoop.mapred.MapReduceBase;
70import org.apache.hadoop.mapred.Mapper;
71import org.apache.hadoop.mapred.OutputCollector;
72import org.apache.hadoop.mapred.Reporter;
73
74class Log {
75  String gid, sid, version;
76
77  String alert_name, class_type, priority;
78
79  String source, destination, type;
80
81  // String ttl, tos, id, iplen, dgmlen;
82 
83  String srcport, dstport,tmp;
84  public Log(String data) {
85
86    String[] arr = data.split(";");
87    this.gid = arr[0];
88    this.sid = arr[1];
89    this.version = arr[2];
90    this.alert_name = arr[3];
91    this.class_type = arr[4];
92    this.priority = arr[5];
93    this.timestamp = getTime(arr[7] + "/" + arr[6] + ":" + arr[8] + ":"
94        + arr[9] + ":" + arr[10]);
95    this.source = getIP(arr[11]);
96    this.srcport = this.tmp;
97    this.destination = getIP(arr[12]);
98    this.dstport = this.tmp;
99    this.type = arr[13];
100    // this.ttl = arr[14];
101    // this.tos = arr[15];
102    // this.id = arr[16];
103    // this.iplen = arr[17];
104    // this.dgmlen = arr[18];
105   
106  }
107  long timestamp;
108
109
110  String getIP(String str){
111    String res;
112    int n = str.indexOf(":");
113    if (n == -1) {
114      res = str;
115      this.tmp = "0";
116    } else {
117      String[] vec = str.split(":");
118      res = vec[0];
119      this.tmp = vec[1];
120    }
121    return res;
122  }
123
124  long getTime(String str) {
125    SimpleDateFormat sdf = new SimpleDateFormat("dd/MM:HH:mm:ss",
126        Locale.TAIWAN);
127    Long timestamp = sdf.parse(str, new ParsePosition(0)).getTime();
128    return timestamp;
129  }
130}
131
132// import AccessLogParser
133public class SnortBase {
134  static HBaseConfiguration conf = new HBaseConfiguration();
135
136  public static final String TABLE = "table.name";
137
138  static String tableName = "flex";
139
140  static HTable table = null;
141
142  public static class MapClass extends MapReduceBase implements
143      Mapper<WritableComparable, Text, Text, Writable> {
144
145    @Override
146    // MapReduceBase.configure(JobConf job)
147    // Default implementation that does nothing.
148    public void configure(JobConf job) {
149      // String get(String name,String defaultValue)
150      // Get the value of the name property. If no such property exists,\
151      // then defaultValue is returned.
152    }
153
154    public void map(WritableComparable key, Text value,
155        OutputCollector<Text, Writable> output, Reporter reporter)
156        throws IOException {
157
158      // try {
159
160      Log log = new Log(value.toString());
161
162      // 查看value的值
163      // FileWriter out = new FileWriter(new File(
164      // "/home/waue/Desktop/snort-result.txt"));
165      // out.write(value.toString() + "_time=" + log.timestamp + "\n");
166      // out.flush();
167      // out.close();
168
169      if (table == null)
170        table = new HTable(conf, new Text(tableName));
171
172      long lockId = table.startUpdate(new Text(log.destination));
173      table.put(lockId, new Text("id:gid"), log.gid.getBytes());
174      table.put(lockId, new Text("id:sid"), log.sid.getBytes());
175      table.put(lockId, new Text("id:version"), log.version.getBytes());
176      table.put(lockId, new Text("name:name"), log.alert_name.getBytes());
177      table
178          .put(lockId, new Text("name:class"), log.class_type
179              .getBytes());
180      table.put(lockId, new Text("id:priority"), log.priority
181          .getBytes());
182      table.put(lockId, new Text("direction:soure"), log.source.getBytes());
183      table.put(lockId, new Text("direction:srcport"), log.srcport.getBytes());
184      table.put(lockId, new Text("direction:dstport"), log.dstport.getBytes());
185      table.put(lockId, new Text("payload:type"), log.type.getBytes());
186      // table.put(lockId, new Text("payload:ttl"), log.ttl.getBytes());
187      // table.put(lockId, new Text("payload:tos"), log.tos.getBytes());
188      // table.put(lockId, new Text("payload:id"), log.id.getBytes());
189      // table.put(lockId, new Text("payload:iplen"), log.iplen
190      // .getBytes());
191      // table.put(lockId, new Text("payload:dgmlen"), log.dgmlen
192      // .getBytes());
193      table.commit(lockId, log.timestamp);
194
195      // } catch (Exception e) {
196      // e.printStackTrace();
197      // }
198
199    }
200  }
201
202  // do it to resolve warning : FileSystem.listPaths
203  static public Path[] listPaths(FileSystem fsm, Path path)
204      throws IOException {
205    FileStatus[] fss = fsm.listStatus(path);
206    int length = fss.length;
207    Path[] pi = new Path[length];
208    for (int i = 0; i < length; i++) {
209      pi[i] = fss[i].getPath();
210    }
211    return pi;
212  }
213
214  public static void runMapReduce(String tableName, String inpath)
215      throws IOException {
216    Path tempDir = new Path("/tmp/Mylog/");
217    Path InputPath = new Path(inpath);
218    FileSystem fs = FileSystem.get(conf);
219    JobConf jobConf = new JobConf(conf, SnortBase.class);
220    jobConf.setJobName("Snort Parse");
221    jobConf.set(TABLE, tableName);
222    // 先省略 自動搜尋目錄的功能
223    // Path InputDir = new Path(inpath);
224    // Path[] in = listPaths(fs, InputDir);
225    // if (fs.isFile(InputDir))
226    // {
227    // jobConf.setInputPath(InputDir);
228    // }
229    // else{
230    // for (int i = 0; i < in.length; i++){
231    // if (fs.isFile(in[i])){
232    // jobConf.addInputPath(in[i]);
233    // } else
234    // {
235    // Path[] sub = listPaths(fs, in[i]);
236    // for (int j = 0; j < sub.length; j++)
237    // {
238    // if (fs.isFile(sub[j]))
239    // {
240    // jobConf.addInputPath(sub[j]);
241    // } } } } }
242
243    jobConf.setInputPath(InputPath);
244    jobConf.setOutputPath(tempDir);
245    jobConf.setMapperClass(MapClass.class);
246    JobClient client = new JobClient(jobConf);
247    ClusterStatus cluster = client.getClusterStatus();
248    jobConf.setNumMapTasks(cluster.getMapTasks());
249    jobConf.setNumReduceTasks(0);
250    fs.delete(tempDir);
251    JobClient.runJob(jobConf);
252    fs.delete(tempDir);
253    fs.close();
254  }
255
256  public static void creatTable(String table) throws IOException {
257    HBaseAdmin admin = new HBaseAdmin(conf);
258    if (!admin.tableExists(new Text(table))) {
259      System.out.println("1. " + table
260          + " table creating ... please wait");
261      HTableDescriptor tableDesc = new HTableDescriptor(table);
262      tableDesc.addFamily(new HColumnDescriptor("id:"));
263      tableDesc.addFamily(new HColumnDescriptor("name:"));
264      tableDesc.addFamily(new HColumnDescriptor("direction:"));
265      tableDesc.addFamily(new HColumnDescriptor("payload:"));
266      admin.createTable(tableDesc);
267    } else {
268      System.out.println("1. " + table + " table already exists.");
269    }
270    System.out.println("2. access_log files fetching using map/reduce");
271  }
272
273  public static void main(String[] args) throws IOException, Exception {
274
275    String path = "/user/waue/snort-log/alert_flex_parsed.txt";
276
277    // 先省略掉 parse完後自動上傳部份
278    /*
279     * SnortParser sp = new
280     * SnortParser("/tmp/alert","/tmp/alert_SnortBase"); sp.parseToLine();
281     */
282    creatTable(tableName);
283
284    runMapReduce(tableName, path);
285
286  }
287
288}
Note: See TracBrowser for help on using the repository browser.