wiki:NCHCCloudCourse100928_4_EXM3_sol
package org.nchc.hadoop;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class HelloHadoopV3 {

  public static void main(String[] args) throws IOException,
      InterruptedException, ClassNotFoundException {
    
    // eclipse using
//    String[] argv = {"/home/hadoop/input","/home/hadoop/output-hh3"};
//    args = argv;
    
    String hdfs_input = "HH3_input";
    String hdfs_output = "HH3_output";
     
    Configuration conf = new Configuration();
    // 宣告取得參數
    String[] otherArgs = new GenericOptionsParser(conf, args)
        .getRemainingArgs();
    // 如果參數數量不為2 則印出提示訊息
    if (otherArgs.length != 2) {
      System.err
          .println("Usage: hadoop jar HelloHadoopV3.jar <local_input> <local_output>");
      System.exit(2);
    }
    Job job = new Job(conf, "Hadoop Hello World");
    job.setJarByClass(HelloHadoopV3.class);
    // set map and reduce class
    job.setMapperClass(HelloMapperV2.class); // 輸入正確的mapper
    job.setCombinerClass(HelloReducerV2.class); // 輸入正確的 class
    job.setReducerClass(HelloReducerV2.class); // 輸入正確的 reducer

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    
    
    // 用  checkAndDelete 函式防止overhead的錯誤
    CheckAndDelete.checkAndDelete(hdfs_input, conf);
    CheckAndDelete.checkAndDelete(hdfs_output, conf);
    
    // 放檔案到hdfs
    PutToHdfs.putToHdfs(args[0], hdfs_input, conf);
    
    // 設定hdfs 的輸入輸出來源路定
    FileInputFormat.addInputPath(job, new Path(hdfs_input));
    FileOutputFormat.setOutputPath(job, new Path(hdfs_output));

    
    long start = System.nanoTime();

    job.waitForCompletion(true);
    
    // 把hdfs的結果取下
    GetFromHdfs.getFromHdfs(hdfs_output, args[1], conf);

    boolean status = job.waitForCompletion(true);
    // 計算時間
    if (status) {
      System.err.println("Integrate Alert Job Finished !");
      long time = System.nanoTime() - start;
      System.err.println(time * (1E-9) + " secs.");

    } else {
      System.err.println("Integrate Alert Job Failed !");
      System.exit(1);
    }
  }
}


Last modified 13 years ago Last modified on Jul 21, 2011, 5:46:48 PM