/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package tw.org.nchc.demo;

import java.io.IOException;
import java.text.ParseException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import tw.org.nchc.code.Convert;

/**
 * Fetches Apache access_log files into an HBase table using a map-only
 * MapReduce job.
 * <p>
 * TODO: add support for FgnStatLog, Error_log, and Access_log (Default,
 * W3CExtended, IISw3cExtended).
 */
public class LogFetcher {
	/**
	 * Configuration shared by every HBase and Hadoop object this class creates
	 * (the HTable, the HBaseAdmin, the FileSystem and the JobConf).
	 */
	static HBaseConfiguration conf = new HBaseConfiguration();

	/**
	 * Job property key under which the target table name is handed from
	 * {@code runMapReduce} to the map tasks.
	 */
	public static final String TABLE = "table.name";

	/**
	 * Name of the target table as read from the job configuration in
	 * {@code MapClass.configure}; unset until that method has run.
	 */
	static String tableName;

	/**
	 * Table handle used by the mapper; created on the first {@code map} call
	 * and then reused.
	 */
	static HTable table = null;

	/**
	 * When true, {@code main} ignores its command-line arguments and uses the
	 * built-in table name and input directory instead.
	 */
	static boolean eclipseRun = false;

	/**
	 * Mapper that parses one access-log line per call and stores the parsed
	 * fields in the HBase table named by the {@link LogFetcher#TABLE} job
	 * property. Nothing is emitted to the output collector; the table is the
	 * only sink.
	 * <p>
	 * Row key is the client IP. Each record writes the fixed columns
	 * {@code http:protocol}, {@code http:method}, {@code http:code},
	 * {@code http:bytesize} and {@code http:agent}, plus two cross-reference
	 * cells: {@code url:<url>} holding the referrer and
	 * {@code referrer:<referrer>} holding the URL.
	 */
	public static class MapClass extends MapReduceBase implements
			Mapper<WritableComparable, Text, Text, Writable> {

		/**
		 * Charset for every cell value. Pinned explicitly because
		 * {@code String.getBytes()} without an argument uses the platform
		 * default, which would make the stored bytes depend on the locale of
		 * the node running the task.
		 */
		private static final String CELL_ENCODING = "UTF-8";

		/**
		 * Reads the target table name from the job configuration (empty string
		 * when the property is missing).
		 *
		 * @param job configuration of the running job
		 */
		@Override
		public void configure(JobConf job) {
			tableName = job.get(TABLE, "");
		}

		/**
		 * Parses {@code value} as one access-log line and writes it to HBase.
		 * <p>
		 * Failures are best-effort: a line that cannot be parsed or written is
		 * reported on stderr and skipped so that one bad record does not fail
		 * the task.
		 *
		 * @param key      input key supplied by the input format; not used
		 * @param value    one raw access-log line
		 * @param output   not used; this mapper produces no map output
		 * @param reporter not used
		 * @throws IOException declared by the Mapper contract; failures inside
		 *                     this method are caught and logged instead
		 */
		public void map(WritableComparable key, Text value,
				OutputCollector<Text, Writable> output, Reporter reporter)
				throws IOException {
			// Tracks whether startUpdate() succeeded without a matching
			// commit(), so the finally block knows an update is still open.
			boolean updatePending = false;
			long lockId = 0L;
			try {
				AccessLogParser log = new AccessLogParser(value.toString());
				if (table == null)
					table = new HTable(conf, new Text(tableName));

				lockId = table.startUpdate(new Text(log.getIp()));
				updatePending = true;

				table.put(lockId, new Text("http:protocol"),
						encode(log.getProtocol()));
				table.put(lockId, new Text("http:method"),
						encode(log.getMethod()));
				table.put(lockId, new Text("http:code"),
						encode(log.getCode()));
				table.put(lockId, new Text("http:bytesize"),
						encode(log.getByteSize()));
				table.put(lockId, new Text("http:agent"),
						encode(log.getAgent()));
				// Cross-reference cells: look up referrers by URL and URLs by
				// referrer.
				table.put(lockId, new Text("url:" + log.getUrl()),
						encode(log.getReferrer()));
				table.put(lockId, new Text("referrer:" + log.getReferrer()),
						encode(log.getUrl()));

				table.commit(lockId, log.getTimestamp());
				updatePending = false;
			} catch (ParseException e) {
				e.printStackTrace();
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				if (updatePending) {
					// The table handle is shared by all later map() calls. An
					// update left open here would still be pending when the
					// next record calls startUpdate(), so release it.
					try {
						table.abort(lockId);
					} catch (Exception abortFailure) {
						// Best-effort cleanup: the update may already have
						// been released by the failed call.
						abortFailure.printStackTrace();
					}
				}
			}
		}

		/** Encodes a cell value with the fixed {@link #CELL_ENCODING}. */
		private static byte[] encode(String text) throws IOException {
			return text.getBytes(CELL_ENCODING);
		}
	}

