= 目的 = 將apache log 匯入到HBase的table中 = 如何使用 = 1 Upload apache logs ( /var/log/apache2/access.log* ) to hdfs (default: /user/waue/apache-log) {{{ $ bin/hadoop dfs -put /var/log/apache2/ apache-log }}} 2 Set the correct parameter "dir" in main contains the logs. 3 Filter or delete the exception contents as below manually, {{{ ::1 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" 200 729 "... }}} 4 Run by Eclipse = 結果 = 1 執行以下指令 {{{ hql > select * from apache-log; }}} 2 原始的apache log 如下: {{{ 118.170.101.250 - - [19/Jun/2008:23:21:12 +0800] "GET http://203.187.1.180/goldchun555/index.htm HTTP/1.1" 404 318 "-" "Mozilla/4.0 (compatible; MSIE 4.01; Windows 95)" ... (skip) 87.65.93.58 - - [18/Jun/2008:06:54:57 +0800] "OPTIONS * HTTP/1.1" 400 300 "-" "-" }}} 3 結果 || Row || Column || Cell || || 118.170.101.250 || http:agent || Mozilla/4.0 (compatible; || || ..(skip).. || ..(skip).. || ..(skip).. || || 87.65.93.58 || http:method || OPTIONS || || 87.65.93.58 || http:protocol || HTTP/1.1 || 31 row(s) in set. (0.58 sec) = !LogParserGo.java = {{{ #!java public class LogParserGo { static HBaseConfiguration conf = new HBaseConfiguration(); public static final String TABLE = "table.name"; static String tableName; static HTable table = null; public static class MapClass; static public Path[] listPaths(FileSystem fsm, Path path); public static void runMapReduce(String table, String dir); public static void creatTable(String table) ; public static void main(String[] args) ; }}} !LogParserGo共宣告了以下幾個全域變數及方法: 1. HBaseConfiguration conf是Hbase中重要的設定參數, 2. 定義 TABLE 為 "table.name",table.name為 name property 3. String tableName 為資料表名稱 4. Htable table 在定義一個HBase的操作變數 5. class !MapClass 為實做map的一個內部類別 6. Path[] listPaths 是個可以列出指定路徑下的檔案和目錄,原本0.16 API即宣告 Deprecated,因此為了解決warning在此實做 7. void runMapReduce(String table, String dir) 跑!MapReduce的程序 8. void creatTable(String table) 建立hbase的資料表 9. void main(String[] args) main 函數 1~4為變數較為單純,之後將說明5~9的函數功能 ---------------------------------------------- {{{ #!java public static void main(String[] args) throws IOException { String table_name = "apache-log2"; String dir = "/user/waue/apache-log"; creatTable(table_name); runMapReduce(table_name, dir); } } }}} 首先看到main函數究竟搞了些什麼?[[br]][[br]] 宣告了table的名稱,要parser的檔案放在'''hdfs'''當中的哪個路徑下,注意此路徑為hdfs,若給的是local file system的路徑的話,程式跑的時候會產生!NullPointer Exception的錯誤。然後呼叫creatTable函數其功能用來創建table,接著跑runMapReduce函數,而整個程式主體就是在runMapReduce. ------- {{{ #!java public static void creatTable(String table) throws IOException { HBaseAdmin admin = new HBaseAdmin(conf); if (!admin.tableExists(new Text(table))) { System.out.println("1. " + table + " table creating ... please wait"); HTableDescriptor tableDesc = new HTableDescriptor(table); tableDesc.addFamily(new HColumnDescriptor("http:")); tableDesc.addFamily(new HColumnDescriptor("url:")); tableDesc.addFamily(new HColumnDescriptor("referrer:")); admin.createTable(tableDesc); } else { System.out.println("1. " + table + " table already exists."); } System.out.println("2. access_log files fetching using map/reduce"); } }}} 此段為建立Hbase 的table,首先建立[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HBaseAdmin.html HBaseAdmin]的 admin物件出來,在用admin的tableExists函數來檢查此資料表是否已經存在,若沒存在的話,則用HTableDescriptor類別建立 tableDesc物件,此物件的參數是table name,可以了解透過addFamily()建立http、url、referrer等column family,最後HBaseAdmin類別中有createTable的函數,將之前的tableDesc物件當參數餵給admin物件的createTable函數則表單建立完成。 --------- {{{ #!java public static void runMapReduce(String table, String dir) throws IOException { Path tempDir = new Path("/tmp/Mylog/"); Path InputDir = new Path(dir); FileSystem fs = FileSystem.get(conf); JobConf jobConf = new JobConf(conf, LogParserGo.class); jobConf.setJobName("apache log fetcher"); jobConf.set(TABLE, table); Path[] in = listPaths(fs, InputDir); if (fs.isFile(InputDir)) { jobConf.setInputPath(InputDir); } else { for (int i = 0; i < in.length; i++) { if (fs.isFile(in[i])) { jobConf.addInputPath(in[i]); } else { Path[] sub = listPaths(fs, in[i]); for (int j = 0; j < sub.length; j++) { if (fs.isFile(sub[j])) { jobConf.addInputPath(sub[j]); } } } } } jobConf.setOutputPath(tempDir); jobConf.setMapperClass(MapClass.class); JobClient client = new JobClient(jobConf); ClusterStatus cluster = client.getClusterStatus(); jobConf.setNumMapTasks(cluster.getMapTasks()); jobConf.setNumReduceTasks(0); JobClient.runJob(jobConf); fs.delete(tempDir); fs.close(); } }}} 這段function code中,[http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/fs/Path.html Path] 是宣告檔案路徑的類別,可以把Path想做是標準JAVA IO的File類別,用來定義檔案路徑。[ http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/fs/FileSystem.html FileSystem] 的物件生成方式不是用new的方式,而是用get(JobConf) 的方法,裡面填的變數是之前宣告過的全域變數HBase的設定參數conf。[[br]][[br]] 後面的參數jobConf 則是重要的map reduce 設定檔,由於其生命週期只在此function,因此並不用移到外面作全域變數。[[br]] JobConf的建構子有很多種型態,詳細可以看[http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/mapred/JobConf.html JobConf官網API] ,一般較基本的範例程式都是用JobConf(Class exampleClass) 即可,但此範例用JobConf(Configuration conf, Class exampleClass) ,是因有個HBaseConfiguration 的conf,HBaseConfiguration為Configuration的子類別,因此conf可以放入JonConf當其設定元素,因此用'''JobConf jobConf = new JobConf(conf, LogParserGo.class);''' 。[[br]][[br]] 接著看到jobConf.set(TABLE, table); jobConf並沒有set的方法,而是其父類別 org.apache.hadoop.conf.Configuration 有,因此用set將環境參數設定進去,按照格式設定table 的名稱。 || jobConf.set|| (TABLE || table) || || || name property || value || || || table.name || table_name || 接下來一整段,都是在設定放在hdfs內要parser的log的檔案路徑。listPaths 這個函數原本在 FileSystem 是有定義的,但hadoop 0.16 API已經predecated,而Hadoop 0.17 API已經徹底拿掉,為了消錯也為了日後升級方便,因此在這類別裡寫了listPaths來用,其功能主要是將 path 裡的檔案或路徑都回傳紀錄到path [] 陣列。因此 isFile() 函數用來判斷是否為檔案,若不是檔案則為目錄,setInputPath 可以設定一個輸入路徑,但若有很多個路徑要加入,則可以用addInputPath[[br]][[br]] output目錄只是中間的暫時產物,因此當程式跑到最後就 fs.delete 來刪除之。[[br]][[br]] client.getClusterStatus(); 此段程式碼是取得環境中有多少個nodes可以使用,並設到設定參數中。[[br]][[br]] 怎麼知道程式跑完map-reduce ,JobClient.runJob() 則是 hadoop 呼叫開始跑map reduce的,如同thread 的run() 相似。 [[br]][[br]] jobConf.setMapperClass(MapClass.class); 是用來設定 mapClass 如何運作,也就是下一段就會介紹到的內部類別 MapClass.class,而像此程式沒有用到reduce,因此不用設定;而若map 或 reduce 其中有一個沒用到其功能的話,也可以設定基本元件 IdentityReducer、IdentityMapper來取代。 ---- {{{ #!java public static class MapClass extends MapReduceBase implements Mapper { public void configure(JobConf job) { tableName = job.get(TABLE, ""); } public void map(WritableComparable key, Text value, OutputCollector output, Reporter reporter) throws IOException { try { LogParser log = new LogParser(value.toString()); if (table == null) table = new HTable(conf, new Text(tableName)); long lockId = table.startUpdate(new Text(log.getIp())); table.put(lockId, new Text("http:protocol"), log.getProtocol() .getBytes()); table.put(lockId, new Text("http:method"), log.getMethod() .getBytes()); table.put(lockId, new Text("http:code"), log.getCode() .getBytes()); table.put(lockId, new Text("http:bytesize"), log.getByteSize() .getBytes()); table.put(lockId, new Text("http:agent"), log.getAgent() .getBytes()); table.put(lockId, new Text("url:" + log.getUrl()), log .getReferrer().getBytes()); table.put(lockId, new Text("referrer:" + log.getReferrer()), log.getUrl().getBytes()); table.commit(lockId, log.getTimestamp()); } catch (Exception e) { e.printStackTrace(); } } } }}} 此內部類別繼承了 [http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/mapred/MapReduceBase.html org.apache.hadoop.mapred.MapReduceBase] ,並實做Mapper 介面, 不見得所有map reduce程式都需要實做此介面,但若有要讓map能分配工作就需要寫在下面此函數中:[[BR]] map(!WritableComparable key, Text value,!OutputCollector output, Reporter reporter) [[BR]] 變數key為hbase中的row key,value則為值,output 可以透過collect() 功能將值寫入hbase的table中。但在此範例中, 並沒有用到 output的寫入方式,reporter也沒有用到。[[br]] 此方法因為有IO的存取,因此要宣告trows !IOException, 且用try來起始。[[br]][[br]] 首先!LogParser log = new !LogParser(value.toString()); value的值為要parser的內容的某一行,因為基於hdfs的map-reduce架構上,hadoop會幫我們把資料整合起來,因此程式的邏輯只要處理好這一行即可。!LogParser 在下面會介紹到,目前只要知道log物件是原始資料value透過 !LogParser 處理過的產物。透過log物件的方法getIP,getProtocol(),...等,我們可以輕易取得需要的資料,用table.put( Row_Key , Column_Qualify_Name , Value) 方法將Value值填入Row_Key中的Column_Qualify_Name欄位中。接著研究table物件。[[br]] table是全域變數之一,由 [http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html org.apache.hadoop.hbase.HTable] 類別定義。產生出HTable物件'''必定要'''給兩個初始化的值,一個是另一個全域變數也是重要的Hbase設定檔conf,另一個是tableName也就是資料表的名稱,當HTable 的 table 物件產生出來之後,我們就可以利用put來放入資料。然而一個新的資料表,要如何給他row_key呢? 因此 table.startUpdate(new Text(log.getIp())) 的功能就是 將 ip設定為table的row_key。有興趣的話可以參考[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html#startUpdate(org.apache.hadoop.io.Text) 官方的startUpdate說明] [[br]][[br]] 用此方法可以回傳一個型態為Long的值,將他宣告成為lockId變數,之後就可以用他當主key,而將值一個一個輸入到對應的欄位中。因此我們可以把結構看成是這樣 || key\欄位 || http:protocol || http:method || http:code || http:bytesize || http:agent || url: || referrer: || http:code || || row_key || value1 || value2 || value3 || value4 || value5 || value6 || value7 || value8 || 需要注意的是,由於Htable 是三維度的結構,row_key、column、timestamp。因此最後commit時間變數則是利用以下方法:[[br]] table.commit(lockId, log.getTimestamp()),將log內取得的時間update到整個row去。[[br]][[br]] configure(!JobConf conf) 此為override org.apache.hadoop.mapred.MapReduceBase.configure(!JobConf ) 內容只是取得並回傳Table的名字而已 ---- {{{ #!java static public Path[] listPaths(FileSystem fsm, Path path) throws IOException { FileStatus[] fss = fsm.listStatus(path); int length = fss.length; Path[] pi = new Path[length]; for (int i = 0; i < length; i++) { pi[i] = fss[i].getPath(); } return pi; } }}} 此函數用來列出路徑中的目錄或檔案,是利用listStatus實做原本的listPaths。 ---- = LogParser.java = 這個java檔的任務是分析log檔案中的每行資訊 {{{ #!java private String ip; private String protocol; private String method; private String url; private String code; private String byteSize; private String referrer; private String agent; private long timestamp; private static Pattern p = Pattern .compile("([︿ ]*) ([︿ ]*) ([︿ ]*) \\[([︿]]*)\\] \"([︿\"]*)\"" + " ([︿ ]*) ([︿ ]*) \"([︿\"]*)\" \"([︿\"]*)\".*"); }}} 首先先宣告產生一個物件 [http://java.sun.com/javase/6/docs/api/java/util/regex/Pattern.html java.util.regex.Pattern] 這個類別沒有建構子,因此宣告出來之後用compile(String regex)敘述來建立滿足正規表示式的物件,功能說明:[[br]] Compiles the given regular expression into a pattern.[[br]] 將正規表示式的字串當引數輸入之後,就可以得到一個p的Pattern物件,而此正規表示式:[[br]] '''([︿ ]*) ([︿ ]*) ([︿ ]*) \\[([︿]]*)\\] \"([︿\"]*)\" ([︿ ]*) ([︿ ]*) \"([︿\"]*)\" \"([︿\"]*)\".*''' [[br]] 若apache log範例為:[[br]] 140.110.138.176 - - [02/Jul/2008:16:55:02 +0800] "GET /hbase-0.1.3.zip HTTP/1.0" 200 10249801 "-" "Wget/1.10.2" [[br]] 則此正規表示法可看成[[br]] || ([︿ ]*) || ([︿ ]*) || ([︿ ]*) || \\[([︿]]*)\\] || \"([︿\"]*)\" || ([︿ ]*) || ([︿ ]*) || \"([︿\"]*)\" || \"([︿\"]*)\".* || || ip || - || - || 時間 || "http " || 回傳碼 || 長度 || "指引" || "代理器" || || 140.110.138.176 || - || - || [02/Jul/2008:16:55:02 +0800] || "GET /hbase-0.1.3.zip HTTP/1.0" || 200 || 10249801 || " -" || "Wget/1.10.2" || 在此可以把Pattern 當成是一個雛型類別,用compiler(表示式) 則告知了 以"表示式"為規則產生一個p的模板出來 ---------------------------- {{{ #!java public LogParser(String line) throws ParseException, Exception{ Matcher matcher = p.matcher(line); if(matcher.matches()){ this.ip = matcher.group(1); // IP address of the client requesting the web page. if(isIpAddress(ip)){ SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z",Locale.US); this.timestamp = sdf.parse(matcher.group(4)).getTime(); String[] http = matcher.group(5).split(" "); this.method = http[0]; this.url = http[1]; this.protocol = http[2]; this.code = matcher.group(6); this.byteSize = matcher.group(7); this.referrer = matcher.group(8); this.agent = matcher.group(9); } } } }}} 接著定義建構子,宣告了一個 [http://java.sun.com/javase/6/docs/api/java/util/regex/Matcher.html java.util.regex.Matcher] 此物件可以用來與之前的 Pattern搭配。[[br]] 剛剛宣告的模板p有個函數 matcher(String) ,此功能會將材料(String敘述 )壓印成模板的形狀,並把這個壓出物件叫做matcher。 之後要取用matcher的第n段,只要用matcher.group(n)就可以把第n段的內容以String的形式取回。[[br]] 回頭對照傳近來的內容 || 1 || 2 || 3 || 4 || 5 || 6 || 7 || 8 || 9 || || ip || - || - || 時間 || "http " || 回傳碼 || 長度 || "指引" || "代理器" || || 140.110.138.176 || - || - || [02/Jul/2008:16:55:02 +0800] || "GET /hbase-0.1.3.zip HTTP/1.0" || 200 || 10249801 || " -" || "Wget/1.10.2" || 之後就很顯而易見,用matcher.group(n)取得值後,一一的用this.參數來作設定,但其實不用this 編譯依然能過關,只是習慣在建構子內用到該class的參數會這麼用(以跟繼承到父類別的參數作區別?)其中時間需要用!SimpleDateFormat小轉譯一下,http的內容需要用split()來作更細部的分解。 ---- {{{ #!java public static boolean isIpAddress(String inputString) { StringTokenizer tokenizer = new StringTokenizer(inputString, "."); if (tokenizer.countTokens() != 4) { return false; } try { for (int i = 0; i < 4; i++) { String t = tokenizer.nextToken(); int chunk = Integer.parseInt(t); if ((chunk & 255) != chunk) { return false; } } } catch (NumberFormatException e) { return false; } if (inputString.indexOf("..") >= 0) { return false; } return true; } } }}} 此函數用來檢查IP的格式是否正確而已