source: sample/hadoop-0.16/tw/org/nchc/code/SnortBase.java @ 45

Last change on this file since 45 was 45, checked in by waue, 16 years ago

SnortBase? is needed to debug.

File size: 8.7 KB
Line 
1/**
2 * Program: LogParserGo.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwn
5 * Last Update Date: 07/02/2008
6 */
7/**
8 * Purpose :
9 *  This program will parse your apache log and store it into Hbase.
10 *
11 * HowToUse :
12 *  Make sure two thing :
13 *  1. Upload apache logs ( /var/log/apache2/access.log* ) to \
14 *    hdfs (default: /user/waue/apache-log) \
15 *   $ bin/hadoop dfs -put /var/log/apache2/ apache-log
16 *  2. parameter "dir" in main contains the logs.
17 *  3. you should filter the exception contents manually, \
18 *    ex:  ::1 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" 200 729 "...
19 * 
20 * Check Result:
21 *  Go to hbase console, type :
22 *    hql > select * from apache-log;
23
24 +-------------------------+-------------------------+-------------------------+
25 | Row                     | Column                  | Cell                    |
26 +-------------------------+-------------------------+-------------------------+
27 | 118.170.101.250         | http:agent              | Mozilla/4.0 (compatible;|
28 |                         |                         |  MSIE 4.01; Windows 95) |
29 +-------------------------+-------------------------+-------------------------+
30 | 118.170.101.250         | http:bytesize           | 318                     |
31 +-------------------------+-------------------------+-------------------------+
32 ..........(skip)........
33 +-------------------------+-------------------------+-------------------------+
34 | 87.65.93.58             | http:method             | OPTIONS                 |
35 +-------------------------+-------------------------+-------------------------+
36 | 87.65.93.58             | http:protocol           | HTTP/1.1                |
37 +-------------------------+-------------------------+-------------------------+
38 | 87.65.93.58             | referrer:-              | *                       |
39 +-------------------------+-------------------------+-------------------------+
40 | 87.65.93.58             | url:*                   | -                       |
41 +-------------------------+-------------------------+-------------------------+
42 31 row(s) in set. (0.58 sec)
43
44
45
46 */
47package tw.org.nchc.code;
48
49import java.io.File;
50import java.io.FileWriter;
51import java.io.IOException;
52import java.text.SimpleDateFormat;
53import java.util.Locale;
54
55import org.apache.hadoop.fs.FileStatus;
56import org.apache.hadoop.fs.FileSystem;
57import org.apache.hadoop.fs.Path;
58import org.apache.hadoop.hbase.HBaseAdmin;
59import org.apache.hadoop.hbase.HBaseConfiguration;
60import org.apache.hadoop.hbase.HColumnDescriptor;
61import org.apache.hadoop.hbase.HTable;
62import org.apache.hadoop.hbase.HTableDescriptor;
63import org.apache.hadoop.io.Text;
64import org.apache.hadoop.io.Writable;
65import org.apache.hadoop.io.WritableComparable;
66import org.apache.hadoop.mapred.ClusterStatus;
67import org.apache.hadoop.mapred.JobClient;
68import org.apache.hadoop.mapred.JobConf;
69import org.apache.hadoop.mapred.MapReduceBase;
70import org.apache.hadoop.mapred.Mapper;
71import org.apache.hadoop.mapred.OutputCollector;
72import org.apache.hadoop.mapred.Reporter;
73
74class Log {
75
76  public Log(String data) throws Exception {
77
78    String[] arr = data.split(";");
79
80    this.gid = arr[0];
81    this.sid = arr[1];
82    this.version = arr[2];
83    this.alert_name = arr[3];
84    this.class_type = arr[4];
85    this.priority = arr[5];
86    // this.timestamp = "2008" + arr[6] + arr[7] + arr[8] + arr[9] +
87    // arr[10];
88    this.timestamp = getTime(arr[7] + "/" + arr[6] + "/2008:" + arr[8]
89        + ":" + arr[9] + ":" + arr[10]);
90    this.source = arr[11];
91    this.destination = arr[12];
92    this.type = arr[13];
93    this.ttl = arr[14];
94    this.tos = arr[15];
95    this.id = arr[16];
96    this.iplen = arr[17];
97    this.dgmlen = arr[18];
98
99  }
100
101  long timestamp;
102
103  String gid, sid, version;
104
105  String alert_name, class_type, priority;
106
107  String source, destination, type, ttl, tos, id, iplen, dgmlen;
108
109  long getTime(String str) throws Exception {
110    SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",
111        Locale.TAIWAN);
112    Long timestamp = sdf.parse(str).getTime();
113    return timestamp;
114  }
115}
116
117// import AccessLogParser
118public class SnortBase {
119  static HBaseConfiguration conf = new HBaseConfiguration();
120
121  public static final String TABLE = "table.name";
122
123  static String tableName = "mySnort";
124
125  static HTable table = null;
126
127
128  public static class MapClass extends MapReduceBase implements
129      Mapper<WritableComparable, Text, Text, Writable> {
130
131    @Override
132    // MapReduceBase.configure(JobConf job)
133    // Default implementation that does nothing.
134    public void configure(JobConf job) {
135      // String get(String name,String defaultValue)
136      // Get the value of the name property. If no such property exists,\
137      // then defaultValue is returned.
138    }
139
140    public void map(WritableComparable key, Text value,
141        OutputCollector<Text, Writable> output, Reporter reporter)
142        throws IOException {
143
144      try {
145
146        Log log = new Log(value.toString());
147       
148        // 查看value的值
149        FileWriter out = new FileWriter(new File(
150            "/home/waue/Desktop/snort-result.txt"));
151        out.write(value.toString() + "_time=" + log.timestamp + "\n");
152        out.flush();
153        out.close();
154
155        if (table == null)
156          table = new HTable(conf, new Text(tableName));
157
158        long lockId = table.startUpdate(new Text(log.destination));
159        table.put(lockId, new Text("id:gid"), log.gid.getBytes());
160        table.put(lockId, new Text("id:sid"), log.sid.getBytes());
161        table.put(lockId, new Text("id:version"), log.version
162            .getBytes());
163        table.put(lockId, new Text("name:name"), log.alert_name
164            .getBytes());
165        table.put(lockId, new Text("name:class"), log.class_type
166            .getBytes());
167        table.put(lockId, new Text("index:priority"), log.priority
168            .getBytes());
169        table.put(lockId, new Text("index:soure"), log.source
170            .getBytes());
171        table
172            .put(lockId, new Text("payload:type"), log.type
173                .getBytes());
174        table.put(lockId, new Text("payload:ttl"), log.ttl.getBytes());
175        table.put(lockId, new Text("payload:tos"), log.tos.getBytes());
176        table.put(lockId, new Text("payload:id"), log.id.getBytes());
177        table.put(lockId, new Text("payload:iplen"), log.iplen
178            .getBytes());
179        table.put(lockId, new Text("payload:dgmlen"), log.dgmlen
180            .getBytes());
181        table.commit(lockId, log.timestamp);
182
183      } catch (Exception e) {
184        e.printStackTrace();
185      }
186
187    }
188  }
189
190  // do it to resolve warning : FileSystem.listPaths
191  static public Path[] listPaths(FileSystem fsm, Path path)
192      throws IOException {
193    FileStatus[] fss = fsm.listStatus(path);
194    int length = fss.length;
195    Path[] pi = new Path[length];
196    for (int i = 0; i < length; i++) {
197      pi[i] = fss[i].getPath();
198    }
199    return pi;
200  }
201
202  public static void runMapReduce(String table, String inpath)
203      throws IOException {
204    Path tempDir = new Path("/tmp/Mylog/");
205    Path InputPath = new Path(inpath);
206    FileSystem fs = FileSystem.get(conf);
207    JobConf jobConf = new JobConf(conf, SnortBase.class);
208    jobConf.setJobName("Snort Parse");
209    jobConf.set(TABLE, table);
210    // 先省略 自動搜尋目錄的功能
211    /*
212     * Path[] in = listPaths(fs, InputDir); if (fs.isFile(InputDir)) {
213     * jobConf.setInputPath(InputDir); } else { for (int i = 0; i <
214     * in.length; i++) { if (fs.isFile(in[i])) {
215     * jobConf.addInputPath(in[i]); } else { Path[] sub = listPaths(fs,
216     * in[i]); for (int j = 0; j < sub.length; j++) { if (fs.isFile(sub[j])) {
217     * jobConf.addInputPath(sub[j]); } } } } }
218     */
219    jobConf.setInputPath(InputPath);
220    jobConf.setOutputPath(tempDir);
221    jobConf.setMapperClass(MapClass.class);
222    JobClient client = new JobClient(jobConf);
223    ClusterStatus cluster = client.getClusterStatus();
224    jobConf.setNumMapTasks(cluster.getMapTasks());
225    jobConf.setNumReduceTasks(0);
226    JobClient.runJob(jobConf);
227    fs.delete(tempDir);
228    fs.close();
229  }
230
231  public static void creatTable(String table) throws IOException {
232    HBaseAdmin admin = new HBaseAdmin(conf);
233    if (!admin.tableExists(new Text(table))) {
234      System.out.println("1. " + table
235          + " table creating ... please wait");
236      HTableDescriptor tableDesc = new HTableDescriptor(table);
237      tableDesc.addFamily(new HColumnDescriptor("id:"));
238      tableDesc.addFamily(new HColumnDescriptor("name:"));
239      tableDesc.addFamily(new HColumnDescriptor("index:"));
240      tableDesc.addFamily(new HColumnDescriptor("payload:"));
241      tableDesc.addFamily(new HColumnDescriptor("priority:"));
242      admin.createTable(tableDesc);
243    } else {
244      System.out.println("1. " + table + " table already exists.");
245    }
246    System.out.println("2. access_log files fetching using map/reduce");
247  }
248
249  public static void main(String[] args) throws IOException, Exception {
250    String table_name = "snort";
251    String path = "/user/waue/alert_meta";
252
253    // 先省略掉 parse完後自動上傳部份
254    /*
255     * SnortParser sp = new
256     * SnortParser("/tmp/alert","/tmp/alert_SnortBase"); sp.parseToLine();
257     */
258    creatTable(table_name);
259
260    runMapReduce(table_name, path);
261
262  }
263
264}
Note: See TracBrowser for help on using the repository browser.