	/**
	 * Runs a map-only job that feeds access-log files through
	 * {@link MapClass}.
	 * <p>
	 * {@code dir} may name a single file or a directory. For a directory, the
	 * plain files directly inside it and directly inside each of its immediate
	 * sub-directories become job input; deeper levels are not visited.
	 * <p>
	 * The job's output directory {@code log/temp} is deleted and the file
	 * system handle is closed when this method finishes, whether or not the
	 * job succeeded, so a failed run does not leave an output directory
	 * behind that would block the next submission.
	 *
	 * @param table name of the HBase table; passed to the map tasks through
	 *              the {@link #TABLE} job property
	 * @param dir   access-log file or directory to load
	 * @throws IOException if listing the input, running the job, or cleaning
	 *                     up fails
	 */
	public static void runMapReduce(String table, String dir)
			throws IOException {
		Path tempDir = new Path("log/temp");
		Path inputDir = new Path(dir);
		FileSystem fs = FileSystem.get(conf);
		try {
			JobConf jobConf = new JobConf(conf, LogFetcher.class);
			jobConf.setJobName("apache log fetcher");
			jobConf.set(TABLE, table);

			// Path handling goes through Convert, the project's shim for the
			// input/output path API that changed between Hadoop 0.16 and 0.17.
			if (fs.isFile(inputDir)) {
				Convert.setInputPath(jobConf, inputDir);
			} else {
				// Only list the directory when the input really is one.
				Path[] entries = Convert.listPaths(fs, inputDir);
				for (int i = 0; i < entries.length; i++) {
					if (fs.isFile(entries[i])) {
						Convert.addInputPath(jobConf, entries[i]);
					} else {
						addFilesIn(fs, jobConf, entries[i]);
					}
				}
			}
			Convert.setOutputPath(jobConf, tempDir);

			jobConf.setMapperClass(MapClass.class);

			JobClient client = new JobClient(jobConf);
			ClusterStatus cluster = client.getClusterStatus();
			jobConf.setNumMapTasks(cluster.getMapTasks());
			// Map-only job: the mapper writes straight to HBase.
			jobConf.setNumReduceTasks(0);

			JobClient.runJob(jobConf);
		} finally {
			try {
				fs.delete(tempDir, true);
			} finally {
				fs.close();
			}
		}
	}

	/**
	 * Adds every plain file located directly inside {@code dir} to the job's
	 * input paths; sub-directories of {@code dir} are skipped.
	 */
	private static void addFilesIn(FileSystem fs, JobConf jobConf, Path dir)
			throws IOException {
		Path[] entries = Convert.listPaths(fs, dir);
		for (int i = 0; i < entries.length; i++) {
			if (fs.isFile(entries[i])) {
				Convert.addInputPath(jobConf, entries[i]);
			}
		}
	}

	/**
	 * Makes sure the HBase table {@code table} exists, creating it with the
	 * column families {@code http:}, {@code url:} and {@code referrer:} when
	 * it does not. Progress is reported on stdout, ending with the
	 * announcement of the map/reduce fetching step.
	 *
	 * @param table name of the table to check or create
	 * @throws IOException if the HBase admin calls fail
	 */
	public static void creatTable(String table) throws IOException {
		HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
		boolean alreadyPresent = hbaseAdmin.tableExists(new Text(table));

		if (alreadyPresent) {
			System.out.println("1. " + table + " table already exists.");
		} else {
			System.out.println("1. " + table
					+ " table creating ... please wait");
			HTableDescriptor descriptor = new HTableDescriptor(table);
			String[] families = { "http:", "url:", "referrer:" };
			for (int i = 0; i < families.length; i++) {
				descriptor.addFamily(new HColumnDescriptor(families[i]));
			}
			hbaseAdmin.createTable(descriptor);
		}

		System.out.println("2. access_log files fetching using map/reduce");
	}

	/**
	 * Command-line entry point:
	 * {@code logfetcher <access_log file or directory> <table_name>}.
	 * <p>
	 * When {@code eclipseRun} is set the arguments are ignored and the table
	 * {@code log} is loaded from the directory {@code apache-log}. Otherwise
	 * two arguments are required; with fewer, the usage line is printed and
	 * the JVM is asked to exit with status 1.
	 *
	 * @param args input path followed by table name
	 * @throws IOException if creating the table or running the job fails
	 */
	@SuppressWarnings("deprecation")
	public static void main(String[] args) throws IOException {
		// Defaults double as the values used for an eclipseRun launch.
		String targetTable = "log";
		String inputLocation = "apache-log";

		if (!eclipseRun) {
			if (args.length < 2) {
				System.out
						.println("Usage: logfetcher <access_log file or directory> <table_name>");
				System.exit(1);
			} else {
				inputLocation = args[0];
				targetTable = args[1];
			}
		}

		creatTable(targetTable);
		runMapReduce(targetTable, inputLocation);
	}

}
