Changes between Version 23 and Version 24 of LogParser


Ignore:
Timestamp:
Jul 8, 2008, 2:18:49 PM (16 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • LogParser

    v23 v24  
    11 = 目的 =
    2  This program will parse your apache log and store it into Hbase.
     2將apache log 匯入到HBase的table中
    33
    44 = 如何使用 =
     
    77$ bin/hadoop dfs -put /var/log/apache2/ apache-log
    88}}}
    9  2 Set the correct parameter "dir" in main contains the logs.
     9 2 Set the correct parameter "dir" in main contains the logs. 
    1010 3 Filter or delete the exception contents as below manually,
    1111{{{
    1212::1 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" 200 729 "...
    1313}}}
    14  4 Run by Eclipse
     14 4 Run by Eclipse 
    1515= 結果 =
    1616 1 執行以下指令
     
    5151!LogParserGo共宣告了以下幾個全域變數及方法:
    5252
    53  1. HBaseConfiguration conf為重要的控制設定參數,其定義了很多方法可以設定或取得map reduce程式運作所需要的值
     53 1. HBaseConfiguration conf是Hbase中重要的設定參數,
    5454 2. 定義 TABLE 為 "table.name",table.name為 name property
    5555 3. String tableName 為資料表名稱
     
    7676
    7777首先看到main函數究竟搞了些什麼?[[br]][[br]]
    78 宣告了table的名稱,要parser的檔案放在'''hdfs'''當中的哪個路徑下,注意此路徑為hdfs,若給的是local file system的路徑的話,程式跑的時候會產生!NullPointer Exception的錯誤。然後呼叫creatTable函數其功能用來創建table,接著跑runMapReduce函數,而整個程式主體就是在runMapReduce
    79 
    80 ------------------------------------
    81 {{{
    82 #!java
    83         public static class MapClass extends MapReduceBase implements
    84                         Mapper<WritableComparable, Text, Text, Writable> {
    85                 public void configure(JobConf job) {
    86                         tableName = job.get(TABLE, "");
    87                 }
    88                 public void map(WritableComparable key, Text value,
    89                                 OutputCollector<Text, Writable> output, Reporter reporter)
    90                                 throws IOException {
    91                         try {
    92                                 LogParser log = new LogParser(value.toString());
    93                                 if (table == null)
    94                                         table = new HTable(conf, new Text(tableName));
    95                                 long lockId = table.startUpdate(new Text(log.getIp()));
    96                                 table.put(lockId, new Text("http:protocol"), log.getProtocol()
    97                                                 .getBytes());
    98                                 table.put(lockId, new Text("http:method"), log.getMethod()
    99                                                 .getBytes());
    100                                 table.put(lockId, new Text("http:code"), log.getCode()
    101                                                 .getBytes());
    102                                 table.put(lockId, new Text("http:bytesize"), log.getByteSize()
    103                                                 .getBytes());
    104                                 table.put(lockId, new Text("http:agent"), log.getAgent()
    105                                                 .getBytes());
    106                                 table.put(lockId, new Text("url:" + log.getUrl()), log
    107                                                 .getReferrer().getBytes());
    108                                 table.put(lockId, new Text("referrer:" + log.getReferrer()),
    109                                                 log.getUrl().getBytes());
    110                                 table.commit(lockId, log.getTimestamp());
    111                         } catch (Exception e) {
    112                                 e.printStackTrace();
    113                         }
    114                 }
    115         }
    116 }}}
    117 此內部類別繼承了 [http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/mapred/MapReduceBase.html org.apache.hadoop.mapred.MapReduceBase] ,並實做Mapper<!WritableComparable, Text, Text, Writable> 介面,
    118 不見得所有map reduce程式都需要實做此介面,但若有要讓map能分配工作就需要寫在下面此函數中:[[BR]]
    119 map(!WritableComparable key, Text value,!OutputCollector<Text, Writable> output, Reporter reporter) [[BR]]
    120 變數key為hbase中的row key,value則為值,output 可以透過collect() 功能將值寫入hbase的table中。但在此範例中,
    121 並沒有用到 output的寫入方式,reporter也沒有用到。[[br]]
    122 此方法因為有IO的存取,因此要宣告trows !IOException, 且用try來起始。[[br]][[br]]
    123 首先!LogParser log = new !LogParser(value.toString()); value的值為要parser的內容的某一行,因為基於hdfs的map-reduce架構上,hadoop會幫我們把資料整合起來,因此程式的邏輯只要處理好這一行即可。!LogParser 在下面會介紹到,目前只要知道log物件是原始資料value透過 !LogParser 處理過的產物。透過log物件的方法getIP,getProtocol(),...等,我們可以輕易取得需要的資料,用table.put( Row_Key , Column_Qualify_Name , Value) 方法將Value值填入Row_Key中的Column_Qualify_Name欄位中。接著研究table物件。[[br]]
    124 table是全域變數之一,由 [http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html org.apache.hadoop.hbase.HTable] 類別定義。產生出HTable物件'''必定要'''給兩個初始化的值,一個是另一個全域變數也是重要的設定檔conf,另一個是tableName也就是資料表的名稱,當HTable 的 table 物件產生出來之後,我們就可以利用put來放入資料。然而一個新的資料表,要如何給他row_key呢?
    125 因此 table.startUpdate(new Text(log.getIp())) 的功能就是 將 ip設定為table的row_key。有興趣的話可以參考[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html#startUpdate(org.apache.hadoop.io.Text) 官方的startUpdate說明] [[br]][[br]]
    126 用此方法可以回傳一個型態為Long的值,將他宣告成為lockId變數,之後就可以用他當主key,而將值一個一個輸入到對應的欄位中。因此我們可以把結構看成是這樣
    127  || key\欄位 || http:protocol || http:method || http:code || http:bytesize || http:agent || url: || referrer: || http:code ||
    128  || row_key  || value1 || value2 || value3 || value4 || value5 || value6 || value7 || value8 ||
    129 需要注意的是,由於Htable 是三維度的結構,row_key、column、timestamp。因此最後commit時間變數則是利用以下方法:[[br]]
    130 table.commit(lockId, log.getTimestamp()),將log內取得的時間update到整個row去。[[br]][[br]]
    131 
    132 configure(jobConf conf) 此為override org.apache.hadoop.mapred.MapReduceBase.configure(JobConf )
    133 內容只是取得並回傳Table的名字而已
    134 
    135 ------------------------------
    136 {{{
    137 #!java
    138         static public Path[] listPaths(FileSystem fsm, Path path)
    139                         throws IOException {
    140                 FileStatus[] fss = fsm.listStatus(path);
    141                 int length = fss.length;
    142                 Path[] pi = new Path[length];
    143                 for (int i = 0; i < length; i++) {
    144                         pi[i] = fss[i].getPath();
    145                 }
    146                 return pi;
    147         }
    148 }}}
    149 
     78宣告了table的名稱,要parser的檔案放在'''hdfs'''當中的哪個路徑下,注意此路徑為hdfs,若給的是local file system的路徑的話,程式跑的時候會產生!NullPointer Exception的錯誤。然後呼叫creatTable函數其功能用來創建table,接著跑runMapReduce函數,而整個程式主體就是在runMapReduce.
     79-------
     80{{{
     81#!java
     82        public static void creatTable(String table) throws IOException {
     83                HBaseAdmin admin = new HBaseAdmin(conf);
     84                if (!admin.tableExists(new Text(table))) {
     85                        System.out.println("1. " + table
     86                                        + " table creating ... please wait");
     87                        HTableDescriptor tableDesc = new HTableDescriptor(table);
     88                        tableDesc.addFamily(new HColumnDescriptor("http:"));
     89                        tableDesc.addFamily(new HColumnDescriptor("url:"));
     90                        tableDesc.addFamily(new HColumnDescriptor("referrer:"));
     91                        admin.createTable(tableDesc);
     92                } else {
     93                        System.out.println("1. " + table + " table already exists.");
     94                }
     95                System.out.println("2. access_log files fetching using map/reduce");
     96        }
     97}}}
     98
     99此段為建立Hbase 的table,首先建立[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HBaseAdmin.html HBaseAdmin]的 admin物件出來,在用admin的tableExists函數來檢查此資料表是否已經存在,若沒存在的話,則用HTableDescriptor類別建立 tableDesc物件,此物件的參數是table name,可以了解透過addFamily()建立http、url、referrer等column family,最後HBaseAdmin類別中有createTable的函數,將之前的tableDesc物件當參數餵給admin物件的createTable函數則表單建立完成。
     100---------
    150101{{{
    151102#!java
     
    186137        }
    187138}}}
    188 
    189 {{{
    190 #!java
    191         public static void creatTable(String table) throws IOException {
    192                 HBaseAdmin admin = new HBaseAdmin(conf);
    193                 if (!admin.tableExists(new Text(table))) {
    194                         System.out.println("1. " + table
    195                                         + " table creating ... please wait");
    196                         HTableDescriptor tableDesc = new HTableDescriptor(table);
    197                         tableDesc.addFamily(new HColumnDescriptor("http:"));
    198                         tableDesc.addFamily(new HColumnDescriptor("url:"));
    199                         tableDesc.addFamily(new HColumnDescriptor("referrer:"));
    200                         admin.createTable(tableDesc);
    201                 } else {
    202                         System.out.println("1. " + table + " table already exists.");
    203                 }
    204                 System.out.println("2. access_log files fetching using map/reduce");
    205         }
    206 }}}
    207 
     139這段function code中,[http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/fs/Path.html Path] 是宣告檔案路徑的類別,可以把Path想做是標準JAVA IO的File類別,用來定義檔案路徑。[ http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/fs/FileSystem.html FileSystem] 的物件生成方式不是用new的方式,而是用get(JobConf) 的方法,裡面填的變數是之前宣告過的全域變數HBase的設定參數conf。[[br]][[br]]
     140後面的參數jobConf 則是重要的map reduce 設定檔,由於其生命週期只在此function,因此並不用移到外面作全域變數。[[br]]
     141JobConf的建構子有很多種型態,詳細可以看[http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/mapred/JobConf.html JobConf官網API] ,一般較基本的範例程式都是用JobConf(Class exampleClass)  即可,但此範例用JobConf(Configuration conf, Class exampleClass) ,是因有個HBaseConfiguration 的conf,HBaseConfiguration為Configuration的子類別,因此conf可以放入JonConf當其設定元素,因此用'''JobConf jobConf = new JobConf(conf, LogParserGo.class);''' 。[[br]][[br]]
     142接著看到jobConf.set(TABLE, table); jobConf並沒有set的方法,而是其父類別 org.apache.hadoop.conf.Configuration 有,因此用set將環境參數設定進去,按照格式設定table 的名稱。
     143 || jobConf.set|| (TABLE || table) ||
     144 ||  || name property || value ||
     145 ||  || table.name || table_name ||
     146
     147接下來一整段,都是在設定放在hdfs內要parser的log的檔案路徑。listPaths 這個函數原本在 FileSystem 是有定義的,但hadoop 0.16 API已經predecated,而Hadoop 0.17 API已經徹底拿掉,為了消錯也為了日後升級方便,因此在這類別裡寫了listPaths來用,其功能主要是將 path 裡的檔案或路徑都回傳紀錄到path [] 陣列。因此 isFile() 函數用來判斷是否為檔案,若不是檔案則為目錄,setInputPath 可以設定一個輸入路徑,但若有很多個路徑要加入,則可以用addInputPath[[br]][[br]]
     148
     149output目錄只是中間的暫時產物,因此當程式跑到最後就 fs.delete 來刪除之。[[br]][[br]]
     150
     151client.getClusterStatus(); 此段程式碼是取得環境中有多少個nodes可以使用,並設到設定參數中。[[br]][[br]]
     152
     153怎麼知道程式跑完map-reduce ,JobClient.runJob() 則是 hadoop 呼叫開始跑map reduce的,如同thread 的run() 相似。
     154
     155[[br]][[br]]
     156
     157jobConf.setMapperClass(MapClass.class); 是用來設定 mapClass 如何運作,也就是下一段就會介紹到的內部類別 MapClass.class,而像此程式沒有用到reduce,因此不用設定;而若map 或 reduce 其中有一個沒用到其功能的話,也可以設定基本元件 IdentityReducer、IdentityMapper來取代。
     158
     159
     160----
     161{{{
     162#!java
     163        public static class MapClass extends MapReduceBase implements
     164                        Mapper<WritableComparable, Text, Text, Writable> {
     165                public void configure(JobConf job) {
     166                        tableName = job.get(TABLE, "");
     167                }
     168                public void map(WritableComparable key, Text value,
     169                                OutputCollector<Text, Writable> output, Reporter reporter)
     170                                throws IOException {
     171                        try {
     172                                LogParser log = new LogParser(value.toString());
     173                                if (table == null)
     174                                        table = new HTable(conf, new Text(tableName));
     175                                long lockId = table.startUpdate(new Text(log.getIp()));
     176                                table.put(lockId, new Text("http:protocol"), log.getProtocol()
     177                                                .getBytes());
     178                                table.put(lockId, new Text("http:method"), log.getMethod()
     179                                                .getBytes());
     180                                table.put(lockId, new Text("http:code"), log.getCode()
     181                                                .getBytes());
     182                                table.put(lockId, new Text("http:bytesize"), log.getByteSize()
     183                                                .getBytes());
     184                                table.put(lockId, new Text("http:agent"), log.getAgent()
     185                                                .getBytes());
     186                                table.put(lockId, new Text("url:" + log.getUrl()), log
     187                                                .getReferrer().getBytes());
     188                                table.put(lockId, new Text("referrer:" + log.getReferrer()),
     189                                                log.getUrl().getBytes());
     190                                table.commit(lockId, log.getTimestamp());
     191                        } catch (Exception e) {
     192                                e.printStackTrace();
     193                        }
     194                }
     195        }
     196}}}
     197此內部類別繼承了 [http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/mapred/MapReduceBase.html org.apache.hadoop.mapred.MapReduceBase] ,並實做Mapper<!WritableComparable, Text, Text, Writable> 介面,
     198不見得所有map reduce程式都需要實做此介面,但若有要讓map能分配工作就需要寫在下面此函數中:[[BR]]
     199map(!WritableComparable key, Text value,!OutputCollector<Text, Writable> output, Reporter reporter) [[BR]]
     200變數key為hbase中的row key,value則為值,output 可以透過collect() 功能將值寫入hbase的table中。但在此範例中,
     201並沒有用到 output的寫入方式,reporter也沒有用到。[[br]]
     202此方法因為有IO的存取,因此要宣告trows !IOException, 且用try來起始。[[br]][[br]]
     203首先!LogParser log = new !LogParser(value.toString()); value的值為要parser的內容的某一行,因為基於hdfs的map-reduce架構上,hadoop會幫我們把資料整合起來,因此程式的邏輯只要處理好這一行即可。!LogParser 在下面會介紹到,目前只要知道log物件是原始資料value透過 !LogParser 處理過的產物。透過log物件的方法getIP,getProtocol(),...等,我們可以輕易取得需要的資料,用table.put( Row_Key , Column_Qualify_Name , Value) 方法將Value值填入Row_Key中的Column_Qualify_Name欄位中。接著研究table物件。[[br]]
     204table是全域變數之一,由 [http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html org.apache.hadoop.hbase.HTable] 類別定義。產生出HTable物件'''必定要'''給兩個初始化的值,一個是另一個全域變數也是重要的Hbase設定檔conf,另一個是tableName也就是資料表的名稱,當HTable 的 table 物件產生出來之後,我們就可以利用put來放入資料。然而一個新的資料表,要如何給他row_key呢?
     205因此 table.startUpdate(new Text(log.getIp())) 的功能就是 將 ip設定為table的row_key。有興趣的話可以參考[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html#startUpdate(org.apache.hadoop.io.Text) 官方的startUpdate說明] [[br]][[br]]
     206用此方法可以回傳一個型態為Long的值,將他宣告成為lockId變數,之後就可以用他當主key,而將值一個一個輸入到對應的欄位中。因此我們可以把結構看成是這樣
     207 || key\欄位 || http:protocol || http:method || http:code || http:bytesize || http:agent || url: || referrer: || http:code ||
     208 || row_key  || value1 || value2 || value3 || value4 || value5 || value6 || value7 || value8 ||
     209需要注意的是,由於Htable 是三維度的結構,row_key、column、timestamp。因此最後commit時間變數則是利用以下方法:[[br]]
     210table.commit(lockId, log.getTimestamp()),將log內取得的時間update到整個row去。[[br]][[br]]
     211
     212configure(!JobConf conf) 此為override org.apache.hadoop.mapred.MapReduceBase.configure(!JobConf )
     213內容只是取得並回傳Table的名字而已
     214
     215----
     216{{{
     217#!java
     218        static public Path[] listPaths(FileSystem fsm, Path path)
     219                        throws IOException {
     220                FileStatus[] fss = fsm.listStatus(path);
     221                int length = fss.length;
     222                Path[] pi = new Path[length];
     223                for (int i = 0; i < length; i++) {
     224                        pi[i] = fss[i].getPath();
     225                }
     226                return pi;
     227        }
     228}}}
     229此函數用來列出路徑中的目錄或檔案,是利用listStatus實做原本的listPaths。
     230----
    208231
    209232
     
    270293之後就很顯而易見,用matcher.group(n)取得值後,一一的用this.參數來作設定,但其實不用this 編譯依然能過關,只是習慣在建構子內用到該class的參數會這麼用(以跟繼承到父類別的參數作區別?)其中時間需要用!SimpleDateFormat小轉譯一下,http的內容需要用split()來作更細部的分解。
    271294
     295----
    272296{{{
    273297#!java