Changes between Version 3 and Version 4 of waue/2009/0729


Ignore:
Timestamp:
Jul 29, 2009, 6:18:50 PM (15 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2009/0729

    v3 v4  
     1 = icas 0.1版完成 =
     2
     3{{{
     4#!java
     5/**
     6 * Program SnortProduce.java
     7 * Editor: Waue Chen
     8 * From :  GTD. NCHC. Taiwn
     9 * Last Update Date: 07/29/2009
     10 * support version : hadoop 0.18.3, hbase 0.18.1
     11 *
     12
     13package tw.org.nchc.code;
     14
     15import java.io.BufferedReader;
     16import java.io.IOException;
     17import java.io.InputStreamReader;
     18import java.util.Iterator;
     19
     20import org.apache.hadoop.conf.Configuration;
     21import org.apache.hadoop.conf.Configured;
     22import org.apache.hadoop.fs.FileSystem;
     23import org.apache.hadoop.fs.Path;
     24import org.apache.hadoop.hbase.HBaseConfiguration;
     25import org.apache.hadoop.hbase.HColumnDescriptor;
     26import org.apache.hadoop.hbase.HTableDescriptor;
     27import org.apache.hadoop.hbase.client.HBaseAdmin;
     28import org.apache.hadoop.hbase.client.HTable;
     29import org.apache.hadoop.hbase.io.BatchUpdate;
     30import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
     31import org.apache.hadoop.hbase.mapred.TableReduce;
     32import org.apache.hadoop.hbase.util.Bytes;
     33import org.apache.hadoop.io.LongWritable;
     34import org.apache.hadoop.io.Text;
     35import org.apache.hadoop.mapred.FileInputFormat;
     36import org.apache.hadoop.mapred.FileOutputFormat;
     37import org.apache.hadoop.mapred.JobClient;
     38import org.apache.hadoop.mapred.JobConf;
     39import org.apache.hadoop.mapred.MapReduceBase;
     40import org.apache.hadoop.mapred.Mapper;
     41import org.apache.hadoop.mapred.OutputCollector;
     42import org.apache.hadoop.mapred.Reducer;
     43import org.apache.hadoop.mapred.Reporter;
     44import org.apache.hadoop.util.Tool;
     45import org.apache.hadoop.util.ToolRunner;
     46
     47public class ICAS extends Configured implements Tool {
     48
     49        HBaseConfiguration hbase_conf;
     50        HBaseAdmin hbase_admin;
     51
     52        public ICAS() throws IOException {
     53                hbase_conf = new HBaseConfiguration();
     54                hbase_admin = new HBaseAdmin(hbase_conf);
     55
     56        }
     57
     58        public static class ICAS_M extends MapReduceBase implements
     59                        Mapper<LongWritable, Text, Text, Text> {
     60
     61                String getip(String str) {
     62                        String[] ret = str.split(":");
     63                        return ret[0];
     64                }
     65
     66                public void map(LongWritable key, Text value,
     67                                OutputCollector<Text, Text> output, Reporter reporter)
     68                                throws IOException {
     69                        // [3] sig [4]class [5]y [6]m [7]d [8]h [9]M [10]s [11]s [12]d [13]t
     70
     71                        String line = value.toString();
     72                        String[] str = line.split(";");
     73                        String source_ip = getip(str[11]);
     74                        String dest_ip = getip(str[12]);
     75                        String fkey = dest_ip ;
     76
     77                        String date = str[5] + "/" + str[6] + "/" + str[7] + "_" + str[8]
     78                                        + ":" + str[9] + ":" + str[10];
     79                        // source @ date @ sig @ class @ type
     80                        String fvalue = source_ip + "@" + date + "@" + str[3] +"@" +str[4]
     81                              + "@"+ str[13];
     82                        output.collect(new Text(fkey), new Text(fvalue));
     83                }
     84        }
     85
     86        public static class ICAS_R extends TableReduce<Text, Text> {
     87
     88                public void reduce(Text key, Iterator<Text> values,
     89                                OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
     90                                Reporter reporter) throws IOException {
     91                        HTable table = new HTable("ICAS");
     92                        String source_ip;
     93                        String date;
     94                        String sig_class;
     95                        String type;
     96                        String signature;
     97                       
     98                        String rawstr = new String(values.next().getBytes());
     99                        String[] str = rawstr.split("@");
     100                        source_ip = str[0];
     101                        date = str[1];
     102                        signature = str[2];
     103                        sig_class = str[3];
     104                        type = str[4];
     105                        // values.next().getByte() can get value and transfer to byte form,
     106                        while (values.hasNext()) {
     107                                // source_ip + "@" + date + "@" + sig + "@" + class + "@" + type;
     108                                rawstr = new String(values.next().getBytes());
     109                                str = rawstr.split("@");
     110                                source_ip = source_ip + " ; " + str[0];
     111                                date = date + " ; " + str[1];
     112                                signature = signature + ";" + str[2];
     113                        }
     114                        reporter.setStatus("amp emitting cell for row'" + key.toString()
     115                                        + "'");
     116                        BatchUpdate map = new BatchUpdate(key.toString());
     117                        // map.setTimestamp(timestamp);
     118                        map.put("infor:source_ip", Bytes.toBytes(source_ip));
     119                        map.put("infor:signature", Bytes.toBytes(signature));
     120                        map.put("infor:date", Bytes.toBytes(date));
     121                        map.put("infor:class", Bytes.toBytes(sig_class));
     122                        map.put("infor:type", Bytes.toBytes(type));
     123                        table.commit(map);
     124                        // ImmutableBytesWritable Hkey = new ImmutableBytesWritable(Bytes
     125                        // .toBytes(key.toString()));
     126                        // output.collect(Hkey, map);
     127
     128                }
     129        }
     130
     131        public static class ICAS_T extends MapReduceBase implements
     132                        Reducer<Text, Text, Text, Text> {
     133
     134                public void reduce(Text key, Iterator<Text> values,
     135                                OutputCollector<Text, Text> output, Reporter reporter)
     136                                throws IOException {
     137
     138                        StringBuilder ret = new StringBuilder("\n");
     139                        while (values.hasNext()) {
     140                                String v = values.next().toString().trim();
     141                                if (v.length() > 0)
     142                                        ret.append(v + "\n");
     143                        }
     144                        output.collect((Text) key, new Text(ret.toString()));
     145                }
     146        }
     147
     148        /**
     149         * A reducer class that just emits the sum of the input values.
     150         */
     151
     152        public boolean checkTableExist(String table_name) throws IOException {
     153                if (!hbase_admin.tableExists(table_name)) {
     154                        return false;
     155                }
     156                return true;
     157        }
     158
     159        // create Hbase table , success = 1 ; failse = 0; Table_exist = 2;
     160        public boolean createTable(String table_name, String[] column_family)
     161                        throws IOException {
     162                // check whether Table name exite or not
     163                if (!checkTableExist(table_name)) {
     164                        HTableDescriptor tableDesc = new HTableDescriptor(table_name);
     165                        for (int i = 0; i < column_family.length; i++) {
     166                                String cf = column_family[i];
     167                                // check and correct name format "string:"
     168                                if (!cf.endsWith(":")) {
     169                                        column_family[i] = cf + ":";
     170                                }
     171                                // add column family
     172                                tableDesc.addFamily(new HColumnDescriptor(column_family[i]));
     173                        }
     174                        hbase_admin.createTable(tableDesc);
     175                        return true;
     176                } else {
     177                        return false;
     178                }
     179        }
     180
     181        static boolean hdfsput(String source_file, String text_tmp) {
     182                try {
     183                        Process p = Runtime.getRuntime().exec(
     184                                        "hadoop dfs -put " + source_file + " " + text_tmp);
     185                        BufferedReader in = new BufferedReader(new InputStreamReader(p
     186                                        .getErrorStream()));
     187                        String err = null;
     188                        while ((err = in.readLine()) != null) {
     189                                System.err.println(err);
     190                        }
     191                        System.err.println("finish");
     192                        return true;
     193                } catch (Exception e) {
     194                        e.printStackTrace();
     195                        return false;
     196                }
     197        }
     198
     199        int printUsage() {
     200                System.out
     201                                .println("ICAS <sourceFile> <tempFile> <tableName> <mapNumber> <reduceNumber> ");
     202                ToolRunner.printGenericCommandUsage(System.out);
     203                return -1;
     204        }
     205
     206        public int run(String[] args) throws Exception {
     207
     208                Path text_path = new Path(args[1]); // text path
     209                Path output = new Path("/tmp/output"); // /tmp/output
     210
     211                /* set conf */
     212                JobConf conf = new JobConf(getConf(), ICAS.class);
     213                conf.setJobName("SnortProduce");
     214
     215                // key point
     216                conf.setOutputKeyClass(Text.class);
     217                conf.setOutputValueClass(Text.class);
     218
     219                /* set mapper and reducer */
     220                conf.setMapperClass(ICAS_M.class);
     221                conf.setReducerClass(ICAS_R.class);
     222                // conf.setMapperClass(IdentityMapper.class);
     223                // conf.setReducerClass(IdentityReducer.class);
     224
     225                /* delete previous output dir */
     226                if (FileSystem.get(conf).exists(output))
     227                        FileSystem.get(conf).delete(output, true);
     228                conf.setNumMapTasks(Integer.parseInt(args[3]));
     229                conf.setNumReduceTasks(Integer.parseInt(args[4]));
     230
     231                /* set input path */
     232                FileInputFormat.setInputPaths(conf, text_path);
     233                FileOutputFormat.setOutputPath(conf, output);
     234
     235                /* run */
     236                JobClient.runJob(conf);
     237
     238                return 0;
     239        }
     240
     241        public static void main(String[] args) throws Exception {
     242                /* Major parameter */
     243                // it indicates local path, not hadoop file system path
     244                String source_file = "/home/waue/log";
     245                // data source tmp
     246                String text_tmp = "/user/waue/parsed";
     247                /* Minor parameter */
     248                String table_name = "ICAS";
     249
     250                String[] argv = { source_file, text_tmp, table_name, "1", "1" };
     251                args = argv;
     252
     253                /* build table */
     254                String[] col_family = { "infor:" };
     255                BuildHTable build_table = new BuildHTable();
     256                if (!build_table.checkTableExist(args[2])) {
     257                        if (!build_table.createTable(args[2], col_family)) {
     258                                System.err.println("create table error !");
     259                        }
     260                } else {
     261                        System.err.println("Table \"" + args[2]
     262                                        + "\" has already existed !");
     263                }
     264
     265                // if (!hdfsput(args[0], args[1])) {
     266                // System.out.println("directory is existed.");
     267                // }
     268
     269                int res = ToolRunner.run(new Configuration(), new ICAS(), args);
     270                System.exit(res);
     271        }
     272}
     273
     274}}}
     275
     276
    1277 * 問題1: 如何讓run裡宣告的物件,map可使用,reduce也可以呼叫到
    2278