| 78 |   | 宣告了table的名稱,要parser的檔案放在'''hdfs'''當中的哪個路徑下,注意此路徑為hdfs,若給的是local file system的路徑的話,程式跑的時候會產生!NullPointer Exception的錯誤。然後呼叫creatTable函數其功能用來創建table,接著跑runMapReduce函數,而整個程式主體就是在runMapReduce | 
                      
                        | 79 |   |  | 
                      
                        | 80 |   | ------------------------------------ | 
                      
                        | 81 |   | {{{ | 
                      
                        | 82 |   | #!java | 
                      
                        | 83 |   |         public static class MapClass extends MapReduceBase implements | 
                      
                        | 84 |   |                         Mapper<WritableComparable, Text, Text, Writable> { | 
                      
                        | 85 |   |                 public void configure(JobConf job) { | 
                      
                        | 86 |   |                         tableName = job.get(TABLE, ""); | 
                      
                        | 87 |   |                 } | 
                      
                        | 88 |   |                 public void map(WritableComparable key, Text value, | 
                      
                        | 89 |   |                                 OutputCollector<Text, Writable> output, Reporter reporter) | 
                      
                        | 90 |   |                                 throws IOException { | 
                      
                        | 91 |   |                         try { | 
                      
                        | 92 |   |                                 LogParser log = new LogParser(value.toString()); | 
                      
                        | 93 |   |                                 if (table == null) | 
                      
                        | 94 |   |                                         table = new HTable(conf, new Text(tableName)); | 
                      
                        | 95 |   |                                 long lockId = table.startUpdate(new Text(log.getIp())); | 
                      
                        | 96 |   |                                 table.put(lockId, new Text("http:protocol"), log.getProtocol() | 
                      
                        | 97 |   |                                                 .getBytes()); | 
                      
                        | 98 |   |                                 table.put(lockId, new Text("http:method"), log.getMethod() | 
                      
                        | 99 |   |                                                 .getBytes()); | 
                      
                        | 100 |   |                                 table.put(lockId, new Text("http:code"), log.getCode() | 
                      
                        | 101 |   |                                                 .getBytes()); | 
                      
                        | 102 |   |                                 table.put(lockId, new Text("http:bytesize"), log.getByteSize() | 
                      
                        | 103 |   |                                                 .getBytes()); | 
                      
                        | 104 |   |                                 table.put(lockId, new Text("http:agent"), log.getAgent() | 
                      
                        | 105 |   |                                                 .getBytes()); | 
                      
                        | 106 |   |                                 table.put(lockId, new Text("url:" + log.getUrl()), log | 
                      
                        | 107 |   |                                                 .getReferrer().getBytes()); | 
                      
                        | 108 |   |                                 table.put(lockId, new Text("referrer:" + log.getReferrer()), | 
                      
                        | 109 |   |                                                 log.getUrl().getBytes()); | 
                      
                        | 110 |   |                                 table.commit(lockId, log.getTimestamp()); | 
                      
                        | 111 |   |                         } catch (Exception e) { | 
                      
                        | 112 |   |                                 e.printStackTrace(); | 
                      
                        | 113 |   |                         } | 
                      
                        | 114 |   |                 } | 
                      
                        | 115 |   |         } | 
                      
                        | 116 |   | }}} | 
                      
                        | 117 |   | 此內部類別繼承了 [http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/mapred/MapReduceBase.html org.apache.hadoop.mapred.MapReduceBase] ,並實做Mapper<!WritableComparable, Text, Text, Writable> 介面, | 
                      
                        | 118 |   | 不見得所有map reduce程式都需要實做此介面,但若有要讓map能分配工作就需要寫在下面此函數中:[[BR]] | 
                      
                        | 119 |   | map(!WritableComparable key, Text value,!OutputCollector<Text, Writable> output, Reporter reporter) [[BR]] | 
                      
                        | 120 |   | 變數key為hbase中的row key,value則為值,output 可以透過collect() 功能將值寫入hbase的table中。但在此範例中, | 
                      
                        | 121 |   | 並沒有用到 output的寫入方式,reporter也沒有用到。[[br]] | 
                      
                        | 122 |   | 此方法因為有IO的存取,因此要宣告trows !IOException, 且用try來起始。[[br]][[br]] | 
                      
                        | 123 |   | 首先!LogParser log = new !LogParser(value.toString()); value的值為要parser的內容的某一行,因為基於hdfs的map-reduce架構上,hadoop會幫我們把資料整合起來,因此程式的邏輯只要處理好這一行即可。!LogParser 在下面會介紹到,目前只要知道log物件是原始資料value透過 !LogParser 處理過的產物。透過log物件的方法getIP,getProtocol(),...等,我們可以輕易取得需要的資料,用table.put( Row_Key , Column_Qualify_Name , Value) 方法將Value值填入Row_Key中的Column_Qualify_Name欄位中。接著研究table物件。[[br]] | 
                      
                        | 124 |   | table是全域變數之一,由 [http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html org.apache.hadoop.hbase.HTable] 類別定義。產生出HTable物件'''必定要'''給兩個初始化的值,一個是另一個全域變數也是重要的設定檔conf,另一個是tableName也就是資料表的名稱,當HTable 的 table 物件產生出來之後,我們就可以利用put來放入資料。然而一個新的資料表,要如何給他row_key呢? | 
                      
                        | 125 |   | 因此 table.startUpdate(new Text(log.getIp())) 的功能就是 將 ip設定為table的row_key。有興趣的話可以參考[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html#startUpdate(org.apache.hadoop.io.Text) 官方的startUpdate說明] [[br]][[br]] | 
                      
                        | 126 |   | 用此方法可以回傳一個型態為Long的值,將他宣告成為lockId變數,之後就可以用他當主key,而將值一個一個輸入到對應的欄位中。因此我們可以把結構看成是這樣 | 
                      
                        | 127 |   |  || key\欄位 || http:protocol || http:method || http:code || http:bytesize || http:agent || url: || referrer: || http:code ||  | 
                      
                        | 128 |   |  || row_key  || value1 || value2 || value3 || value4 || value5 || value6 || value7 || value8 ||  | 
                      
                        | 129 |   | 需要注意的是,由於Htable 是三維度的結構,row_key、column、timestamp。因此最後commit時間變數則是利用以下方法:[[br]] | 
                      
                        | 130 |   | table.commit(lockId, log.getTimestamp()),將log內取得的時間update到整個row去。[[br]][[br]] | 
                      
                        | 131 |   |  | 
                      
                        | 132 |   | configure(jobConf conf) 此為override org.apache.hadoop.mapred.MapReduceBase.configure(JobConf ) | 
                      
                        | 133 |   | 內容只是取得並回傳Table的名字而已 | 
                      
                        | 134 |   |  | 
                      
                        | 135 |   | ------------------------------ | 
                      
                        | 136 |   | {{{ | 
                      
                        | 137 |   | #!java | 
                      
                        | 138 |   |         static public Path[] listPaths(FileSystem fsm, Path path) | 
                      
                        | 139 |   |                         throws IOException { | 
                      
                        | 140 |   |                 FileStatus[] fss = fsm.listStatus(path); | 
                      
                        | 141 |   |                 int length = fss.length; | 
                      
                        | 142 |   |                 Path[] pi = new Path[length]; | 
                      
                        | 143 |   |                 for (int i = 0; i < length; i++) { | 
                      
                        | 144 |   |                         pi[i] = fss[i].getPath(); | 
                      
                        | 145 |   |                 } | 
                      
                        | 146 |   |                 return pi; | 
                      
                        | 147 |   |         } | 
                      
                        | 148 |   | }}} | 
                      
                        | 149 |   |  | 
                      
                      
                        |   | 78 | 宣告了table的名稱,要parser的檔案放在'''hdfs'''當中的哪個路徑下,注意此路徑為hdfs,若給的是local file system的路徑的話,程式跑的時候會產生!NullPointer Exception的錯誤。然後呼叫creatTable函數其功能用來創建table,接著跑runMapReduce函數,而整個程式主體就是在runMapReduce. | 
                      
                        |   | 79 | ------- | 
                      
                        |   | 80 | {{{ | 
                      
                        |   | 81 | #!java | 
                      
                        |   | 82 |         public static void creatTable(String table) throws IOException { | 
                      
                        |   | 83 |                 HBaseAdmin admin = new HBaseAdmin(conf); | 
                      
                        |   | 84 |                 if (!admin.tableExists(new Text(table))) { | 
                      
                        |   | 85 |                         System.out.println("1. " + table | 
                      
                        |   | 86 |                                         + " table creating ... please wait"); | 
                      
                        |   | 87 |                         HTableDescriptor tableDesc = new HTableDescriptor(table); | 
                      
                        |   | 88 |                         tableDesc.addFamily(new HColumnDescriptor("http:")); | 
                      
                        |   | 89 |                         tableDesc.addFamily(new HColumnDescriptor("url:")); | 
                      
                        |   | 90 |                         tableDesc.addFamily(new HColumnDescriptor("referrer:")); | 
                      
                        |   | 91 |                         admin.createTable(tableDesc); | 
                      
                        |   | 92 |                 } else { | 
                      
                        |   | 93 |                         System.out.println("1. " + table + " table already exists."); | 
                      
                        |   | 94 |                 } | 
                      
                        |   | 95 |                 System.out.println("2. access_log files fetching using map/reduce"); | 
                      
                        |   | 96 |         } | 
                      
                        |   | 97 | }}} | 
                      
                        |   | 98 |  | 
                      
                        |   | 99 | 此段為建立Hbase 的table,首先建立[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HBaseAdmin.html HBaseAdmin]的 admin物件出來,在用admin的tableExists函數來檢查此資料表是否已經存在,若沒存在的話,則用HTableDescriptor類別建立 tableDesc物件,此物件的參數是table name,可以了解透過addFamily()建立http、url、referrer等column family,最後HBaseAdmin類別中有createTable的函數,將之前的tableDesc物件當參數餵給admin物件的createTable函數則表單建立完成。 | 
                      
                        |   | 100 | --------- | 
                      
            
                      
                        | 188 |   |  | 
                      
                        | 189 |   | {{{ | 
                      
                        | 190 |   | #!java | 
                      
                        | 191 |   |         public static void creatTable(String table) throws IOException { | 
                      
                        | 192 |   |                 HBaseAdmin admin = new HBaseAdmin(conf); | 
                      
                        | 193 |   |                 if (!admin.tableExists(new Text(table))) { | 
                      
                        | 194 |   |                         System.out.println("1. " + table | 
                      
                        | 195 |   |                                         + " table creating ... please wait"); | 
                      
                        | 196 |   |                         HTableDescriptor tableDesc = new HTableDescriptor(table); | 
                      
                        | 197 |   |                         tableDesc.addFamily(new HColumnDescriptor("http:")); | 
                      
                        | 198 |   |                         tableDesc.addFamily(new HColumnDescriptor("url:")); | 
                      
                        | 199 |   |                         tableDesc.addFamily(new HColumnDescriptor("referrer:")); | 
                      
                        | 200 |   |                         admin.createTable(tableDesc); | 
                      
                        | 201 |   |                 } else { | 
                      
                        | 202 |   |                         System.out.println("1. " + table + " table already exists."); | 
                      
                        | 203 |   |                 } | 
                      
                        | 204 |   |                 System.out.println("2. access_log files fetching using map/reduce"); | 
                      
                        | 205 |   |         } | 
                      
                        | 206 |   | }}} | 
                      
                        | 207 |   |  | 
                      
                      
                        |   | 139 | 這段function code中,[http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/fs/Path.html Path] 是宣告檔案路徑的類別,可以把Path想做是標準JAVA IO的File類別,用來定義檔案路徑。[ http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/fs/FileSystem.html FileSystem] 的物件生成方式不是用new的方式,而是用get(JobConf) 的方法,裡面填的變數是之前宣告過的全域變數HBase的設定參數conf。[[br]][[br]] | 
                      
                        |   | 140 | 後面的參數jobConf 則是重要的map reduce 設定檔,由於其生命週期只在此function,因此並不用移到外面作全域變數。[[br]] | 
                      
                        |   | 141 | JobConf的建構子有很多種型態,詳細可以看[http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/mapred/JobConf.html JobConf官網API] ,一般較基本的範例程式都是用JobConf(Class exampleClass)  即可,但此範例用JobConf(Configuration conf, Class exampleClass) ,是因有個HBaseConfiguration 的conf,HBaseConfiguration為Configuration的子類別,因此conf可以放入JonConf當其設定元素,因此用'''JobConf jobConf = new JobConf(conf, LogParserGo.class);''' 。[[br]][[br]] | 
                      
                        |   | 142 | 接著看到jobConf.set(TABLE, table); jobConf並沒有set的方法,而是其父類別 org.apache.hadoop.conf.Configuration 有,因此用set將環境參數設定進去,按照格式設定table 的名稱。 | 
                      
                        |   | 143 |  || jobConf.set|| (TABLE || table) ||  | 
                      
                        |   | 144 |  ||  || name property || value ||  | 
                      
                        |   | 145 |  ||  || table.name || table_name ||  | 
                      
                        |   | 146 |  | 
                      
                        |   | 147 | 接下來一整段,都是在設定放在hdfs內要parser的log的檔案路徑。listPaths 這個函數原本在 FileSystem 是有定義的,但hadoop 0.16 API已經predecated,而Hadoop 0.17 API已經徹底拿掉,為了消錯也為了日後升級方便,因此在這類別裡寫了listPaths來用,其功能主要是將 path 裡的檔案或路徑都回傳紀錄到path [] 陣列。因此 isFile() 函數用來判斷是否為檔案,若不是檔案則為目錄,setInputPath 可以設定一個輸入路徑,但若有很多個路徑要加入,則可以用addInputPath[[br]][[br]] | 
                      
                        |   | 148 |  | 
                      
                        |   | 149 | output目錄只是中間的暫時產物,因此當程式跑到最後就 fs.delete 來刪除之。[[br]][[br]] | 
                      
                        |   | 150 |  | 
                      
                        |   | 151 | client.getClusterStatus(); 此段程式碼是取得環境中有多少個nodes可以使用,並設到設定參數中。[[br]][[br]] | 
                      
                        |   | 152 |  | 
                      
                        |   | 153 | 怎麼知道程式跑完map-reduce ,JobClient.runJob() 則是 hadoop 呼叫開始跑map reduce的,如同thread 的run() 相似。 | 
                      
                        |   | 154 |  | 
                      
                        |   | 155 | [[br]][[br]] | 
                      
                        |   | 156 |  | 
                      
                        |   | 157 | jobConf.setMapperClass(MapClass.class); 是用來設定 mapClass 如何運作,也就是下一段就會介紹到的內部類別 MapClass.class,而像此程式沒有用到reduce,因此不用設定;而若map 或 reduce 其中有一個沒用到其功能的話,也可以設定基本元件 IdentityReducer、IdentityMapper來取代。 | 
                      
                        |   | 158 |  | 
                      
                        |   | 159 |  | 
                      
                        |   | 160 | ---- | 
                      
                        |   | 161 | {{{ | 
                      
                        |   | 162 | #!java | 
                      
                        |   | 163 |         public static class MapClass extends MapReduceBase implements | 
                      
                        |   | 164 |                         Mapper<WritableComparable, Text, Text, Writable> { | 
                      
                        |   | 165 |                 public void configure(JobConf job) { | 
                      
                        |   | 166 |                         tableName = job.get(TABLE, ""); | 
                      
                        |   | 167 |                 } | 
                      
                        |   | 168 |                 public void map(WritableComparable key, Text value, | 
                      
                        |   | 169 |                                 OutputCollector<Text, Writable> output, Reporter reporter) | 
                      
                        |   | 170 |                                 throws IOException { | 
                      
                        |   | 171 |                         try { | 
                      
                        |   | 172 |                                 LogParser log = new LogParser(value.toString()); | 
                      
                        |   | 173 |                                 if (table == null) | 
                      
                        |   | 174 |                                         table = new HTable(conf, new Text(tableName)); | 
                      
                        |   | 175 |                                 long lockId = table.startUpdate(new Text(log.getIp())); | 
                      
                        |   | 176 |                                 table.put(lockId, new Text("http:protocol"), log.getProtocol() | 
                      
                        |   | 177 |                                                 .getBytes()); | 
                      
                        |   | 178 |                                 table.put(lockId, new Text("http:method"), log.getMethod() | 
                      
                        |   | 179 |                                                 .getBytes()); | 
                      
                        |   | 180 |                                 table.put(lockId, new Text("http:code"), log.getCode() | 
                      
                        |   | 181 |                                                 .getBytes()); | 
                      
                        |   | 182 |                                 table.put(lockId, new Text("http:bytesize"), log.getByteSize() | 
                      
                        |   | 183 |                                                 .getBytes()); | 
                      
                        |   | 184 |                                 table.put(lockId, new Text("http:agent"), log.getAgent() | 
                      
                        |   | 185 |                                                 .getBytes()); | 
                      
                        |   | 186 |                                 table.put(lockId, new Text("url:" + log.getUrl()), log | 
                      
                        |   | 187 |                                                 .getReferrer().getBytes()); | 
                      
                        |   | 188 |                                 table.put(lockId, new Text("referrer:" + log.getReferrer()), | 
                      
                        |   | 189 |                                                 log.getUrl().getBytes()); | 
                      
                        |   | 190 |                                 table.commit(lockId, log.getTimestamp()); | 
                      
                        |   | 191 |                         } catch (Exception e) { | 
                      
                        |   | 192 |                                 e.printStackTrace(); | 
                      
                        |   | 193 |                         } | 
                      
                        |   | 194 |                 } | 
                      
                        |   | 195 |         } | 
                      
                        |   | 196 | }}} | 
                      
                        |   | 197 | 此內部類別繼承了 [http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/mapred/MapReduceBase.html org.apache.hadoop.mapred.MapReduceBase] ,並實做Mapper<!WritableComparable, Text, Text, Writable> 介面, | 
                      
                        |   | 198 | 不見得所有map reduce程式都需要實做此介面,但若有要讓map能分配工作就需要寫在下面此函數中:[[BR]] | 
                      
                        |   | 199 | map(!WritableComparable key, Text value,!OutputCollector<Text, Writable> output, Reporter reporter) [[BR]] | 
                      
                        |   | 200 | 變數key為hbase中的row key,value則為值,output 可以透過collect() 功能將值寫入hbase的table中。但在此範例中, | 
                      
                        |   | 201 | 並沒有用到 output的寫入方式,reporter也沒有用到。[[br]] | 
                      
                        |   | 202 | 此方法因為有IO的存取,因此要宣告trows !IOException, 且用try來起始。[[br]][[br]] | 
                      
                        |   | 203 | 首先!LogParser log = new !LogParser(value.toString()); value的值為要parser的內容的某一行,因為基於hdfs的map-reduce架構上,hadoop會幫我們把資料整合起來,因此程式的邏輯只要處理好這一行即可。!LogParser 在下面會介紹到,目前只要知道log物件是原始資料value透過 !LogParser 處理過的產物。透過log物件的方法getIP,getProtocol(),...等,我們可以輕易取得需要的資料,用table.put( Row_Key , Column_Qualify_Name , Value) 方法將Value值填入Row_Key中的Column_Qualify_Name欄位中。接著研究table物件。[[br]] | 
                      
                        |   | 204 | table是全域變數之一,由 [http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html org.apache.hadoop.hbase.HTable] 類別定義。產生出HTable物件'''必定要'''給兩個初始化的值,一個是另一個全域變數也是重要的Hbase設定檔conf,另一個是tableName也就是資料表的名稱,當HTable 的 table 物件產生出來之後,我們就可以利用put來放入資料。然而一個新的資料表,要如何給他row_key呢? | 
                      
                        |   | 205 | 因此 table.startUpdate(new Text(log.getIp())) 的功能就是 將 ip設定為table的row_key。有興趣的話可以參考[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html#startUpdate(org.apache.hadoop.io.Text) 官方的startUpdate說明] [[br]][[br]] | 
                      
                        |   | 206 | 用此方法可以回傳一個型態為Long的值,將他宣告成為lockId變數,之後就可以用他當主key,而將值一個一個輸入到對應的欄位中。因此我們可以把結構看成是這樣 | 
                      
                        |   | 207 |  || key\欄位 || http:protocol || http:method || http:code || http:bytesize || http:agent || url: || referrer: || http:code ||  | 
                      
                        |   | 208 |  || row_key  || value1 || value2 || value3 || value4 || value5 || value6 || value7 || value8 ||  | 
                      
                        |   | 209 | 需要注意的是,由於Htable 是三維度的結構,row_key、column、timestamp。因此最後commit時間變數則是利用以下方法:[[br]] | 
                      
                        |   | 210 | table.commit(lockId, log.getTimestamp()),將log內取得的時間update到整個row去。[[br]][[br]] | 
                      
                        |   | 211 |  | 
                      
                        |   | 212 | configure(!JobConf conf) 此為override org.apache.hadoop.mapred.MapReduceBase.configure(!JobConf ) | 
                      
                        |   | 213 | 內容只是取得並回傳Table的名字而已 | 
                      
                        |   | 214 |  | 
                      
                        |   | 215 | ---- | 
                      
                        |   | 216 | {{{ | 
                      
                        |   | 217 | #!java | 
                      
                        |   | 218 |         static public Path[] listPaths(FileSystem fsm, Path path) | 
                      
                        |   | 219 |                         throws IOException { | 
                      
                        |   | 220 |                 FileStatus[] fss = fsm.listStatus(path); | 
                      
                        |   | 221 |                 int length = fss.length; | 
                      
                        |   | 222 |                 Path[] pi = new Path[length]; | 
                      
                        |   | 223 |                 for (int i = 0; i < length; i++) { | 
                      
                        |   | 224 |                         pi[i] = fss[i].getPath(); | 
                      
                        |   | 225 |                 } | 
                      
                        |   | 226 |                 return pi; | 
                      
                        |   | 227 |         } | 
                      
                        |   | 228 | }}} | 
                      
                        |   | 229 | 此函數用來列出路徑中的目錄或檔案,是利用listStatus實做原本的listPaths。 | 
                      
                        |   | 230 | ---- |