Changes between Version 1 and Version 2 of waue/2009/SEC_to_ICAS


Ignore:
Timestamp:
Aug 4, 2009, 5:39:00 PM (15 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2009/SEC_to_ICAS

    v1 v2  
    175175?>
    176176}}}
     177
     178
     179 = ICAS =
     180 == Map ==
     181
     182{{{
     183        public static class ICAS_M extends MapReduceBase implements
     184                        Mapper<LongWritable, Text, Text, Text> {
     185
     186                String getip(String str) {
     187                        String[] ret = str.split(":");
     188                        return ret[0];
     189                }
     190
     191                public void map(LongWritable key, Text value,
     192                                OutputCollector<Text, Text> output, Reporter reporter)
     193                                throws IOException {
     194                        // [3] sig [4]class [5]y [6]m [7]d [8]h [9]M [10]s [11]s [12]d [13]t
     195
     196                        String line = value.toString();
     197                        String[] str = line.split(";");
     198                        String source_ip = getip(str[11]);
     199                        String dest_ip = getip(str[12]);
     200                        String fkey = dest_ip ;
     201
     202                        String date = str[5] + "/" + str[6] + "/" + str[7] + "_" + str[8]
     203                                        + ":" + str[9] + ":" + str[10];
     204                        // source @ date @ sig @ class @ type
     205                        String fvalue = source_ip + "@" + date + "@" + str[3] +"@" +str[4]
     206                              + "@"+ str[13];
     207                        output.collect(new Text(fkey), new Text(fvalue));
     208                }
     209        }
     210
     211
     212}}}
     213
     214 == Reduce ==
     215
     216{{{
     217        public static class ICAS_R extends TableReduce<Text, Text> {
     218
     219                public void reduce(Text key, Iterator<Text> values,
     220                                OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
     221                                Reporter reporter) throws IOException {
     222                        HTable table = new HTable("ICAS");
     223                        String source_ip;
     224                        String date;
     225                        String sig_class;
     226                        String type;
     227                        String signature;
     228                       
     229                        String rawstr = new String(values.next().getBytes());
     230                        String[] str = rawstr.split("@");
     231                        source_ip = str[0];
     232                        date = str[1];
     233                        signature = str[2];
     234                        sig_class = str[3];
     235                        type = str[4];
     236                        // values.next().getByte() can get value and transfer to byte form,
     237                        while (values.hasNext()) {
     238                                // source_ip + "@" + date + "@" + sig + "@" + class + "@" + type;
     239                                rawstr = new String(values.next().getBytes());
     240                                str = rawstr.split("@");
     241                                source_ip = source_ip + " ; " + str[0];
     242                                date = date + " ; " + str[1];
     243                                signature = signature + ";" + str[2];
     244                        }
     245                        reporter.setStatus("amp emitting cell for row'" + key.toString()
     246                                        + "'");
     247                        BatchUpdate map = new BatchUpdate(key.toString());
     248                        // map.setTimestamp(timestamp);
     249                        map.put("infor:source_ip", Bytes.toBytes(source_ip));
     250                        map.put("infor:signature", Bytes.toBytes(signature));
     251                        map.put("infor:date", Bytes.toBytes(date));
     252                        map.put("infor:class", Bytes.toBytes(sig_class));
     253                        map.put("infor:type", Bytes.toBytes(type));
     254                        table.commit(map);
     255                        // ImmutableBytesWritable Hkey = new ImmutableBytesWritable(Bytes
     256                        // .toBytes(key.toString()));
     257                        // output.collect(Hkey, map);
     258
     259                }
     260        }
     261}}}